You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/01/13 23:57:42 UTC
svn commit: r1231371 [2/2] - in /incubator/flume/branches/flume-728:
flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/
flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/
flume-ng-channels/f...
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/JsonFileConfigurationProvider.java Fri Jan 13 22:57:41 2012
@@ -19,6 +19,7 @@ package org.apache.flume.conf.file;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -51,16 +52,27 @@ public class JsonFileConfigurationProvid
if (sourceDef.containsKey("type")) {
Source source =
- getSourceFactory().create((String) sourceDef.get("type"));
- Channel channel = conf.getChannels().get(sourceDef.get("channel"));
+ getSourceFactory().create(
+ (String) sourceDef.get("name"),
+ (String) sourceDef.get("type"));
+
+ //String channelList = sourceDef.
+
+ List<String> channelNames = (List<String>) sourceDef.get("channels");
+
+ List<Channel> channels = new ArrayList<Channel>();
+
+ for (String chName : channelNames) {
+ channels.add(conf.getChannels().get(chName));
+ }
Context context = new Context();
context.setParameters(sourceDef);
Configurables.configure(source, context);
- if (channel != null) {
- source.setChannel(channel);
+ if (channels.size() > 0) {
+ source.setChannels(channels);
} else {
logger.warn(
"No channel named {} - source:{} is likely non-functional.",
@@ -86,7 +98,9 @@ public class JsonFileConfigurationProvid
if (sinkDef.containsKey("type")) {
Sink sink =
- getSinkFactory().create((String) sinkDef.get("type"));
+ getSinkFactory().create(
+ (String) sinkDef.get("name"),
+ (String) sinkDef.get("type"));
Channel channel = conf.getChannels().get(sinkDef.get("channel"));
Context context = new Context();
@@ -117,8 +131,9 @@ public class JsonFileConfigurationProvid
logger.debug("channel:{}", channelDef);
if (channelDef.containsKey("type")) {
- Channel channel = getChannelFactory()
- .create((String) channelDef.get("type"));
+ Channel channel = getChannelFactory().create(
+ (String) channelDef.get("name"),
+ (String) channelDef.get("type"));
Context context = new Context();
context.setParameters(channelDef);
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java Fri Jan 13 22:57:41 2012
@@ -21,9 +21,11 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
-import java.util.StringTokenizer;
import org.apache.flume.Channel;
import org.apache.flume.Context;
@@ -31,7 +33,6 @@ import org.apache.flume.Sink;
import org.apache.flume.SinkRunner;
import org.apache.flume.Source;
import org.apache.flume.SourceRunner;
-import org.apache.flume.channel.FanoutChannel;
import org.apache.flume.conf.Configurables;
import org.apache.flume.conf.file.AbstractFileConfigurationProvider;
import org.apache.flume.conf.file.SimpleNodeConfiguration;
@@ -74,10 +75,10 @@ import org.slf4j.LoggerFactory;
* <tt><agent name>.sources.<source name>.runner.type = avro</tt>.
* This namespace can also be used to configure other configuration of the
* source runner as needed. For example:
- * <tt><agent name>.sources.<source name>.runner.port = 10101</tt></li>
- * <li>For each channel named in the <tt><agent name>.channels</tt>, there
- * must be a non-empty <tt>type</tt> attribute specified from the valid set of
- * channel types. For example:
+ * <tt><agent name>.sources.<source name>.runner.port = 10101</tt>
+ * </li><li>For each channel named in the <tt><agent name>.channels</tt>,
+ * there must be a non-empty <tt>type</tt> attribute specified from the valid
+ * set of channel types. For example:
* <tt><agent name>.channels.<channel name>.type = mem</tt></li>
* <li>For each sink named in the <tt><agent name>.sinks</tt>, there must
* be a non-empty <tt>type</tt> attribute specified from the valid set of sink
@@ -98,7 +99,7 @@ import org.slf4j.LoggerFactory;
* <tt><agent name>.sinks.<sink name>.runner.polling.interval =
* 60</tt></li>
* </ul>
- *
+ *
* Apart from the above required configuration values, each source, sink or
* channel can have its own set of arbitrary configuration as required by the
* implementation. Each of these configuration values are expressed by fully
@@ -120,36 +121,36 @@ import org.slf4j.LoggerFactory;
* </p>
* <p>
* Example configuration file:
- *
+ *
* <pre>
* #
* # Flume Configuration
* # This file contains configuration for one Agent identified as host1.
* #
- *
+ *
* host1.sources = avroSource thriftSource
* host1.channels = jdbcChannel
* host1.sinks = hdfsSink
- *
+ *
* # avroSource configuration
* host1.sources.avroSource.type = org.apache.flume.source.AvroSource
* host1.sources.avroSource.runner.type = avro
* host1.sources.avroSource.runner.port = 11001
* host1.sources.avroSource.channels = jdbcChannel
- *
+ *
* # thriftSource configuration
* host1.sources.thriftSource.type = org.apache.flume.source.ThriftSource
* host1.sources.thriftSource.runner.type = thrift
* host1.sources.thriftSource.runner.port = 12001
* host1.sources.thriftSource.channels = jdbcChannel
- *
+ *
* # jdbcChannel configuration
* host1.channels.jdbcChannel.type = jdbc
* host1.channels.jdbcChannel.jdbc.driver = com.mysql.jdbc.Driver
* host1.channels.jdbcChannel.jdbc.connect.url = http://localhost/flumedb
* host1.channels.jdbcChannel.jdbc.username = flume
* host1.channels.jdbcChannel.jdbc.password = flume
- *
+ *
* # hdfsSink configuration
* host1.sinks.hdfsSink.type = hdfs
* host1.sinks.hdfsSink.namenode = hdfs://localhost/
@@ -157,9 +158,9 @@ import org.slf4j.LoggerFactory;
* host1.sinks.hdfsSink.runner.type = polling
* host1.sinks.hdfsSink.runner.polling.interval = 60
* </pre>
- *
+ *
* </p>
- *
+ *
* @see java.util.Properties#load(java.io.Reader)
*/
public class PropertiesFileConfigurationProvider extends
@@ -209,13 +210,13 @@ public class PropertiesFileConfiguration
}
}
- private void loadChannels(AgentConfiguration agentConf, NodeConfiguration conf)
- throws InstantiationException {
+ private void loadChannels(AgentConfiguration agentConf,
+ NodeConfiguration conf) throws InstantiationException {
for (ComponentConfiguration comp : agentConf.getChannels()) {
Context context = new Context();
- Channel channel = getChannelFactory().create(
+ Channel channel = getChannelFactory().create(comp.getComponentName(),
comp.getConfiguration().get("type"));
for (Entry<String, String> entry : comp.getConfiguration().entrySet()) {
@@ -234,8 +235,10 @@ public class PropertiesFileConfiguration
for (ComponentConfiguration comp : agentConf.getSources()) {
Context context = new Context();
- Source source = getSourceFactory().create(
- comp.getConfiguration().get("type"));
+ Map<String, String> componentConfig = comp.getConfiguration();
+
+ Source source = getSourceFactory().create(comp.getComponentName(),
+ componentConfig.get("type"));
for (Entry<String, String> entry : comp.getConfiguration().entrySet()) {
context.put(entry.getKey(), entry.getValue());
@@ -243,14 +246,14 @@ public class PropertiesFileConfiguration
Configurables.configure(source, context);
- if (comp.getConfiguration().get("channels").contains(",")) {
- // then create a fanout if there are channel for this source
- source.setChannel(getChannelFactory().createFanout(
- comp.getConfiguration().get("channels"),conf.getChannels()));
- } else {
- source.setChannel(conf.getChannels().get(
- comp.getConfiguration().get("channels")));
+ String channelNames = comp.getConfiguration().get("channels");
+ List<Channel> channels = new ArrayList<Channel>();
+
+ for (String chName : channelNames.split(" ")) {
+ channels.add(conf.getChannels().get(chName));
}
+
+ source.setChannels(channels);
conf.getSourceRunners().put(comp.getComponentName(),
SourceRunner.forSource(source));
}
@@ -261,13 +264,11 @@ public class PropertiesFileConfiguration
for (ComponentConfiguration comp : agentConf.getSinks()) {
Context context = new Context();
+ Map<String, String> componentConfig = comp.getConfiguration();
- String type = comp.getConfiguration().get("type");
- Sink sink = getSinkFactory().create(type);
- if(sink == null) {
- throw new InstantiationException("Can't instantiate sink with type " + type + " (it's probably " +
- "unknown type)");
- }
+
+ Sink sink = getSinkFactory().create(comp.getComponentName(),
+ componentConfig.get("type"));
for (Entry<String, String> entry : comp.getConfiguration().entrySet()) {
context.put(entry.getKey(), entry.getValue());
@@ -276,7 +277,7 @@ public class PropertiesFileConfiguration
Configurables.configure(sink, context);
sink.setChannel(conf.getChannels().get(
- comp.getConfiguration().get("channel")));
+ componentConfig.get("channel")));
conf.getSinkRunners().put(comp.getComponentName(),
SinkRunner.forSink(sink));
}
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java Fri Jan 13 22:57:41 2012
@@ -78,8 +78,6 @@ public class Application {
application.setArgs(args);
try {
- application.loadPlugins();
-
if (application.parseOptions()) {
application.run();
}
@@ -97,23 +95,6 @@ public class Application {
channelFactory = new DefaultChannelFactory();
}
- public void loadPlugins() {
- channelFactory.register("memory", MemoryChannel.class);
- channelFactory.register("jdbc", JdbcChannel.class);
- channelFactory.register("fanout", FanoutChannel.class);
-
- sourceFactory.register("seq", SequenceGeneratorSource.class);
- sourceFactory.register("netcat", NetcatSource.class);
- sourceFactory.register("exec", ExecSource.class);
- sourceFactory.register("avro", AvroSource.class);
-
- sinkFactory.register("null", NullSink.class);
- sinkFactory.register("logger", LoggerSink.class);
- sinkFactory.register("file-roll", RollingFileSink.class);
- sinkFactory.register("hdfs", HDFSEventSink.class);
- sinkFactory.register("avro", AvroSink.class);
- }
-
public boolean parseOptions() throws ParseException {
Options options = new Options();
@@ -165,7 +146,8 @@ public class Application {
final FlumeNode node = new FlumeNode();
DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
- AbstractFileConfigurationProvider configurationProvider = new PropertiesFileConfigurationProvider();
+ AbstractFileConfigurationProvider configurationProvider =
+ new PropertiesFileConfigurationProvider();
configurationProvider.setChannelFactory(channelFactory);
configurationProvider.setSourceFactory(sourceFactory);
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/file/TestJsonFileConfigurationProvider.java Fri Jan 13 22:57:41 2012
@@ -40,6 +40,7 @@ import org.apache.flume.sink.NullSink;
import org.apache.flume.source.DefaultSourceFactory;
import org.apache.flume.source.NetcatSource;
import org.apache.flume.source.SequenceGeneratorSource;
+import org.apache.flume.source.SourceType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -62,14 +63,6 @@ public class TestJsonFileConfigurationPr
SourceFactory sourceFactory = new DefaultSourceFactory();
SinkFactory sinkFactory = new DefaultSinkFactory();
- channelFactory.register("memory", MemoryChannel.class);
-
- sourceFactory.register("seq", SequenceGeneratorSource.class);
- sourceFactory.register("netcat", NetcatSource.class);
-
- sinkFactory.register("null", NullSink.class);
- sinkFactory.register("logger", LoggerSink.class);
-
provider = new JsonFileConfigurationProvider();
provider.setNodeName("localhost");
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractLogicalNodeManager.java Fri Jan 13 22:57:41 2012
@@ -19,6 +19,9 @@
package org.apache.flume.node;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Sink;
@@ -144,7 +147,9 @@ public class TestAbstractLogicalNodeMana
Configurables.configure(channel, new Context());
Source generatorSource = new SequenceGeneratorSource();
- generatorSource.setChannel(channel);
+ List<Channel> channels = new ArrayList<Channel>();
+ channels.add(channel);
+ generatorSource.setChannels(channels);
Sink nullSink = new NullSink();
nullSink.setChannel(channel);
@@ -175,7 +180,9 @@ public class TestAbstractLogicalNodeMana
Configurables.configure(channel, new Context());
Source source = new SequenceGeneratorSource();
- source.setChannel(channel);
+ List<Channel> channels = new ArrayList<Channel>();
+ channels.add(channel);
+ source.setChannels(channels);
Sink sink = new NullSink();
sink.setChannel(channel);
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/node/TestDefaultLogicalNodeManager.java Fri Jan 13 22:57:41 2012
@@ -19,9 +19,12 @@
package org.apache.flume.node;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
+import org.apache.flume.Channel;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.lifecycle.LifecycleAware;
import org.apache.flume.lifecycle.LifecycleController;
@@ -67,7 +70,9 @@ public class TestDefaultLogicalNodeManag
for (int i = 0; i < 3; i++) {
SequenceGeneratorSource source = new SequenceGeneratorSource();
- source.setChannel(new MemoryChannel());
+ List<Channel> channels = new ArrayList<Channel>();
+ channels.add(new MemoryChannel());
+ source.setChannels(channels);
PollableSourceRunner sourceRunner = new PollableSourceRunner();
sourceRunner.setSource(source);
@@ -91,7 +96,9 @@ public class TestDefaultLogicalNodeManag
for (int i = 0; i < 30; i++) {
SequenceGeneratorSource source = new SequenceGeneratorSource();
- source.setChannel(new MemoryChannel());
+ List<Channel> channels = new ArrayList<Channel>();
+ channels.add(new MemoryChannel());
+ source.setChannels(channels);
PollableSourceRunner sourceRunner = new PollableSourceRunner();
sourceRunner.setSource(source);
@@ -123,7 +130,9 @@ public class TestDefaultLogicalNodeManag
for (int i = 0; i < 30; i++) {
SequenceGeneratorSource source = new SequenceGeneratorSource();
- source.setChannel(new MemoryChannel());
+ List<Channel> channels = new ArrayList<Channel>();
+ channels.add(new MemoryChannel());
+ source.setChannels(channels);
PollableSourceRunner sourceRunner = new PollableSourceRunner();
sourceRunner.setSource(source);
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/source/TestNetcatSource.java Fri Jan 13 22:57:41 2012
@@ -24,6 +24,8 @@ import java.io.Writer;
import java.net.InetSocketAddress;
import java.nio.channels.Channels;
import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -55,8 +57,9 @@ public class TestNetcatSource {
context.put("capacity", "50");
Configurables.configure(channel, context);
-
- source.setChannel(channel);
+ List<Channel> channels = new ArrayList<Channel>();
+ channels.add(channel);
+ source.setChannels(channels);
}
@Test
@@ -98,7 +101,7 @@ public class TestNetcatSource {
};
- Transaction tx = source.getChannel().getTransaction();
+ Transaction tx = source.getChannels().get(0).getTransaction();
tx.begin();
for (int i = 0; i < 100; i++) {
Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json?rev=1231371&r1=1231370&r2=1231371&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/resources/flume-conf.json Fri Jan 13 22:57:41 2012
@@ -5,12 +5,12 @@
{
"name": "source1",
"type": "seq",
- "channel": "ch1"
+ "channels": [ "ch1" ]
},
{
"name": "source2",
"type": "seq",
- "channel": "ch2"
+ "channels": [ "ch2" ]
}
],
@@ -18,12 +18,12 @@
{
"name": "sink1",
"type": "null",
- "channel": "ch1"
+ "channels": [ "ch1" ]
},
{
"name": "sink2",
"type": "null",
- "channel": "ch2"
+ "channels": [ "ch2" ]
}
],
@@ -50,19 +50,21 @@
"type": "netcat",
"bind": "0.0.0.0",
"port": "41414",
- "channel": "ch1"
+ "channels": [ "ch1" ]
}
],
"sinks": [
{
+ "name": "sinkx",
"type": "null",
- "channel": "ch1"
+ "channels": [ "ch1" ]
}
],
"channels": [
{
+ "name": "mem-channel",
"type": "memory"
}
]