You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/26 01:16:56 UTC

svn commit: r1161797 - /incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java

Author: esammer
Date: Thu Aug 25 23:16:55 2011
New Revision: 1161797

URL: http://svn.apache.org/viewvc?rev=1161797&view=rev
Log:
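- Added support for passing per-node configuration (context) parameters on the
  command line, as an optional fourth field of the node definition:
  <name>:<source>:<sink>:<key>=<value>|<key>=<value>. This is a temporary hack
  for experimentation, not a permanent interface.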

Modified:
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java?rev=1161797&r1=1161796&r2=1161797&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java Thu Aug 25 23:16:55 2011
@@ -1,6 +1,8 @@
 package org.apache.flume.node;
 
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.cli.CommandLine;
@@ -10,9 +12,12 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.flume.Context;
+import org.apache.flume.EventSink;
+import org.apache.flume.EventSource;
 import org.apache.flume.LogicalNode;
 import org.apache.flume.SinkFactory;
 import org.apache.flume.SourceFactory;
+import org.apache.flume.conf.Configurables;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.flume.lifecycle.LifecycleState;
@@ -34,6 +39,8 @@ public class Application {
 
   private String[] args;
   private Set<NodeConfiguration> nodeConfigs;
+  private Map<String, Context> contexts;
+
   private SourceFactory sourceFactory;
   private SinkFactory sinkFactory;
 
@@ -58,6 +65,7 @@ public class Application {
     nodeConfigs = new HashSet<NodeConfiguration>();
     sourceFactory = new DefaultSourceFactory();
     sinkFactory = new DefaultSinkFactory();
+    contexts = new HashMap<String, Context>();
   }
 
   public void loadPlugins() {
@@ -86,9 +94,9 @@ public class Application {
       for (String value : values) {
         String[] parts = value.split(":");
 
-        if (parts.length != 3) {
+        if (parts.length < 3) {
           throw new ParseException(
-              "Node definition must be in the format <name>:<source>:<sink>");
+              "Node definition must be in the format <name>:<source>:<sink>:<context params>");
         }
 
         DefaultNodeConfiguration nodeConfiguration = new DefaultNodeConfiguration();
@@ -97,6 +105,27 @@ public class Application {
         nodeConfiguration.setSourceDefinition(parts[1]);
         nodeConfiguration.setSinkDefinition(parts[2]);
 
+        Context context = new Context();
+
+        if (parts.length >= 4) {
+          logger.debug("Parsing context parameters:{}", parts[3]);
+
+          String[] contextParts = parts[3].split("\\|");
+
+          for (String contextPart : contextParts) {
+            logger.debug("parsing kv pair:{}", contextPart);
+
+            String[] strings = contextPart.split("=");
+            context.put(strings[0], strings[1]);
+          }
+
+        }
+
+        logger.debug("Created nodeConfig:{} context:{}", nodeConfiguration,
+            context);
+
+        contexts.put(nodeConfiguration.getName(), context);
+
         nodeConfigs.add(nodeConfiguration);
       }
     }
@@ -133,12 +162,18 @@ public class Application {
 
     if (node.getLifecycleState().equals(LifecycleState.START)) {
       for (NodeConfiguration nodeConf : nodeConfigs) {
+        EventSource source = sourceFactory.create(nodeConf
+            .getSourceDefinition());
+        EventSink sink = sinkFactory.create(nodeConf.getSinkDefinition());
+
+        Configurables.configure(source, contexts.get(nodeConf.getName()));
+        Configurables.configure(sink, contexts.get(nodeConf.getName()));
+
         LogicalNode logicalNode = new LogicalNode();
 
         logicalNode.setName(nodeConf.getName());
-        logicalNode.setSource(sourceFactory.create(nodeConf
-            .getSourceDefinition()));
-        logicalNode.setSink(sinkFactory.create(nodeConf.getSinkDefinition()));
+        logicalNode.setSource(source);
+        logicalNode.setSink(sink);
 
         nodeManager.add(logicalNode);
       }