You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/08/17 22:55:28 UTC
svn commit: r1158903 -
/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
Author: esammer
Date: Wed Aug 17 20:55:27 2011
New Revision: 1158903
URL: http://svn.apache.org/viewvc?rev=1158903&view=rev
Log:
- Added skeletal implementation of source / sink factory bootstrapping.
- Added extremely simplistic (and almost certainly temporary) command line parsing and
support for definition of sources / sinks.
Modified:
incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java?rev=1158903&r1=1158902&r2=1158903&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java Wed Aug 17 20:55:27 2011
@@ -1,10 +1,28 @@
package org.apache.flume.node;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import org.apache.flume.Context;
+import org.apache.flume.LogicalNode;
+import org.apache.flume.SinkFactory;
+import org.apache.flume.SourceFactory;
import org.apache.flume.lifecycle.LifecycleController;
import org.apache.flume.lifecycle.LifecycleException;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.flume.node.nodemanager.DefaultLogicalNodeManager;
+import org.apache.flume.sink.DefaultSinkFactory;
+import org.apache.flume.sink.LoggerSink;
+import org.apache.flume.sink.NullSink;
+import org.apache.flume.source.DefaultSourceFactory;
+import org.apache.flume.source.NetcatSource;
+import org.apache.flume.source.SequenceGeneratorSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -14,6 +32,9 @@ public class Application {
.getLogger(Application.class);
private String[] args;
+ private Set<NodeConfiguration> nodeConfigs;
+ private SourceFactory sourceFactory;
+ private SinkFactory sinkFactory;
public static void main(String[] args) {
Application application = new Application();
@@ -21,14 +42,67 @@ public class Application {
application.setArgs(args);
try {
+ application.loadPlugins();
+ application.parseOptions();
application.run();
+ } catch (ParseException e) {
+ logger.error(e.getMessage());
} catch (Exception e) {
logger.error("A fatal error occurred while running. Exception follows.",
e);
}
}
- public void run() throws LifecycleException, InterruptedException {
+ /**
+  * Creates an application that starts with an empty set of node
+  * configurations and uses the default source and sink factory
+  * implementations.
+  */
+ public Application() {
+   this.sourceFactory = new DefaultSourceFactory();
+   this.sinkFactory = new DefaultSinkFactory();
+   this.nodeConfigs = new HashSet<NodeConfiguration>();
+ }
+
+ /**
+  * Registers the bundled source and sink classes with their factories
+  * under the short names used to refer to them in node definitions:
+  * sources "seq" and "netcat", sinks "null" and "logger".
+  */
+ public void loadPlugins() {
+   // Sources.
+   this.sourceFactory.register("seq", SequenceGeneratorSource.class);
+   this.sourceFactory.register("netcat", NetcatSource.class);
+
+   // Sinks.
+   this.sinkFactory.register("null", NullSink.class);
+   this.sinkFactory.register("logger", LoggerSink.class);
+ }
+
+ /**
+  * Parses the command line held in {@code args} and adds one node
+  * configuration to {@code nodeConfigs} for every logical node definition
+  * found.
+  *
+  * <p>Nodes are defined with {@code -n} / {@code --node}. The option may be
+  * repeated, and a single occurrence may carry several comma separated
+  * definitions, each in the form {@code <name>:<source>:<sink>}.
+  *
+  * @throws ParseException if the command line cannot be parsed, or if a
+  *         node definition does not consist of exactly three non-empty,
+  *         colon separated components
+  */
+ public void parseOptions() throws ParseException {
+   Options options = new Options();
+
+   /*
+    * The option accepts a single argument. Commons CLI only applies an
+    * option's value separator while the option can still take further
+    * arguments, so a separator configured on a one-argument option never
+    * splits anything. Comma separated definitions are therefore split
+    * explicitly below instead of relying on Option.setValueSeparator.
+    */
+   Option option = new Option("n", "node", true, "creates a logical node");
+   options.addOption(option);
+
+   CommandLineParser parser = new GnuParser();
+   CommandLine commandLine = parser.parse(options, args);
+
+   if (!commandLine.hasOption("node")) {
+     return;
+   }
+
+   for (String value : commandLine.getOptionValues("node")) {
+     for (String definition : value.split(",")) {
+       String[] parts = definition.split(":");
+
+       // Exactly three components are required and none may be empty;
+       // an empty name, source or sink cannot describe a usable node.
+       if (parts.length != 3 || parts[0].length() == 0
+           || parts[1].length() == 0 || parts[2].length() == 0) {
+         throw new ParseException(
+             "Node definition must be in the format <name>:<source>:<sink>");
+       }
+
+       DefaultNodeConfiguration nodeConfiguration = new DefaultNodeConfiguration();
+
+       nodeConfiguration.setName(parts[0]);
+       nodeConfiguration.setSourceDefinition(parts[1]);
+       nodeConfiguration.setSinkDefinition(parts[2]);
+
+       nodeConfigs.add(nodeConfiguration);
+     }
+   }
+ }
+
+ public void run() throws LifecycleException, InterruptedException,
+ InstantiationException {
+
final Context context = new Context();
final FlumeNode node = new FlumeNode();
NodeManager nodeManager = new DefaultLogicalNodeManager();
@@ -54,6 +128,20 @@ public class Application {
});
node.start(context);
+ LifecycleController.waitForOneOf(node, LifecycleState.START_OR_ERROR);
+
+ if (node.getLifecycleState().equals(LifecycleState.START)) {
+ for (NodeConfiguration nodeConf : nodeConfigs) {
+ LogicalNode logicalNode = new LogicalNode();
+
+ logicalNode.setName(nodeConf.getName());
+ logicalNode.setSource(sourceFactory.create(nodeConf
+ .getSourceDefinition()));
+ logicalNode.setSink(sinkFactory.create(nodeConf.getSinkDefinition()));
+
+ nodeManager.add(logicalNode);
+ }
+ }
LifecycleController.waitForOneOf(node, LifecycleState.STOP_OR_ERROR);
}