You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by mp...@apache.org on 2013/01/25 01:27:06 UTC
git commit: FLUME-1852. Issues with EmbeddedAgentConfiguration.
Updated Branches:
refs/heads/trunk d45af178e -> 3df65e12c
FLUME-1852. Issues with EmbeddedAgentConfiguration.
(Brock Noland via Mike Percy)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/3df65e12
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/3df65e12
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/3df65e12
Branch: refs/heads/trunk
Commit: 3df65e12c8d480cd46f190a0bb4addfee4272062
Parents: d45af17
Author: Mike Percy <mp...@apache.org>
Authored: Thu Jan 24 16:26:03 2013 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Thu Jan 24 16:26:33 2013 -0800
----------------------------------------------------------------------
.../agent/embedded/EmbeddedAgentConfiguration.java | 73 ++++++++-------
.../apache/flume/agent/embedded/package-info.java | 2 +-
.../embedded/TestEmbeddedAgentConfiguration.java | 42 ++++++++-
.../embedded/TestEmbeddedAgentEmbeddedSource.java | 1 -
4 files changed, 79 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/3df65e12/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java
index e52f912..6204bc5 100644
--- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java
+++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/EmbeddedAgentConfiguration.java
@@ -33,6 +33,7 @@ import org.apache.flume.conf.sink.SinkProcessorType;
import org.apache.flume.conf.sink.SinkType;
import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
/**
@@ -65,7 +66,7 @@ public class EmbeddedAgentConfiguration {
public static final String SINKS_PREFIX = join(SINKS, "");
/**
- * Source type, choices are `embedded' or `avro'
+ * Source type, the only choice is `embedded'
*/
public static final String SOURCE_TYPE = join(SOURCE, TYPE);
/**
@@ -81,7 +82,7 @@ public class EmbeddedAgentConfiguration {
*/
public static final String CHANNEL_PREFIX = join(CHANNEL, "");
/**
- * Sink processor type, choices are `default' (failover) or `load_balance'
+ * Sink processor type, choices are `default', `failover' or `load_balance'
*/
public static final String SINK_PROCESSOR_TYPE = join(SINK_PROCESSOR, TYPE);
/**
@@ -90,10 +91,11 @@ public class EmbeddedAgentConfiguration {
public static final String SINK_PROCESSOR_PREFIX = join(SINK_PROCESSOR, "");
/**
* Embedded source which provides simple in-memory transfer to channel.
- * Use this source via the put,pulAll methods on the EmbeddedAgent. This
- * is the recommended source to use for Embedded Agents.
+ * Use this source via the put and putAll methods on the EmbeddedAgent. This
+ * is the only supported source to use for Embedded Agents.
*/
public static final String SOURCE_TYPE_EMBEDDED = EmbeddedSource.class.getName();
+ private static final String SOURCE_TYPE_EMBEDDED_ALIAS = "EMBEDDED";
/**
* Memory channel which stores events in heap. See Flume User Guide for
* configuration information. This is the recommended channel to use for
@@ -101,8 +103,8 @@ public class EmbeddedAgentConfiguration {
*/
public static final String CHANNEL_TYPE_MEMORY = ChannelType.MEMORY.name();
/**
- * File based channel which stores events in heap. See Flume User Guide for
- * configuration information.
+ * File based channel which stores events on local disk. See Flume User
+ * Guide for configuration information.
*/
public static final String CHANNEL_TYPE_FILE = ChannelType.FILE.name();
@@ -129,6 +131,7 @@ public class EmbeddedAgentConfiguration {
private static final String[] ALLOWED_SOURCES = {
+ SOURCE_TYPE_EMBEDDED_ALIAS,
SOURCE_TYPE_EMBEDDED,
};
@@ -147,6 +150,9 @@ public class EmbeddedAgentConfiguration {
SINK_PROCESSOR_TYPE_LOAD_BALANCE
};
+ private static final ImmutableList<String> DISALLOWED_SINK_NAMES =
+ ImmutableList.of("source", "channel", "processor");
+
private static void validate(String name,
Map<String, String> properties) throws FlumeException {
@@ -158,6 +164,10 @@ public class EmbeddedAgentConfiguration {
checkRequired(properties, SINKS);
String sinkNames = properties.get(SINKS);
for(String sink : sinkNames.split("\\s+")) {
+ if(DISALLOWED_SINK_NAMES.contains(sink.toLowerCase())) {
+ throw new FlumeException("Sink name " + sink + " is one of the" +
+ " disallowed sink names: " + DISALLOWED_SINK_NAMES);
+ }
String key = join(sink, TYPE);
checkRequired(properties, key);
checkAllowed(ALLOWED_SINKS, properties.get(key));
@@ -182,7 +192,8 @@ public class EmbeddedAgentConfiguration {
// we are going to modify the properties as we parse the config
properties = new HashMap<String, String>(properties);
- if(!properties.containsKey(SOURCE_TYPE)) {
+ if(!properties.containsKey(SOURCE_TYPE) || SOURCE_TYPE_EMBEDDED_ALIAS.
+ equalsIgnoreCase(properties.get(SOURCE_TYPE))) {
properties.put(SOURCE_TYPE, SOURCE_TYPE_EMBEDDED);
}
String sinkNames = properties.remove(SINKS);
@@ -199,7 +210,6 @@ public class EmbeddedAgentConfiguration {
// user supplied config -> agent configuration
Map<String, String> result = Maps.newHashMap();
- Joiner joiner = Joiner.on(SEPERATOR);
// properties will be modified during iteration so we need a
// copy of the keys
Set<String> userProvidedKeys;
@@ -209,42 +219,40 @@ public class EmbeddedAgentConfiguration {
* source at the channel.
*/
// point agent at source
- result.put(joiner.
- join(name, BasicConfigurationConstants.CONFIG_SOURCES), sourceName);
+ result.put(join(name, BasicConfigurationConstants.CONFIG_SOURCES),
+ sourceName);
// point agent at channel
- result.put(joiner.
- join(name, BasicConfigurationConstants.CONFIG_CHANNELS), channelName);
- // point agent at source
- result.put(joiner.
- join(name, BasicConfigurationConstants.CONFIG_SINKS), sinkNames);
+ result.put(join(name, BasicConfigurationConstants.CONFIG_CHANNELS),
+ channelName);
+ // point agent at sinks
+ result.put(join(name, BasicConfigurationConstants.CONFIG_SINKS),
+ sinkNames);
// points the agent at the sinkgroup
- result.put(joiner.
- join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS),
+ result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS),
sinkGroupName);
// points the sinkgroup at the sinks
- result.put(joiner.
- join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,
+ result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,
sinkGroupName, SINKS), sinkNames);
// points the source at the channel
- result.put(joiner.join(name,
+ result.put(join(name,
BasicConfigurationConstants.CONFIG_SOURCES, sourceName,
BasicConfigurationConstants.CONFIG_CHANNELS), channelName);
/*
- * Second process the the sink configuration and point the sinks
+ * Second process the sink configuration and point the sinks
* at the channel.
*/
userProvidedKeys = new HashSet<String>(properties.keySet());
for(String sink : sinkNames.split("\\s+")) {
for(String key : userProvidedKeys) {
String value = properties.get(key);
- if(key.startsWith(sink)) {
+ if(key.startsWith(sink + SEPERATOR)) {
properties.remove(key);
- result.put(joiner.join(name,
+ result.put(join(name,
BasicConfigurationConstants.CONFIG_SINKS, key), value);
}
}
// point the sink at the channel
- result.put(joiner.join(name,
+ result.put(join(name,
BasicConfigurationConstants.CONFIG_SINKS, sink,
BasicConfigurationConstants.CONFIG_CHANNEL), channelName);
}
@@ -255,20 +263,19 @@ public class EmbeddedAgentConfiguration {
userProvidedKeys = new HashSet<String>(properties.keySet());
for(String key : userProvidedKeys) {
String value = properties.get(key);
- if(key.startsWith(SOURCE)) {
+ if(key.startsWith(SOURCE_PREFIX)) {
// users use `source' but agent needs the actual source name
- key = key.replace(SOURCE, sourceName);
- result.put(joiner.join(name,
+ key = key.replaceFirst(SOURCE, sourceName);
+ result.put(join(name,
BasicConfigurationConstants.CONFIG_SOURCES, key), value);
- } else if(key.startsWith(CHANNEL)) {
+ } else if(key.startsWith(CHANNEL_PREFIX)) {
// users use `channel' but agent needs the actual channel name
- key = key.replace(CHANNEL, channelName);
- result.put(joiner.join(name,
+ key = key.replaceFirst(CHANNEL, channelName);
+ result.put(join(name,
BasicConfigurationConstants.CONFIG_CHANNELS, key), value);
- } else if(key.startsWith(SINK_PROCESSOR)) {
+ } else if(key.startsWith(SINK_PROCESSOR_PREFIX)) {
// agent.sinkgroups.sinkgroup.processor.*
- result.put(joiner.
- join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,
+ result.put(join(name, BasicConfigurationConstants.CONFIG_SINKGROUPS,
sinkGroupName, key), value);
} else {
// XXX should we simply ignore this?
http://git-wip-us.apache.org/repos/asf/flume/blob/3df65e12/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java
index 0a53c5f..919a630 100644
--- a/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java
+++ b/flume-ng-embedded-agent/src/main/java/org/apache/flume/agent/embedded/package-info.java
@@ -19,6 +19,6 @@
/**
* This package provides Flume users the ability to embed simple agents
* in applications. For specific and up to date information, please see
- * the Flume User Guide.
+ * the Flume Developer Guide.
*/
package org.apache.flume.agent.embedded;
http://git-wip-us.apache.org/repos/asf/flume/blob/3df65e12/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
index 3805ea8..f70d0b1 100644
--- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
+++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentConfiguration.java
@@ -47,10 +47,30 @@ public class TestEmbeddedAgentConfiguration {
properties.put("sink2.port", "2");
properties.put("processor.type", "load_balance");
}
+
+
+ @Test
+ public void testFullSourceType() throws Exception {
+ doTestExcepted(EmbeddedAgentConfiguration.
+ configure("test1", properties));
+ }
+
@Test
- public void testBasic() throws Exception {
- Map<String, String> actual = EmbeddedAgentConfiguration.
- configure("test1", properties);
+ public void testMissingSourceType() throws Exception {
+ Assert.assertNotNull(properties.remove("source.type"));
+ doTestExcepted(EmbeddedAgentConfiguration.
+ configure("test1", properties));
+ }
+
+ @Test
+ public void testShortSourceType() throws Exception {
+ properties.put("source.type", "EMBEDDED");
+ doTestExcepted(EmbeddedAgentConfiguration.
+ configure("test1", properties));
+ }
+
+
+ public void doTestExcepted(Map<String, String> actual) throws Exception {
Map<String, String> expected = Maps.newHashMap();
expected.put("test1.channels", "channel-test1");
expected.put("test1.channels.channel-test1.capacity", "200");
@@ -71,7 +91,6 @@ public class TestEmbeddedAgentConfiguration {
expected.put("test1.sources.source-test1.channels", "channel-test1");
expected.put("test1.sources.source-test1.type", EmbeddedAgentConfiguration.
SOURCE_TYPE_EMBEDDED);
-
Assert.assertEquals(expected, actual);
}
@@ -116,4 +135,19 @@ public class TestEmbeddedAgentConfiguration {
properties.put("bad.key.name", "bad");
EmbeddedAgentConfiguration.configure("test1", properties);
}
+ @Test(expected = FlumeException.class)
+ public void testSinkNamedLikeSource() throws Exception {
+ properties.put("sinks", "source");
+ EmbeddedAgentConfiguration.configure("test1", properties);
+ }
+ @Test(expected = FlumeException.class)
+ public void testSinkNamedLikeChannel() throws Exception {
+ properties.put("sinks", "channel");
+ EmbeddedAgentConfiguration.configure("test1", properties);
+ }
+ @Test(expected = FlumeException.class)
+ public void testSinkNamedLikeProcessor() throws Exception {
+ properties.put("sinks", "processor");
+ EmbeddedAgentConfiguration.configure("test1", properties);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flume/blob/3df65e12/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
index 4e94d72..9d85e6e 100644
--- a/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
+++ b/flume-ng-embedded-agent/src/test/java/org/apache/flume/agent/embedded/TestEmbeddedAgentEmbeddedSource.java
@@ -28,7 +28,6 @@ import junit.framework.Assert;
import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
-import org.apache.flume.FlumeException;
import org.apache.flume.SinkRunner;
import org.apache.flume.SourceRunner;
import org.apache.flume.event.SimpleEvent;