You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/02/29 05:53:23 UTC
svn commit: r1294969 - in /incubator/flume/branches/flume-728:
flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/
flume-ng-core/src/main/java/org/apache/flume/
flume-ng-core/src/main/java/org/apache/flume/channel/ fl...
Author: arvind
Date: Wed Feb 29 04:53:22 2012
New Revision: 1294969
URL: http://svn.apache.org/viewvc?rev=1294969&view=rev
Log:
FLUME-978. Context interface is too basic requiring boilerplate user code.
(Brock Noland via Arvind Prabhakar)
Modified:
incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/TestContext.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java
incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java
Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java Wed Feb 29 04:53:22 2012
@@ -103,16 +103,14 @@ public class JdbcChannelProviderImpl imp
}
private void initializeSystemProperties(Context context) {
- Map<String, Object> sysProps = context.getSubProperties(
+ Map<String, String> sysProps = context.getSubProperties(
ConfigurationConstants.CONFIG_JDBC_SYSPRO_PREFIX);
for (String key: sysProps.keySet()) {
- Object object = sysProps.get(key);
- String value = "";
- if (object != null) {
- value = object.toString();
+ String value = sysProps.get(key);
+ if(key != null && value != null) {
+ System.setProperty(key, value);
}
- System.setProperty(key, value);
}
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java Wed Feb 29 04:53:22 2012
@@ -19,73 +19,179 @@
package org.apache.flume;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
public class Context {
- private Map<String, Object> parameters;
+ private Map<String, String> parameters;
public Context() {
- parameters = new HashMap<String, Object>();
+ parameters = Collections.synchronizedMap(new HashMap<String, String>());
}
- public Map<String, Object> getSubProperties(String prefix) {
- Map<String, Object> result = new HashMap<String, Object>();
+ public Context(Map<String, String> paramters) {
+ this();
+ this.putAll(paramters);
+ }
- for (String key : parameters.keySet()) {
- if (key.startsWith(prefix)) {
- String name = key.substring(prefix.length());
- result.put(name, parameters.get(key));
- }
+ /**
+ * Gets a copy of the backing map structure.
+ * @return immutable copy of backing map structure
+ */
+ public ImmutableMap<String, String> getParameters() {
+ synchronized (parameters) {
+ return ImmutableMap.copyOf(parameters);
}
-
- return result;
+ }
+ /**
+ * Removes all of the mappings from this map.
+ */
+ public void clear() {
+ parameters.clear();
}
- public void put(String key, Object value) {
+ /**
+ * Get properties which start with a prefix. When a property is returned,
+ * the prefix is removed from the name. For example, if this method is
+ * called with a parameter "hdfs." and the context contains:
+ * <code>
+ * { hdfs.key = value, otherKey = otherValue }
+ * </code>
+ * this method will return a map containing:
+ * <code>
+ * { key = value}
+ * </code>
+ *
+ * @param prefix key prefix to find and remove from keys in resulting map
+ * @return map with keys which matched prefix with prefix removed from
+ * keys in resulting map. If no keys are matched, the returned map is
+ * empty
+ */
+ public ImmutableMap<String, String> getSubProperties(String prefix) {
+ Map<String, String> result = Maps.newHashMap();
+ synchronized(parameters) {
+ for (String key : parameters.keySet()) {
+ if (key.startsWith(prefix)) {
+ String name = key.substring(prefix.length());
+ result.put(name, parameters.get(key));
+ }
+ }
+ }
+ return ImmutableMap.copyOf(result);
+ }
+ /**
+ * Associates all of the given map's keys and values in the Context.
+ */
+ public void putAll(Map<String, String> map) {
+ parameters.putAll(map);
+ }
+ /**
+ * Associates the specified value with the specified key in this context.
+ * If the context previously contained a mapping for the key, the old value
+ * is replaced by the specified value.
+ * @param key key with which the specified value is to be associated
+ * @param value to be associated with the specified key
+ */
+ public void put(String key, String value) {
parameters.put(key, value);
}
-
- public <T> T get(String key, Class<? extends T> clazz) {
- if (parameters.containsKey(key)) {
- return clazz.cast(parameters.get(key));
+ /**
+ * Gets value mapped to key, returning defaultValue if unmapped.
+ * @param key to be found
+ * @param defaultValue returned if key is unmapped
+ * @return value associated with key
+ */
+ public Boolean getBoolean(String key, Boolean defaultValue) {
+ String value = get(key);
+ if(value != null) {
+ return Boolean.parseBoolean(value.trim());
}
-
- return null;
+ return defaultValue;
}
-
- public <T> T get(String key, Class<? extends T> clazz, T defaultValue) {
- T result = get(key, clazz);
- if (result == null) {
- result = defaultValue;
+ /**
+ * Gets value mapped to key, returning null if unmapped.
+ * @param key to be found
+ * @return value associated with key or null if unmapped
+ */
+ public Boolean getBoolean(String key) {
+ return getBoolean(key, null);
+ }
+ /**
+ * Gets value mapped to key, returning defaultValue if unmapped.
+ * @param key to be found
+ * @param defaultValue returned if key is unmapped
+ * @return value associated with key
+ */
+ public Integer getInteger(String key, Integer defaultValue) {
+ String value = get(key);
+ if(value != null) {
+ return Integer.parseInt(value.trim());
}
-
- return result;
+ return defaultValue;
}
-
- public String getString(String key) {
- return get(key, String.class);
+ /**
+ * Gets value mapped to key, returning null if unmapped.
+ * @param key to be found
+ * @return value associated with key or null if unmapped
+ */
+ public Integer getInteger(String key) {
+ return getInteger(key, null);
+ }
+ /**
+ * Gets value mapped to key, returning defaultValue if unmapped.
+ * @param key to be found
+ * @param defaultValue returned if key is unmapped
+ * @return value associated with key
+ */
+ public Long getLong(String key, Long defaultValue) {
+ String value = get(key);
+ if(value != null) {
+ return Long.parseLong(value.trim());
+ }
+ return defaultValue;
}
-
+ /**
+ * Gets value mapped to key, returning null if unmapped.
+ * @param key to be found
+ * @return value associated with key or null if unmapped
+ */
+ public Long getLong(String key) {
+ return getLong(key, null);
+ }
+ /**
+ * Gets value mapped to key, returning defaultValue if unmapped.
+ * @param key to be found
+ * @param defaultValue returned if key is unmapped
+ * @return value associated with key
+ */
public String getString(String key, String defaultValue) {
- return get(key, String.class, defaultValue);
+ return get(key, defaultValue);
}
-
- @Override
- public String toString() {
- return "{ parameters:" + parameters + " }";
+ /**
+ * Gets value mapped to key, returning null if unmapped.
+ * @param key to be found
+ * @return value associated with key or null if unmapped
+ */
+ public String getString(String key) {
+ return get(key);
}
-
- public Map<String, Object> getParameters() {
- return parameters;
+ private String get(String key, String defaultValue) {
+ String result = parameters.get(key);
+ if(result != null) {
+ return result;
+ }
+ return defaultValue;
}
-
- public void setParameters(Map<String, Object> parameters) {
- this.parameters = parameters;
+ private String get(String key) {
+ return get(key, null);
}
-
- public void clear() {
- parameters.clear();
+ @Override
+ public String toString() {
+ return "{ parameters:" + parameters + " }";
}
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java Wed Feb 29 04:53:22 2012
@@ -18,7 +18,6 @@
*/
package org.apache.flume.channel;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -42,11 +41,8 @@ public class ChannelSelectorFactory {
selector.setChannels(channels);
- Map<String, Object> params = new HashMap<String, Object>();
- params.putAll(config);
-
Context context = new Context();
- context.setParameters(params);
+ context.putAll(config);
Configurables.configure(selector, context);
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java Wed Feb 29 04:53:22 2012
@@ -171,7 +171,7 @@ public class MemoryChannel extends Basic
}
Preconditions.checkState(transCapacity <= capacity);
- String strKeepAlive = context.get("keep-alive", String.class);
+ String strKeepAlive = context.getString("keep-alive");
if (strKeepAlive == null) {
keepAlive = defaultKeepAlive;
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java Wed Feb 29 04:53:22 2012
@@ -91,14 +91,14 @@ public class MultiplexingChannelSelector
throw new FlumeException("Default channel list empty");
}
- Map<String, Object> mapConfig = context.getSubProperties(
+ Map<String, String> mapConfig = context.getSubProperties(
CONFIG_PREFIX_MAPPING + ".");
channelMapping = new HashMap<String, List<Channel>>();
for (String headerValue : mapConfig.keySet()) {
List<Channel> configuredChannels = getChannelListFromNames(
- (String) mapConfig.get(headerValue),
+ mapConfig.get(headerValue),
channelNameMap);
//This should not go to default channel(s)
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java Wed Feb 29 04:53:22 2012
@@ -86,8 +86,8 @@ public class PseudoTxnMemoryChannel exte
@Override
public void configure(Context context) {
- Integer capacity = context.get("capacity", Integer.class);
- keepAlive = context.get("keep-alive", Integer.class);
+ Integer capacity = context.getInteger("capacity");
+ keepAlive = context.getInteger("keep-alive");
if (capacity == null) {
capacity = defaultCapacity;
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java Wed Feb 29 04:53:22 2012
@@ -130,9 +130,9 @@ public class AvroSink extends AbstractSi
@Override
public void configure(Context context) {
- hostname = context.get("hostname", String.class);
- port = Integer.parseInt(context.get("port", String.class));
- batchSize = Integer.parseInt(context.get("batch-size", String.class));
+ hostname = context.getString("hostname");
+ port = context.getInteger("port");
+ batchSize = context.getInteger("batch-size");
if (batchSize == null) {
batchSize = defaultBatchSize;
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java Wed Feb 29 04:53:22 2012
@@ -68,8 +68,8 @@ public class RollingFileSink extends Abs
@Override
public void configure(Context context) {
- String directory = context.get("sink.directory", String.class);
- String rollInterval = context.get("sink.rollInterval", String.class);
+ String directory = context.getString("sink.directory");
+ String rollInterval = context.getString("sink.rollInterval");
Preconditions.checkArgument(directory != null, "Directory may not be null");
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java Wed Feb 29 04:53:22 2012
@@ -41,8 +41,8 @@ public class SinkGroup implements Config
@Override
public void configure(Context context) {
Context processorContext = new Context();
- Map<String, Object> subparams = context.getSubProperties(PROCESSOR_PREFIX);
- processorContext.setParameters(subparams);
+ Map<String, String> subparams = context.getSubProperties(PROCESSOR_PREFIX);
+ processorContext.putAll(subparams);
processor = SinkProcessorFactory.getProcessor(processorContext, sinks);
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java Wed Feb 29 04:53:22 2012
@@ -37,7 +37,7 @@ public class SinkProcessorFactory {
@SuppressWarnings("unchecked")
public static SinkProcessor getProcessor(Context context,
List<Sink> sinks) {
- Map<String, Object> params = context.getParameters();
+ Map<String, String> params = context.getParameters();
SinkProcessor processor;
String typeStr = (String) params.get(TYPE);
SinkProcessorType type = SinkProcessorType.DEFAULT;
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Wed Feb 29 04:53:22 2012
@@ -113,8 +113,8 @@ public class AvroSource extends Abstract
@Override
public void configure(Context context) {
- port = Integer.parseInt(context.get("port", String.class));
- bindAddress = context.get("bind", String.class);
+ port = Integer.parseInt(context.getString("port"));
+ bindAddress = context.getString("bind");
}
@Override
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java Wed Feb 29 04:53:22 2012
@@ -175,7 +175,7 @@ public class ExecSource extends Abstract
@Override
public void configure(Context context) {
- command = context.get("command", String.class);
+ command = context.getString("command");
Preconditions.checkState(command != null,
"The parameter command must be specified");
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java Wed Feb 29 04:53:22 2012
@@ -121,8 +121,8 @@ public class NetcatSource extends Abstra
public void configure(Context context) {
Configurables.ensureRequiredNonNull(context, "bind", "port");
- hostName = context.get("bind", String.class);
- port = Integer.parseInt(context.get("port", String.class));
+ hostName = context.getString("bind");
+ port = Integer.parseInt(context.getString("port"));
}
@Override
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/TestContext.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/TestContext.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/TestContext.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/TestContext.java Wed Feb 29 04:53:22 2012
@@ -19,10 +19,13 @@
package org.apache.flume;
-import org.junit.Assert;
+import static org.junit.Assert.*;
+
import org.junit.Before;
import org.junit.Test;
+import com.google.common.collect.ImmutableMap;
+
public class TestContext {
private Context context;
@@ -34,12 +37,52 @@ public class TestContext {
@Test
public void testPutGet() {
- Assert.assertEquals("Context is empty", 0, context.getParameters().size());
+ assertEquals("Context is empty", 0, context.getParameters().size());
+
+ context.put("test", "value");
+ assertEquals("value", context.getString("test"));
+ context.clear();
+ assertNull(context.getString("test"));
+ assertEquals("value", context.getString("test", "value"));
+
+ context.put("test", "true");
+ assertEquals(new Boolean(true), context.getBoolean("test"));
+ context.clear();
+ assertNull(context.getBoolean("test"));
+ assertEquals(new Boolean(true), context.getBoolean("test", true));
+
+ context.put("test", "1");
+ assertEquals(new Integer(1), context.getInteger("test"));
+ context.clear();
+ assertNull(context.getInteger("test"));
+ assertEquals(new Integer(1), context.getInteger("test", 1));
+
+ context.put("test", String.valueOf(Long.MAX_VALUE));
+ assertEquals(new Long(Long.MAX_VALUE), context.getLong("test"));
+ context.clear();
+ assertNull(context.getLong("test"));
+ assertEquals(new Long(Long.MAX_VALUE), context.getLong("test", Long.MAX_VALUE));
- context.put("test", "test");
+ }
+
+ @Test
+ public void testSubProperties() {
+ context.put("my.key", "1");
+ context.put("otherKey", "otherValue");
+ assertEquals(ImmutableMap.of("key", "1"), context.getSubProperties("my."));
- Assert.assertEquals("Context contains test value", "test",
- context.get("test", String.class));
}
+ @Test
+ public void testClear() {
+ context.put("test", "1");
+ context.clear();
+ assertNull(context.getInteger("test"));
+ }
+
+ @Test
+ public void testPutAll() {
+ context.putAll(ImmutableMap.of("test", "1"));
+ assertEquals("1", context.getString("test"));
+ }
}
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java Wed Feb 29 04:53:22 2012
@@ -70,10 +70,10 @@ public class TestMemoryChannel {
@Test
public void testChannelResize() {
Context context = new Context();
- Map<String, Object> parms = new HashMap<String, Object>();
+ Map<String, String> parms = new HashMap<String, String>();
parms.put("capacity", "5");
parms.put("transactionCapacity", "5");
- context.setParameters(parms);
+ context.putAll(parms);
Configurables.configure(channel, context);
Transaction transaction = channel.getTransaction();
@@ -105,7 +105,7 @@ public class TestMemoryChannel {
* Reconfigure capacity down and add another event, shouldn't result in exception
*/
parms.put("capacity", "6");
- context.setParameters(parms);
+ context.putAll(parms);
Configurables.configure(channel, context);
transaction = channel.getTransaction();
transaction.begin();
@@ -119,7 +119,7 @@ public class TestMemoryChannel {
*/
parms.put("capacity", "2");
parms.put("transactionCapacity", "2");
- context.setParameters(parms);
+ context.putAll(parms);
Configurables.configure(channel, context);
for(int i=0; i < 6; i++) {
transaction = channel.getTransaction();
@@ -133,10 +133,10 @@ public class TestMemoryChannel {
@Test(expected=ChannelException.class)
public void testTransactionPutCapacityOverload() {
Context context = new Context();
- Map<String, Object> parms = new HashMap<String, Object>();
+ Map<String, String> parms = new HashMap<String, String>();
parms.put("capacity", "5");
parms.put("transactionCapacity", "2");
- context.setParameters(parms);
+ context.putAll(parms);
Configurables.configure(channel, context);
Transaction transaction = channel.getTransaction();
@@ -151,10 +151,10 @@ public class TestMemoryChannel {
@Test(expected=ChannelException.class)
public void testCapacityOverload() {
Context context = new Context();
- Map<String, Object> parms = new HashMap<String, Object>();
+ Map<String, String> parms = new HashMap<String, String>();
parms.put("capacity", "5");
parms.put("transactionCapacity", "3");
- context.setParameters(parms);
+ context.putAll(parms);
Configurables.configure(channel, context);
Transaction transaction = channel.getTransaction();
@@ -178,10 +178,10 @@ public class TestMemoryChannel {
@Test
public void testBufferEmptyingAfterTakeCommit() {
Context context = new Context();
- Map<String, Object> parms = new HashMap<String, Object>();
+ Map<String, String> parms = new HashMap<String, String>();
parms.put("capacity", "3");
parms.put("transactionCapacity", "3");
- context.setParameters(parms);
+ context.putAll(parms);
Configurables.configure(channel, context);
Transaction tx = channel.getTransaction();
@@ -210,10 +210,10 @@ public class TestMemoryChannel {
@Test
public void testBufferEmptyingAfterRollback() {
Context context = new Context();
- Map<String, Object> parms = new HashMap<String, Object>();
+ Map<String, String> parms = new HashMap<String, String>();
parms.put("capacity", "3");
parms.put("transactionCapacity", "3");
- context.setParameters(parms);
+ context.putAll(parms);
Configurables.configure(channel, context);
Transaction tx = channel.getTransaction();
Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java Wed Feb 29 04:53:22 2012
@@ -153,13 +153,13 @@ public class TestFailoverSinkProcessor {
sinks.add(s2);
sinks.add(s3);
SinkGroup group = new SinkGroup(sinks);
- Map<String, Object> params = new HashMap<String, Object>();
+ Map<String, String> params = new HashMap<String, String>();
params.put("sinks", "s1 s2 s3");
params.put("processor.type", "failover");
params.put("processor.priority.s1", "3");
params.put("processor.priority.s2", "2");
params.put("processor.priority.s3", "1");
- context.setParameters(params);
+ context.putAll(params);
Configurables.configure(group, context);
SinkRunner runner = new SinkRunner(group.getProcessor());
runner.start();
Modified: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java Wed Feb 29 04:53:22 2012
@@ -149,8 +149,8 @@ public class AvroLegacySource extends Ab
@Override
public void configure(Context context) {
- port = Integer.parseInt(context.get("port", String.class));
- host = context.get("host", String.class);
+ port = Integer.parseInt(context.getString("port"));
+ host = context.getString("host");
}
}
Modified: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java Wed Feb 29 04:53:22 2012
@@ -116,8 +116,8 @@ public class ThriftLegacySource extends
@Override
public void configure(Context context) {
- port = Integer.parseInt(context.get("port", String.class));
- host = context.get("host", String.class);
+ port = Integer.parseInt(context.getString("port"));
+ host = context.getString("host");
}
public ThriftLegacySource() {
Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Wed Feb 29 04:53:22 2012
@@ -126,18 +126,18 @@ public class HDFSEventSink extends Abstr
// read configuration and setup thresholds
@Override
public void configure(Context context) {
- String dirpath = context.get("hdfs.path", String.class);
- String fileName = context.get("hdfs.filePrefix", String.class);
- String rollInterval = context.get("hdfs.rollInterval", String.class);
- String rollSize = context.get("hdfs.rollSize", String.class);
- String rollCount = context.get("hdfs.rollCount", String.class);
- String batchSize = context.get("hdfs.batchSize", String.class);
- String txnEventMax = context.get("hdfs.txnEventMax", String.class);
- String codecName = context.get("hdfs.codeC", String.class);
- String fileType = context.get("hdfs.fileType", String.class);
- String maxOpenFiles = context.get("hdfs.maxOpenFiles", String.class);
- String writeFormat = context.get("hdfs.writeFormat", String.class);
- String appendTimeout = context.get("hdfs.appendTimeout", String.class);
+ String dirpath = context.getString("hdfs.path");
+ String fileName = context.getString("hdfs.filePrefix");
+ String rollInterval = context.getString("hdfs.rollInterval");
+ String rollSize = context.getString("hdfs.rollSize");
+ String rollCount = context.getString("hdfs.rollCount");
+ String batchSize = context.getString("hdfs.batchSize");
+ String txnEventMax = context.getString("hdfs.txnEventMax");
+ String codecName = context.getString("hdfs.codeC");
+ String fileType = context.getString("hdfs.fileType");
+ String maxOpenFiles = context.getString("hdfs.maxOpenFiles");
+ String writeFormat = context.getString("hdfs.writeFormat");
+ String appendTimeout = context.getString("hdfs.appendTimeout");
if (fileName == null)
fileName = defaultFileName;
Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java Wed Feb 29 04:53:22 2012
@@ -125,18 +125,18 @@ public class IRCSink extends AbstractSin
}
public void configure(Context context) {
- hostname = context.get("hostname", String.class);
- String portStr = context.get("port", String.class);
- nick = context.get("nick", String.class);
- password = context.get("password", String.class);
- user = context.get("user", String.class);
- name = context.get("name", String.class);
- chan = context.get("chan", String.class);
- splitLines = Boolean.parseBoolean(context.get("splitlines", String.class));
- splitChars = context.get("splitchars", String.class);
-
+ hostname = context.getString("hostname");
+ String portStr = context.getString("port");
+ nick = context.getString("nick");
+ password = context.getString("password");
+ user = context.getString("user");
+ name = context.getString("name");
+ chan = context.getString("chan");
+ splitLines = context.getBoolean("splitlines");
+ splitChars = context.getString("splitchars");
+
if (portStr != null) {
- port = Integer.parseInt(portStr);
+ port = Integer.parseInt(portStr);
} else {
port = DEFAULT_PORT;
}