You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/26 01:16:56 UTC
svn commit: r1161797 -
/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
Author: esammer
Date: Thu Aug 25 23:16:55 2011
New Revision: 1161797
URL: http://svn.apache.org/viewvc?rev=1161797&view=rev
Log:
- Added support for passing configuration data from the command line. This is just a
hack to make it possible to try things out; it isn't permanent.
Modified:
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java?rev=1161797&r1=1161796&r2=1161797&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java Thu Aug 25 23:16:55 2011
@@ -1,6 +1,8 @@
package org.apache.flume.node;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
@@ -10,9 +12,12 @@ import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.flume.Context;
+import org.apache.flume.EventSink;
+import org.apache.flume.EventSource;
import org.apache.flume.LogicalNode;
import org.apache.flume.SinkFactory;
import org.apache.flume.SourceFactory;
+import org.apache.flume.conf.Configurables;
import org.apache.flume.lifecycle.LifecycleController;
import org.apache.flume.lifecycle.LifecycleException;
import org.apache.flume.lifecycle.LifecycleState;
@@ -34,6 +39,8 @@ public class Application {
private String[] args;
private Set<NodeConfiguration> nodeConfigs;
+ private Map<String, Context> contexts;
+
private SourceFactory sourceFactory;
private SinkFactory sinkFactory;
@@ -58,6 +65,7 @@ public class Application {
nodeConfigs = new HashSet<NodeConfiguration>();
sourceFactory = new DefaultSourceFactory();
sinkFactory = new DefaultSinkFactory();
+ contexts = new HashMap<String, Context>();
}
public void loadPlugins() {
@@ -86,9 +94,9 @@ public class Application {
for (String value : values) {
String[] parts = value.split(":");
- if (parts.length != 3) {
+ if (parts.length < 3) {
throw new ParseException(
- "Node definition must be in the format <name>:<source>:<sink>");
+ "Node definition must be in the format <name>:<source>:<sink>:<context params>");
}
DefaultNodeConfiguration nodeConfiguration = new DefaultNodeConfiguration();
@@ -97,6 +105,27 @@ public class Application {
nodeConfiguration.setSourceDefinition(parts[1]);
nodeConfiguration.setSinkDefinition(parts[2]);
+ Context context = new Context();
+
+ if (parts.length >= 4) {
+ logger.debug("Parsing context parameters:{}", parts[3]);
+
+ String[] contextParts = parts[3].split("\\|");
+
+ for (String contextPart : contextParts) {
+ logger.debug("parsing kv pair:{}", contextPart);
+
+ String[] strings = contextPart.split("=");
+ context.put(strings[0], strings[1]);
+ }
+
+ }
+
+ logger.debug("Created nodeConfig:{} context:{}", nodeConfiguration,
+ context);
+
+ contexts.put(nodeConfiguration.getName(), context);
+
nodeConfigs.add(nodeConfiguration);
}
}
@@ -133,12 +162,18 @@ public class Application {
if (node.getLifecycleState().equals(LifecycleState.START)) {
for (NodeConfiguration nodeConf : nodeConfigs) {
+ EventSource source = sourceFactory.create(nodeConf
+ .getSourceDefinition());
+ EventSink sink = sinkFactory.create(nodeConf.getSinkDefinition());
+
+ Configurables.configure(source, contexts.get(nodeConf.getName()));
+ Configurables.configure(sink, contexts.get(nodeConf.getName()));
+
LogicalNode logicalNode = new LogicalNode();
logicalNode.setName(nodeConf.getName());
- logicalNode.setSource(sourceFactory.create(nodeConf
- .getSourceDefinition()));
- logicalNode.setSink(sinkFactory.create(nodeConf.getSinkDefinition()));
+ logicalNode.setSource(source);
+ logicalNode.setSink(sink);
nodeManager.add(logicalNode);
}