You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/01/13 23:57:42 UTC

svn commit: r1231371 [2/2] - in /incubator/flume/branches/flume-728: flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/ flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/ flume-ng-channels/f...

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java Fri Jan 13 22:57:41 2012
@@ -19,6 +19,7 @@ package org.apache.flume.conf.file;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -51,16 +52,27 @@ public class JsonFileConfigurationProvid
 
       if (sourceDef.containsKey("type")) {
         Source source =
-            getSourceFactory().create((String) sourceDef.get("type"));
-        Channel channel = conf.getChannels().get(sourceDef.get("channel"));
+            getSourceFactory().create(
+                (String) sourceDef.get("name"),
+                (String) sourceDef.get("type"));
+
+        //String channelList = sourceDef.
+
+        List<String> channelNames = (List<String>) sourceDef.get("channels");
+
+        List<Channel> channels = new ArrayList<Channel>();
+
+        for (String chName : channelNames) {
+          channels.add(conf.getChannels().get(chName));
+        }
 
         Context context = new Context();
         context.setParameters(sourceDef);
 
         Configurables.configure(source, context);
 
-        if (channel != null) {
-          source.setChannel(channel);
+        if (channels.size() > 0) {
+          source.setChannels(channels);
         } else {
           logger.warn(
               "No channel named {} - source:{} is likely non-functional.",
@@ -86,7 +98,9 @@ public class JsonFileConfigurationProvid
 
       if (sinkDef.containsKey("type")) {
         Sink sink =
-            getSinkFactory().create((String) sinkDef.get("type"));
+            getSinkFactory().create(
+                (String) sinkDef.get("name"),
+                (String) sinkDef.get("type"));
         Channel channel = conf.getChannels().get(sinkDef.get("channel"));
 
         Context context = new Context();
@@ -117,8 +131,9 @@ public class JsonFileConfigurationProvid
       logger.debug("channel:{}", channelDef);
 
       if (channelDef.containsKey("type")) {
-        Channel channel = getChannelFactory()
-            .create((String) channelDef.get("type"));
+        Channel channel = getChannelFactory().create(
+                (String) channelDef.get("name"),
+                (String) channelDef.get("type"));
 
         Context context = new Context();
         context.setParameters(channelDef);

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java Fri Jan 13 22:57:41 2012
@@ -21,9 +21,11 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
-import java.util.StringTokenizer;
 
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
@@ -31,7 +33,6 @@ import org.apache.flume.Sink;
 import org.apache.flume.SinkRunner;
 import org.apache.flume.Source;
 import org.apache.flume.SourceRunner;
-import org.apache.flume.channel.FanoutChannel;
 import org.apache.flume.conf.Configurables;
 import org.apache.flume.conf.file.AbstractFileConfigurationProvider;
 import org.apache.flume.conf.file.SimpleNodeConfiguration;
@@ -74,10 +75,10 @@ import org.slf4j.LoggerFactory;
  * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.runner.type = avro</tt>.
  * This namespace can also be used to configure other configuration of the
  * source runner as needed. For example:
- * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.runner.port = 10101</tt></li>
- * <li>For each channel named in the <tt>&lt;agent name&gt;.channels</tt>, there
- * must be a non-empty <tt>type</tt> attribute specified from the valid set of
- * channel types. For example:
+ * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.runner.port = 10101</tt>
+ * </li><li>For each channel named in the <tt>&lt;agent name&gt;.channels</tt>,
+ * there must be a non-empty <tt>type</tt> attribute specified from the valid
+ * set of channel types. For example:
  * <tt>&lt;agent name&gt;.channels.&lt;channel name&gt;.type = mem</tt></li>
  * <li>For each sink named in the <tt>&lt;agent name&gt;.sinks</tt>, there must
  * be a non-empty <tt>type</tt> attribute specified from the valid set of sink
@@ -98,7 +99,7 @@ import org.slf4j.LoggerFactory;
  * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.runner.polling.interval =
  * 60</tt></li>
  * </ul>
- * 
+ *
  * Apart from the above required configuration values, each source, sink or
  * channel can have its own set of arbitrary configuration as required by the
  * implementation. Each of these configuration values are expressed by fully
@@ -120,36 +121,36 @@ import org.slf4j.LoggerFactory;
  * </p>
  * <p>
  * Example configuration file:
- * 
+ *
  * <pre>
  * #
  * # Flume Configuration
  * # This file contains configuration for one Agent identified as host1.
  * #
- * 
+ *
  * host1.sources = avroSource thriftSource
  * host1.channels = jdbcChannel
  * host1.sinks = hdfsSink
- * 
+ *
  * # avroSource configuration
  * host1.sources.avroSource.type = org.apache.flume.source.AvroSource
  * host1.sources.avroSource.runner.type = avro
  * host1.sources.avroSource.runner.port = 11001
  * host1.sources.avroSource.channels = jdbcChannel
- * 
+ *
  * # thriftSource configuration
  * host1.sources.thriftSource.type = org.apache.flume.source.ThriftSource
  * host1.sources.thriftSource.runner.type = thrift
  * host1.sources.thriftSource.runner.port = 12001
  * host1.sources.thriftSource.channels = jdbcChannel
- * 
+ *
  * # jdbcChannel configuration
  * host1.channels.jdbcChannel.type = jdbc
  * host1.channels.jdbcChannel.jdbc.driver = com.mysql.jdbc.Driver
  * host1.channels.jdbcChannel.jdbc.connect.url = http://localhost/flumedb
  * host1.channels.jdbcChannel.jdbc.username = flume
  * host1.channels.jdbcChannel.jdbc.password = flume
- * 
+ *
  * # hdfsSink configuration
  * host1.sinks.hdfsSink.type = hdfs
  * host1.sinks.hdfsSink.namenode = hdfs://localhost/
@@ -157,9 +158,9 @@ import org.slf4j.LoggerFactory;
  * host1.sinks.hdfsSink.runner.type = polling
  * host1.sinks.hdfsSink.runner.polling.interval = 60
  * </pre>
- * 
+ *
  * </p>
- * 
+ *
  * @see java.util.Properties#load(java.io.Reader)
  */
 public class PropertiesFileConfigurationProvider extends
@@ -209,13 +210,13 @@ public class PropertiesFileConfiguration
     }
   }
 
-  private void loadChannels(AgentConfiguration agentConf, NodeConfiguration conf)
-      throws InstantiationException {
+  private void loadChannels(AgentConfiguration agentConf,
+      NodeConfiguration conf) throws InstantiationException {
 
     for (ComponentConfiguration comp : agentConf.getChannels()) {
       Context context = new Context();
 
-      Channel channel = getChannelFactory().create(
+      Channel channel = getChannelFactory().create(comp.getComponentName(),
           comp.getConfiguration().get("type"));
 
       for (Entry<String, String> entry : comp.getConfiguration().entrySet()) {
@@ -234,8 +235,10 @@ public class PropertiesFileConfiguration
     for (ComponentConfiguration comp : agentConf.getSources()) {
       Context context = new Context();
 
-      Source source = getSourceFactory().create(
-          comp.getConfiguration().get("type"));
+      Map<String, String> componentConfig = comp.getConfiguration();
+
+      Source source = getSourceFactory().create(comp.getComponentName(),
+          componentConfig.get("type"));
 
       for (Entry<String, String> entry : comp.getConfiguration().entrySet()) {
         context.put(entry.getKey(), entry.getValue());
@@ -243,14 +246,14 @@ public class PropertiesFileConfiguration
 
       Configurables.configure(source, context);
 
-      if (comp.getConfiguration().get("channels").contains(",")) {
-        // then create a fanout if there are channel for this source
-        source.setChannel(getChannelFactory().createFanout(
-            comp.getConfiguration().get("channels"),conf.getChannels()));
-      } else {
-        source.setChannel(conf.getChannels().get(
-            comp.getConfiguration().get("channels")));        
+      String channelNames = comp.getConfiguration().get("channels");
+      List<Channel> channels = new ArrayList<Channel>();
+
+      for (String chName : channelNames.split(" ")) {
+        channels.add(conf.getChannels().get(chName));
       }
+
+      source.setChannels(channels);
       conf.getSourceRunners().put(comp.getComponentName(),
           SourceRunner.forSource(source));
     }
@@ -261,13 +264,11 @@ public class PropertiesFileConfiguration
 
     for (ComponentConfiguration comp : agentConf.getSinks()) {
       Context context = new Context();
+      Map<String, String> componentConfig = comp.getConfiguration();
 
-      String type = comp.getConfiguration().get("type");
-      Sink sink = getSinkFactory().create(type);
-      if(sink == null) {
-        throw new InstantiationException("Can't instantiate sink with type " + type + " (it's probably " +
-          "unknown type)");
-      }
+
+      Sink sink = getSinkFactory().create(comp.getComponentName(),
+          componentConfig.get("type"));
 
       for (Entry<String, String> entry : comp.getConfiguration().entrySet()) {
         context.put(entry.getKey(), entry.getValue());
@@ -276,7 +277,7 @@ public class PropertiesFileConfiguration
       Configurables.configure(sink, context);
 
       sink.setChannel(conf.getChannels().get(
-          comp.getConfiguration().get("channel")));
+          componentConfig.get("channel")));
       conf.getSinkRunners().put(comp.getComponentName(),
           SinkRunner.forSink(sink));
     }

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java Fri Jan 13 22:57:41 2012
@@ -78,8 +78,6 @@ public class Application {
     application.setArgs(args);
 
     try {
-      application.loadPlugins();
-
       if (application.parseOptions()) {
         application.run();
       }
@@ -97,23 +95,6 @@ public class Application {
     channelFactory = new DefaultChannelFactory();
   }
 
-  public void loadPlugins() {
-    channelFactory.register("memory", MemoryChannel.class);
-    channelFactory.register("jdbc", JdbcChannel.class);
-    channelFactory.register("fanout", FanoutChannel.class);
-
-    sourceFactory.register("seq", SequenceGeneratorSource.class);
-    sourceFactory.register("netcat", NetcatSource.class);
-    sourceFactory.register("exec", ExecSource.class);
-    sourceFactory.register("avro", AvroSource.class);
-
-    sinkFactory.register("null", NullSink.class);
-    sinkFactory.register("logger", LoggerSink.class);
-    sinkFactory.register("file-roll", RollingFileSink.class);
-    sinkFactory.register("hdfs", HDFSEventSink.class);
-    sinkFactory.register("avro", AvroSink.class);
-  }
-
   public boolean parseOptions() throws ParseException {
     Options options = new Options();
 
@@ -165,7 +146,8 @@ public class Application {
 
     final FlumeNode node = new FlumeNode();
     DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
-    AbstractFileConfigurationProvider configurationProvider = new PropertiesFileConfigurationProvider();
+    AbstractFileConfigurationProvider configurationProvider =
+        new PropertiesFileConfigurationProvider();
 
     configurationProvider.setChannelFactory(channelFactory);
     configurationProvider.setSourceFactory(sourceFactory);

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java Fri Jan 13 22:57:41 2012
@@ -40,6 +40,7 @@ import org.apache.flume.sink.NullSink;
 import org.apache.flume.source.DefaultSourceFactory;
 import org.apache.flume.source.NetcatSource;
 import org.apache.flume.source.SequenceGeneratorSource;
+import org.apache.flume.source.SourceType;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -62,14 +63,6 @@ public class TestJsonFileConfigurationPr
     SourceFactory sourceFactory = new DefaultSourceFactory();
     SinkFactory sinkFactory = new DefaultSinkFactory();
 
-    channelFactory.register("memory", MemoryChannel.class);
-
-    sourceFactory.register("seq", SequenceGeneratorSource.class);
-    sourceFactory.register("netcat", NetcatSource.class);
-
-    sinkFactory.register("null", NullSink.class);
-    sinkFactory.register("logger", LoggerSink.class);
-
     provider = new JsonFileConfigurationProvider();
 
     provider.setNodeName("localhost");

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java Fri Jan 13 22:57:41 2012
@@ -19,6 +19,9 @@
 
 package org.apache.flume.node;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Sink;
@@ -144,7 +147,9 @@ public class TestAbstractLogicalNodeMana
     Configurables.configure(channel, new Context());
 
     Source generatorSource = new SequenceGeneratorSource();
-    generatorSource.setChannel(channel);
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(channel);
+    generatorSource.setChannels(channels);
 
     Sink nullSink = new NullSink();
     nullSink.setChannel(channel);
@@ -175,7 +180,9 @@ public class TestAbstractLogicalNodeMana
     Configurables.configure(channel, new Context());
 
     Source source = new SequenceGeneratorSource();
-    source.setChannel(channel);
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(channel);
+    source.setChannels(channels);
 
     Sink sink = new NullSink();
     sink.setChannel(channel);

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java Fri Jan 13 22:57:41 2012
@@ -19,9 +19,12 @@
 
 package org.apache.flume.node;
 
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 
+import org.apache.flume.Channel;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleController;
@@ -67,7 +70,9 @@ public class TestDefaultLogicalNodeManag
 
     for (int i = 0; i < 3; i++) {
       SequenceGeneratorSource source = new SequenceGeneratorSource();
-      source.setChannel(new MemoryChannel());
+      List<Channel> channels = new ArrayList<Channel>();
+      channels.add(new MemoryChannel());
+      source.setChannels(channels);
 
       PollableSourceRunner sourceRunner = new PollableSourceRunner();
       sourceRunner.setSource(source);
@@ -91,7 +96,9 @@ public class TestDefaultLogicalNodeManag
 
     for (int i = 0; i < 30; i++) {
       SequenceGeneratorSource source = new SequenceGeneratorSource();
-      source.setChannel(new MemoryChannel());
+      List<Channel> channels = new ArrayList<Channel>();
+      channels.add(new MemoryChannel());
+      source.setChannels(channels);
 
       PollableSourceRunner sourceRunner = new PollableSourceRunner();
       sourceRunner.setSource(source);
@@ -123,7 +130,9 @@ public class TestDefaultLogicalNodeManag
 
     for (int i = 0; i < 30; i++) {
       SequenceGeneratorSource source = new SequenceGeneratorSource();
-      source.setChannel(new MemoryChannel());
+      List<Channel> channels = new ArrayList<Channel>();
+      channels.add(new MemoryChannel());
+      source.setChannels(channels);
 
       PollableSourceRunner sourceRunner = new PollableSourceRunner();
       sourceRunner.setSource(source);

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Fri Jan 13 22:57:41 2012
@@ -24,6 +24,8 @@ import java.io.Writer;
 import java.net.InetSocketAddress;
 import java.nio.channels.Channels;
 import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
@@ -55,8 +57,9 @@ public class TestNetcatSource {
     context.put("capacity", "50");
 
     Configurables.configure(channel, context);
-
-    source.setChannel(channel);
+    List<Channel> channels = new ArrayList<Channel>();
+    channels.add(channel);
+    source.setChannels(channels);
   }
 
   @Test
@@ -98,7 +101,7 @@ public class TestNetcatSource {
 
     };
 
-    Transaction tx = source.getChannel().getTransaction();
+    Transaction tx = source.getChannels().get(0).getTransaction();
     tx.begin();
 
     for (int i = 0; i < 100; i++) {

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json Fri Jan 13 22:57:41 2012
@@ -5,12 +5,12 @@
       {
         "name": "source1",
         "type": "seq",
-        "channel": "ch1"
+        "channels": [ "ch1" ]
       },
       {
         "name": "source2",
         "type": "seq",
-        "channel": "ch2"
+        "channels": [ "ch2" ]
       }
     ],
 
@@ -18,12 +18,12 @@
       {
         "name": "sink1",
         "type": "null",
-        "channel": "ch1"
+        "channels": [ "ch1" ]
       },
       {
         "name": "sink2",
         "type": "null",
-        "channel": "ch2"
+        "channels": [ "ch2" ]
       }
     ],
 
@@ -50,19 +50,21 @@
         "type": "netcat",
         "bind": "0.0.0.0",
         "port": "41414",
-        "channel": "ch1"
+        "channels": [ "ch1" ]
       }
     ],
 
     "sinks": [
       {
+        "name": "sinkx",
         "type": "null",
-        "channel": "ch1"
+        "channels": [ "ch1" ]
       }
     ],
 
     "channels": [
       {
+        "name": "mem-channel",
         "type": "memory"
       }
     ]