You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2013/01/25 01:27:06 UTC

git commit: FLUME-1852. Issues with EmbeddedAgentConfiguration.

Updated Branches:
  refs/heads/trunk d45af178e -> 3df65e12c


FLUME-1852. Issues with EmbeddedAgentConfiguration.

(Brock Noland via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/3df65e12
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/3df65e12
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/3df65e12

Branch: refs/heads/trunk
Commit: 3df65e12c8d480cd46f190a0bb4addfee4272062
Parents: d45af17
Author: Mike Percy <mp...@apache.org>
Authored: Thu Jan 24 16:26:03 2013 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Jan 24 16:26:33 2013 -0800

----------------------------------------------------------------------
 .../agent/embedded/EmbeddedAgentConfiguration.java |   73 ++++++++-------
 .../apache/flume/agent/embedded/package-info.java  |    2 +-
 .../embedded/TestEmbeddedAgentConfiguration.java   |   42 ++++++++-
 .../embedded/TestEmbeddedAgentEmbeddedSource.java  |    1 -
 4 files changed, 79 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/3df65e12/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java
index e52f912..6204bc5 100644
--- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java
+++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java
@@ -33,6 +33,7 @@ import org.apache.flume.conf.sink.SinkProcessorType;
 import org.apache.flume.conf.sink.SinkType;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 
 /**
@@ -65,7 +66,7 @@ public class EmbeddedAgentConfiguration {
 
   public static final String SINKS_PREFIX = join(SINKS, "");
   /**
-   * Source type, choices are `embedded' or `avro'
+   * Source type; the only supported choice is `embedded'
    */
   public static final String SOURCE_TYPE = join(SOURCE, TYPE);
   /**
@@ -81,7 +82,7 @@ public class EmbeddedAgentConfiguration {
    */
   public static final String CHANNEL_PREFIX = join(CHANNEL, "");
   /**
-   * Sink processor type, choices are `default' (failover) or `load_balance'
+   * Sink processor type, choices are `default', `failover' or `load_balance'
    */
   public static final String SINK_PROCESSOR_TYPE = join(SINK_PROCESSOR, TYPE);
   /**
@@ -90,10 +91,11 @@ public class EmbeddedAgentConfiguration {
   public static final String SINK_PROCESSOR_PREFIX = join(SINK_PROCESSOR, "");
   /**
    * Embedded source which provides simple in-memory transfer to channel.
-   * Use this source via the put,pulAll methods on the EmbeddedAgent. This
-   * is the recommended source to use for Embedded Agents.
+   * Use this source via the put and putAll methods on the EmbeddedAgent. This
+   * is the only supported source to use for Embedded Agents.
    */
   public static final String SOURCE_TYPE_EMBEDDED = EmbeddedSource.class.getName();
+  private static final String SOURCE_TYPE_EMBEDDED_ALIAS = "EMBEDDED";
   /**
    * Memory channel which stores events in heap. See Flume User Guide for
    * configuration information. This is the recommended channel to use for
@@ -101,8 +103,8 @@ public class EmbeddedAgentConfiguration {
    */
   public static final String CHANNEL_TYPE_MEMORY = ChannelType.MEMORY.name();
   /**
-   * File based channel which stores events in heap. See Flume User Guide for
-   * configuration information.
+   * File based channel which stores events on local disk. See Flume User
+   * Guide for configuration information.
    */
   public static final String CHANNEL_TYPE_FILE = ChannelType.FILE.name();
 
@@ -129,6 +131,7 @@ public class EmbeddedAgentConfiguration {
 
 
   private static final String[] ALLOWED_SOURCES = {
+    SOURCE_TYPE_EMBEDDED_ALIAS,
     SOURCE_TYPE_EMBEDDED,
   };
 
@@ -147,6 +150,9 @@ public class EmbeddedAgentConfiguration {
     SINK_PROCESSOR_TYPE_LOAD_BALANCE
   };
 
+  private static final ImmutableList<String> DISALLOWED_SINK_NAMES =
+      ImmutableList.of("source", "channel", "processor");
+
   private static void validate(String name,
       Map<String, String> properties) throws FlumeException {
 
@@ -158,6 +164,10 @@ public class EmbeddedAgentConfiguration {
     checkRequired(properties, SINKS);
     String sinkNames = properties.get(SINKS);
     for(String sink : sinkNames.split("\\s+")) {
+      if(DISALLOWED_SINK_NAMES.contains(sink.toLowerCase())) {
+        throw new FlumeException("Sink name " + sink + " is one of the" +
+            " disallowed sink names: " + DISALLOWED_SINK_NAMES);
+      }
       String key = join(sink, TYPE);
       checkRequired(properties, key);
       checkAllowed(ALLOWED_SINKS, properties.get(key));
@@ -182,7 +192,8 @@ public class EmbeddedAgentConfiguration {
     // we are going to modify the properties as we parse the config
     properties = new HashMap<String, String>(properties);
 
-    if(!properties.containsKey(SOURCE_TYPE)) {
+    if(!properties.containsKey(SOURCE_TYPE) || SOURCE_TYPE_EMBEDDED_ALIAS.
+        equalsIgnoreCase(properties.get(SOURCE_TYPE))) {
       properties.put(SOURCE_TYPE, SOURCE_TYPE_EMBEDDED);
     }
     String sinkNames = properties.remove(SINKS);
@@ -199,7 +210,6 @@ public class EmbeddedAgentConfiguration {
     // user supplied config -> agent configuration
     Map<String, String> result = Maps.newHashMap();
 
-    Joiner joiner = Joiner.on(SEPERATOR);
     // properties will be modified during iteration so we need a
     // copy of the keys
     Set<String> userProvidedKeys;
@@ -209,42 +219,40 @@ public class EmbeddedAgentConfiguration {
      * source at the channel.
      */
     // point agent at source
-    result.put(joiner.
-        join(name, BasicConfigurationConstants.CONFIG_SOURCES), sourceName);
+    result.put(join(name, BasicConfigurationConstants.CONFIG_SOURCES),
+        sourceName);
     // point agent at channel
-    result.put(joiner.
-        join(name, BasicConfigurationConstants.CONFIG_CHANNELS), channelName);
-    // point agent at source
-    result.put(joiner.
-        join(name, BasicConfigurationConstants.CONFIG_SINKS), sinkNames);
+    result.put(join(name, BasicConfigurationConstants.CONFIG_CHANNELS),
+        channelName);
+    // point agent at sinks
+    result.put(join(name, BasicConfigurationConstants.CONFIG_SINKS),
+        sinkNames);
     // points the agent at the sinkgroup
-    result.put(joiner.
-        join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS),
+    result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS),
         sinkGroupName);
     // points the sinkgroup at the sinks
-    result.put(joiner.
-        join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,
+    result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,
             sinkGroupName, SINKS), sinkNames);
     // points the source at the channel
-    result.put(joiner.join(name,
+    result.put(join(name,
         BasicConfigurationConstants.CONFIG_SOURCES, sourceName,
         BasicConfigurationConstants.CONFIG_CHANNELS), channelName);
     /*
-     * Second process the the sink configuration and point the sinks
+     * Second process the sink configuration and point the sinks
      * at the channel.
      */
     userProvidedKeys = new HashSet<String>(properties.keySet());
     for(String sink :  sinkNames.split("\\s+")) {
       for(String key : userProvidedKeys) {
         String value = properties.get(key);
-        if(key.startsWith(sink)) {
+        if(key.startsWith(sink + SEPERATOR)) {
           properties.remove(key);
-          result.put(joiner.join(name,
+          result.put(join(name,
               BasicConfigurationConstants.CONFIG_SINKS, key), value);
         }
       }
       // point the sink at the channel
-      result.put(joiner.join(name,
+      result.put(join(name,
           BasicConfigurationConstants.CONFIG_SINKS, sink,
           BasicConfigurationConstants.CONFIG_CHANNEL), channelName);
     }
@@ -255,20 +263,19 @@ public class EmbeddedAgentConfiguration {
     userProvidedKeys = new HashSet<String>(properties.keySet());
     for(String key : userProvidedKeys) {
       String value = properties.get(key);
-      if(key.startsWith(SOURCE)) {
+      if(key.startsWith(SOURCE_PREFIX)) {
         // users use `source' but agent needs the actual source name
-        key = key.replace(SOURCE, sourceName);
-        result.put(joiner.join(name,
+        key = key.replaceFirst(SOURCE, sourceName);
+        result.put(join(name,
             BasicConfigurationConstants.CONFIG_SOURCES, key), value);
-      } else if(key.startsWith(CHANNEL)) {
+      } else if(key.startsWith(CHANNEL_PREFIX)) {
         // users use `channel' but agent needs the actual channel name
-        key = key.replace(CHANNEL, channelName);
-        result.put(joiner.join(name,
+        key = key.replaceFirst(CHANNEL, channelName);
+        result.put(join(name,
             BasicConfigurationConstants.CONFIG_CHANNELS, key), value);
-      } else if(key.startsWith(SINK_PROCESSOR)) {
+      } else if(key.startsWith(SINK_PROCESSOR_PREFIX)) {
         // agent.sinkgroups.sinkgroup.processor.*
-        result.put(joiner.
-            join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,
+        result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,
                 sinkGroupName, key), value);
       } else {
         // XXX should we simply ignore this?

http://git-wip-us.apache.org/repos/asf/flume/blob/3df65e12/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java
index 0a53c5f..919a630 100644
--- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java
+++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java
@@ -19,6 +19,6 @@
 /**
  * This package provides Flume users the ability to embed simple agents
  * in applications. For specific and up to date information, please see
- * the Flume User Guide.
+ * the Flume Developer Guide.
  */
 package org.apache.flume.agent.embedded;

http://git-wip-us.apache.org/repos/asf/flume/blob/3df65e12/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
index 3805ea8..f70d0b1 100644
--- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
+++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
@@ -47,10 +47,30 @@ public class TestEmbeddedAgentConfiguration {
     properties.put("sink2.port", "2");
     properties.put("processor.type", "load_balance");
   }
+
+
+  @Test
+  public void testFullSourceType() throws Exception {
+    doTestExcepted(EmbeddedAgentConfiguration.
+        configure("test1", properties));
+  }
+
   @Test
-  public void testBasic() throws Exception {
-    Map<String, String> actual = EmbeddedAgentConfiguration.
-        configure("test1", properties);
+  public void testMissingSourceType() throws Exception {
+    Assert.assertNotNull(properties.remove("source.type"));
+    doTestExcepted(EmbeddedAgentConfiguration.
+        configure("test1", properties));
+  }
+
+  @Test
+  public void testShortSourceType() throws Exception {
+    properties.put("source.type", "EMBEDDED");
+    doTestExcepted(EmbeddedAgentConfiguration.
+        configure("test1", properties));
+  }
+
+
+  public void doTestExcepted(Map<String, String> actual) throws Exception {
     Map<String, String> expected = Maps.newHashMap();
     expected.put("test1.channels", "channel-test1");
     expected.put("test1.channels.channel-test1.capacity", "200");
@@ -71,7 +91,6 @@ public class TestEmbeddedAgentConfiguration {
     expected.put("test1.sources.source-test1.channels", "channel-test1");
     expected.put("test1.sources.source-test1.type", EmbeddedAgentConfiguration.
         SOURCE_TYPE_EMBEDDED);
-
     Assert.assertEquals(expected, actual);
   }
 
@@ -116,4 +135,19 @@ public class TestEmbeddedAgentConfiguration {
     properties.put("bad.key.name", "bad");
     EmbeddedAgentConfiguration.configure("test1", properties);
   }
+  @Test(expected = FlumeException.class)
+  public void testSinkNamedLikeSource() throws Exception {
+    properties.put("sinks", "source");
+    EmbeddedAgentConfiguration.configure("test1", properties);
+  }
+  @Test(expected = FlumeException.class)
+  public void testSinkNamedLikeChannel() throws Exception {
+    properties.put("sinks", "channel");
+    EmbeddedAgentConfiguration.configure("test1", properties);
+  }
+  @Test(expected = FlumeException.class)
+  public void testSinkNamedLikeProcessor() throws Exception {
+    properties.put("sinks", "processor");
+    EmbeddedAgentConfiguration.configure("test1", properties);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/3df65e12/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
index 4e94d72..9d85e6e 100644
--- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
+++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
@@ -28,7 +28,6 @@ import junit.framework.Assert;
 import org.apache.flume.Channel;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
 import org.apache.flume.SinkRunner;
 import org.apache.flume.SourceRunner;
 import org.apache.flume.event.SimpleEvent;