You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by ar...@apache.org on 2012/02/29 05:53:23 UTC

svn commit: r1294969 - in /incubator/flume/branches/flume-728: flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/ flume-ng-core/src/main/java/org/apache/flume/ flume-ng-core/src/main/java/org/apache/flume/channel/ fl...

Author: arvind
Date: Wed Feb 29 04:53:22 2012
New Revision: 1294969

URL: http://svn.apache.org/viewvc?rev=1294969&view=rev
Log:
FLUME-978. Context interface is too basic requiring boilerplate user code.

(Brock Noland via Arvind Prabhakar)

Modified:
    incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/TestContext.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
    incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
    incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
    incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java

Modified: incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-channels/flume-jdbc-channel/src/main/java/org/apache/flume/channel/jdbc/impl/JdbcChannelProviderImpl.java Wed Feb 29 04:53:22 2012
@@ -103,16 +103,14 @@ public class JdbcChannelProviderImpl imp
   }
 
   private void initializeSystemProperties(Context context) {
-    Map<String, Object> sysProps = context.getSubProperties(
+    Map<String, String> sysProps = context.getSubProperties(
         ConfigurationConstants.CONFIG_JDBC_SYSPRO_PREFIX);
 
     for (String key: sysProps.keySet()) {
-      Object object = sysProps.get(key);
-      String value = "";
-      if (object != null) {
-        value = object.toString();
+      String value = sysProps.get(key);
+      if(key != null && value != null) {
+        System.setProperty(key, value);
       }
-      System.setProperty(key, value);
     }
   }
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/Context.java Wed Feb 29 04:53:22 2012
@@ -19,73 +19,179 @@
 
 package org.apache.flume;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
 public class Context {
 
-  private Map<String, Object> parameters;
+  private Map<String, String> parameters;
 
   public Context() {
-    parameters = new HashMap<String, Object>();
+    parameters = Collections.synchronizedMap(new HashMap<String, String>());
   }
 
-  public Map<String, Object> getSubProperties(String prefix) {
-    Map<String, Object> result = new HashMap<String, Object>();
+  public Context(Map<String, String> paramters) {
+    this();
+    this.putAll(paramters);
+  }
 
-    for (String key : parameters.keySet()) {
-      if (key.startsWith(prefix)) {
-        String name = key.substring(prefix.length());
-        result.put(name, parameters.get(key));
-      }
+  /**
+   * Gets a copy of the backing map structure.
+   * @return immutable copy of backing map structure
+   */
+  public ImmutableMap<String, String> getParameters() {
+    synchronized (parameters) {
+      return ImmutableMap.copyOf(parameters);
     }
-
-    return result;
+  }
+  /**
+   * Removes all of the mappings from this map.
+   */
+  public void clear() {
+    parameters.clear();
   }
 
-  public void put(String key, Object value) {
+  /**
+   * Get properties which start with a prefix. When a property is returned,
+   * the prefix is removed the from name. For example, if this method is
+   * called with a parameter &quot;hdfs.&quot; and the context contains:
+   * <code>
+   * { hdfs.key = value, otherKey = otherValue }
+   * </code>
+   * this method will return a map containing:
+   * <code>
+   * { key = value}
+   * </code>
+   *
+   * @param prefix key prefix to find and remove from keys in resulting map
+   * @return map with keys which matched prefix with prefix removed from
+   *   keys in resulting map. If no keys are matched, the returned map is
+   *   empty
+   */
+  public ImmutableMap<String, String> getSubProperties(String prefix) {
+    Map<String, String> result = Maps.newHashMap();
+    synchronized(parameters) {
+      for (String key : parameters.keySet()) {
+        if (key.startsWith(prefix)) {
+          String name = key.substring(prefix.length());
+          result.put(name, parameters.get(key));
+        }
+      }
+    }
+    return ImmutableMap.copyOf(result);
+  }
+  /**
+   * Associates all of the given map's keys and values in the Context.
+   */
+  public void putAll(Map<String, String> map) {
+    parameters.putAll(map);
+  }
+  /**
+   * Associates the specified value with the specified key in this context.
+   * If the context previously contained a mapping for the key, the old value
+   * is replaced by the specified value.
+   * @param key key with which the specified value is to be associated
+   * @param value to be associated with the specified key
+   */
+  public void put(String key, String value) {
     parameters.put(key, value);
   }
-
-  public <T> T get(String key, Class<? extends T> clazz) {
-    if (parameters.containsKey(key)) {
-      return clazz.cast(parameters.get(key));
+  /**
+   * Gets value mapped to key, returning defaultValue if unmapped.
+   * @param key to be found
+   * @param defaultValue returned if key is unmapped
+   * @return value associated with key
+   */
+  public Boolean getBoolean(String key, Boolean defaultValue) {
+    String value = get(key);
+    if(value != null) {
+      return Boolean.parseBoolean(value.trim());
     }
-
-    return null;
+    return defaultValue;
   }
-
-  public <T> T get(String key, Class<? extends T> clazz, T defaultValue) {
-    T result = get(key, clazz);
-    if (result == null) {
-      result = defaultValue;
+  /**
+   * Gets value mapped to key, returning null if unmapped.
+   * @param key to be found
+   * @return value associated with key or null if unmapped
+   */
+  public Boolean getBoolean(String key) {
+    return getBoolean(key, null);
+  }
+  /**
+   * Gets value mapped to key, returning defaultValue if unmapped.
+   * @param key to be found
+   * @param defaultValue returned if key is unmapped
+   * @return value associated with key
+   */
+  public Integer getInteger(String key, Integer defaultValue) {
+    String value = get(key);
+    if(value != null) {
+      return Integer.parseInt(value.trim());
     }
-
-    return result;
+    return defaultValue;
   }
-
-  public String getString(String key) {
-    return get(key, String.class);
+  /**
+   * Gets value mapped to key, returning null if unmapped.
+   * @param key to be found
+   * @return value associated with key or null if unmapped
+   */
+  public Integer getInteger(String key) {
+    return getInteger(key, null);
+  }
+  /**
+   * Gets value mapped to key, returning defaultValue if unmapped.
+   * @param key to be found
+   * @param defaultValue returned if key is unmapped
+   * @return value associated with key
+   */
+  public Long getLong(String key, Long defaultValue) {
+    String value = get(key);
+    if(value != null) {
+      return Long.parseLong(value.trim());
+    }
+    return defaultValue;
   }
-
+  /**
+   * Gets value mapped to key, returning null if unmapped.
+   * @param key to be found
+   * @return value associated with key or null if unmapped
+   */
+  public Long getLong(String key) {
+    return getLong(key, null);
+  }
+  /**
+   * Gets value mapped to key, returning defaultValue if unmapped.
+   * @param key to be found
+   * @param defaultValue returned if key is unmapped
+   * @return value associated with key
+   */
   public String getString(String key, String defaultValue) {
-    return get(key, String.class, defaultValue);
+    return get(key, defaultValue);
   }
-
-  @Override
-  public String toString() {
-    return "{ parameters:" + parameters + " }";
+  /**
+   * Gets value mapped to key, returning null if unmapped.
+   * @param key to be found
+   * @return value associated with key or null if unmapped
+   */
+  public String getString(String key) {
+    return get(key);
   }
-
-  public Map<String, Object> getParameters() {
-    return parameters;
+  private String get(String key, String defaultValue) {
+    String result = parameters.get(key);
+    if(result != null) {
+      return result;
+    }
+    return defaultValue;
   }
-
-  public void setParameters(Map<String, Object> parameters) {
-    this.parameters = parameters;
+  private String get(String key) {
+    return get(key, null);
   }
-
-  public void clear() {
-    parameters.clear();
+  @Override
+  public String toString() {
+    return "{ parameters:" + parameters + " }";
   }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelSelectorFactory.java Wed Feb 29 04:53:22 2012
@@ -18,7 +18,6 @@
  */
 package org.apache.flume.channel;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -42,11 +41,8 @@ public class ChannelSelectorFactory {
 
     selector.setChannels(channels);
 
-    Map<String, Object> params = new HashMap<String, Object>();
-    params.putAll(config);
-
     Context context = new Context();
-    context.setParameters(params);
+    context.putAll(config);
 
     Configurables.configure(selector, context);
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MemoryChannel.java Wed Feb 29 04:53:22 2012
@@ -171,7 +171,7 @@ public class MemoryChannel extends Basic
     }
     Preconditions.checkState(transCapacity <= capacity);
 
-    String strKeepAlive = context.get("keep-alive", String.class);
+    String strKeepAlive = context.getString("keep-alive");
 
     if (strKeepAlive == null) {
       keepAlive = defaultKeepAlive;

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/MultiplexingChannelSelector.java Wed Feb 29 04:53:22 2012
@@ -91,14 +91,14 @@ public class MultiplexingChannelSelector
       throw new FlumeException("Default channel list empty");
     }
 
-    Map<String, Object> mapConfig = context.getSubProperties(
+    Map<String, String> mapConfig = context.getSubProperties(
         CONFIG_PREFIX_MAPPING + ".");
 
     channelMapping = new HashMap<String, List<Channel>>();
 
     for (String headerValue : mapConfig.keySet()) {
       List<Channel> configuredChannels = getChannelListFromNames(
-          (String) mapConfig.get(headerValue),
+          mapConfig.get(headerValue),
           channelNameMap);
 
       //This should not go to default channel(s)

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/channel/PseudoTxnMemoryChannel.java Wed Feb 29 04:53:22 2012
@@ -86,8 +86,8 @@ public class PseudoTxnMemoryChannel exte
 
   @Override
   public void configure(Context context) {
-    Integer capacity = context.get("capacity", Integer.class);
-    keepAlive = context.get("keep-alive", Integer.class);
+    Integer capacity = context.getInteger("capacity");
+    keepAlive = context.getInteger("keep-alive");
 
     if (capacity == null) {
       capacity = defaultCapacity;

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java Wed Feb 29 04:53:22 2012
@@ -130,9 +130,9 @@ public class AvroSink extends AbstractSi
 
   @Override
   public void configure(Context context) {
-    hostname = context.get("hostname", String.class);
-    port = Integer.parseInt(context.get("port", String.class));
-    batchSize = Integer.parseInt(context.get("batch-size", String.class));
+    hostname = context.getString("hostname");
+    port = context.getInteger("port");
+    batchSize = context.getInteger("batch-size");
 
     if (batchSize == null) {
       batchSize = defaultBatchSize;

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java Wed Feb 29 04:53:22 2012
@@ -68,8 +68,8 @@ public class RollingFileSink extends Abs
 
   @Override
   public void configure(Context context) {
-    String directory = context.get("sink.directory", String.class);
-    String rollInterval = context.get("sink.rollInterval", String.class);
+    String directory = context.getString("sink.directory");
+    String rollInterval = context.getString("sink.rollInterval");
 
     Preconditions.checkArgument(directory != null, "Directory may not be null");
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkGroup.java Wed Feb 29 04:53:22 2012
@@ -41,8 +41,8 @@ public class SinkGroup implements Config
   @Override
   public void configure(Context context) {
     Context processorContext = new Context();
-    Map<String, Object> subparams = context.getSubProperties(PROCESSOR_PREFIX);
-    processorContext.setParameters(subparams);
+    Map<String, String> subparams = context.getSubProperties(PROCESSOR_PREFIX);
+    processorContext.putAll(subparams);
     processor = SinkProcessorFactory.getProcessor(processorContext, sinks);
   }
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/SinkProcessorFactory.java Wed Feb 29 04:53:22 2012
@@ -37,7 +37,7 @@ public class SinkProcessorFactory {
   @SuppressWarnings("unchecked")
   public static SinkProcessor getProcessor(Context context,
  List<Sink> sinks) {
-    Map<String, Object> params = context.getParameters();
+    Map<String, String> params = context.getParameters();
     SinkProcessor processor;
     String typeStr = (String) params.get(TYPE);
     SinkProcessorType type = SinkProcessorType.DEFAULT;

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Wed Feb 29 04:53:22 2012
@@ -113,8 +113,8 @@ public class AvroSource extends Abstract
 
   @Override
   public void configure(Context context) {
-    port = Integer.parseInt(context.get("port", String.class));
-    bindAddress = context.get("bind", String.class);
+    port = Integer.parseInt(context.getString("port"));
+    bindAddress = context.getString("bind");
   }
 
   @Override

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/ExecSource.java Wed Feb 29 04:53:22 2012
@@ -175,7 +175,7 @@ public class ExecSource extends Abstract
 
   @Override
   public void configure(Context context) {
-    command = context.get("command", String.class);
+    command = context.getString("command");
 
     Preconditions.checkState(command != null,
         "The parameter command must be specified");

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java Wed Feb 29 04:53:22 2012
@@ -121,8 +121,8 @@ public class NetcatSource extends Abstra
   public void configure(Context context) {
     Configurables.ensureRequiredNonNull(context, "bind", "port");
 
-    hostName = context.get("bind", String.class);
-    port = Integer.parseInt(context.get("port", String.class));
+    hostName = context.getString("bind");
+    port = Integer.parseInt(context.getString("port"));
   }
 
   @Override

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/TestContext.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/TestContext.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/TestContext.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/TestContext.java Wed Feb 29 04:53:22 2012
@@ -19,10 +19,13 @@
 
 package org.apache.flume;
 
-import org.junit.Assert;
+import static org.junit.Assert.*;
+
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.ImmutableMap;
+
 public class TestContext {
 
   private Context context;
@@ -34,12 +37,52 @@ public class TestContext {
 
   @Test
   public void testPutGet() {
-    Assert.assertEquals("Context is empty", 0, context.getParameters().size());
+    assertEquals("Context is empty", 0, context.getParameters().size());
+
+    context.put("test", "value");
+    assertEquals("value", context.getString("test"));
+    context.clear();
+    assertNull(context.getString("test"));
+    assertEquals("value", context.getString("test", "value"));
+
+    context.put("test", "true");
+    assertEquals(new Boolean(true), context.getBoolean("test"));
+    context.clear();
+    assertNull(context.getBoolean("test"));
+    assertEquals(new Boolean(true), context.getBoolean("test", true));
+
+    context.put("test", "1");
+    assertEquals(new Integer(1), context.getInteger("test"));
+    context.clear();
+    assertNull(context.getInteger("test"));
+    assertEquals(new Integer(1), context.getInteger("test", 1));
+
+    context.put("test", String.valueOf(Long.MAX_VALUE));
+    assertEquals(new Long(Long.MAX_VALUE), context.getLong("test"));
+    context.clear();
+    assertNull(context.getLong("test"));
+    assertEquals(new Long(Long.MAX_VALUE), context.getLong("test", Long.MAX_VALUE));
 
-    context.put("test", "test");
+  }
+
+  @Test
+  public void testSubProperties() {
+    context.put("my.key", "1");
+    context.put("otherKey", "otherValue");
+    assertEquals(ImmutableMap.of("key", "1"), context.getSubProperties("my."));
 
-    Assert.assertEquals("Context contains test value", "test",
-        context.get("test", String.class));
   }
 
+  @Test
+  public void testClear() {
+    context.put("test", "1");
+    context.clear();
+    assertNull(context.getInteger("test"));
+  }
+
+  @Test
+  public void testPutAll() {
+    context.putAll(ImmutableMap.of("test", "1"));
+    assertEquals("1", context.getString("test"));
+  }
 }

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java Wed Feb 29 04:53:22 2012
@@ -70,10 +70,10 @@ public class TestMemoryChannel {
   @Test
   public void testChannelResize() {
     Context context = new Context();
-    Map<String, Object> parms = new HashMap<String, Object>();
+    Map<String, String> parms = new HashMap<String, String>();
     parms.put("capacity", "5");
     parms.put("transactionCapacity", "5");
-    context.setParameters(parms);
+    context.putAll(parms);
     Configurables.configure(channel,  context);
 
     Transaction transaction = channel.getTransaction();
@@ -105,7 +105,7 @@ public class TestMemoryChannel {
      * Reconfigure capacity down and add another event, shouldn't result in exception
      */
     parms.put("capacity", "6");
-    context.setParameters(parms);
+    context.putAll(parms);
     Configurables.configure(channel, context);
     transaction = channel.getTransaction();
     transaction.begin();
@@ -119,7 +119,7 @@ public class TestMemoryChannel {
      */
     parms.put("capacity", "2");
     parms.put("transactionCapacity", "2");
-    context.setParameters(parms);
+    context.putAll(parms);
     Configurables.configure(channel, context);
     for(int i=0; i < 6; i++) {
       transaction = channel.getTransaction();
@@ -133,10 +133,10 @@ public class TestMemoryChannel {
   @Test(expected=ChannelException.class)
   public void testTransactionPutCapacityOverload() {
     Context context = new Context();
-    Map<String, Object> parms = new HashMap<String, Object>();
+    Map<String, String> parms = new HashMap<String, String>();
     parms.put("capacity", "5");
     parms.put("transactionCapacity", "2");
-    context.setParameters(parms);
+    context.putAll(parms);
     Configurables.configure(channel,  context);
 
     Transaction transaction = channel.getTransaction();
@@ -151,10 +151,10 @@ public class TestMemoryChannel {
   @Test(expected=ChannelException.class)
   public void testCapacityOverload() {
     Context context = new Context();
-    Map<String, Object> parms = new HashMap<String, Object>();
+    Map<String, String> parms = new HashMap<String, String>();
     parms.put("capacity", "5");
     parms.put("transactionCapacity", "3");
-    context.setParameters(parms);
+    context.putAll(parms);
     Configurables.configure(channel,  context);
 
     Transaction transaction = channel.getTransaction();
@@ -178,10 +178,10 @@ public class TestMemoryChannel {
   @Test
   public void testBufferEmptyingAfterTakeCommit() {
     Context context = new Context();
-    Map<String, Object> parms = new HashMap<String, Object>();
+    Map<String, String> parms = new HashMap<String, String>();
     parms.put("capacity", "3");
     parms.put("transactionCapacity", "3");
-    context.setParameters(parms);
+    context.putAll(parms);
     Configurables.configure(channel,  context);
 
     Transaction tx = channel.getTransaction();
@@ -210,10 +210,10 @@ public class TestMemoryChannel {
   @Test
   public void testBufferEmptyingAfterRollback() {
     Context context = new Context();
-    Map<String, Object> parms = new HashMap<String, Object>();
+    Map<String, String> parms = new HashMap<String, String>();
     parms.put("capacity", "3");
     parms.put("transactionCapacity", "3");
-    context.setParameters(parms);
+    context.putAll(parms);
     Configurables.configure(channel,  context);
 
     Transaction tx = channel.getTransaction();

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java Wed Feb 29 04:53:22 2012
@@ -153,13 +153,13 @@ public class TestFailoverSinkProcessor {
     sinks.add(s2);
     sinks.add(s3);
     SinkGroup group = new SinkGroup(sinks);
-    Map<String, Object> params = new HashMap<String, Object>();
+    Map<String, String> params = new HashMap<String, String>();
     params.put("sinks", "s1 s2 s3");
     params.put("processor.type", "failover");
     params.put("processor.priority.s1", "3");
     params.put("processor.priority.s2", "2");
     params.put("processor.priority.s3", "1");
-    context.setParameters(params);
+    context.putAll(params);
     Configurables.configure(group, context);
     SinkRunner runner = new SinkRunner(group.getProcessor());
     runner.start();

Modified: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-avro-source/src/main/java/org/apache/flume/source/avroLegacy/AvroLegacySource.java Wed Feb 29 04:53:22 2012
@@ -149,8 +149,8 @@ public class AvroLegacySource extends Ab
 
   @Override
   public void configure(Context context) {
-    port = Integer.parseInt(context.get("port", String.class));
-    host = context.get("host", String.class);
+    port = Integer.parseInt(context.getString("port"));
+    host = context.getString("host");
   }
 
 }

Modified: incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-legacy-sources/flume-thrift-source/src/main/java/org/apache/flume/source/thriftLegacy/ThriftLegacySource.java Wed Feb 29 04:53:22 2012
@@ -116,8 +116,8 @@ public class ThriftLegacySource  extends
 
   @Override
   public void configure(Context context) {
-    port = Integer.parseInt(context.get("port", String.class));
-    host = context.get("host", String.class);
+    port = Integer.parseInt(context.getString("port"));
+    host = context.getString("host");
   }
 
   public ThriftLegacySource() {

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java Wed Feb 29 04:53:22 2012
@@ -126,18 +126,18 @@ public class HDFSEventSink extends Abstr
   // read configuration and setup thresholds
   @Override
   public void configure(Context context) {
-    String dirpath = context.get("hdfs.path", String.class);
-    String fileName = context.get("hdfs.filePrefix", String.class);
-    String rollInterval = context.get("hdfs.rollInterval", String.class);
-    String rollSize = context.get("hdfs.rollSize", String.class);
-    String rollCount = context.get("hdfs.rollCount", String.class);
-    String batchSize = context.get("hdfs.batchSize", String.class);
-    String txnEventMax = context.get("hdfs.txnEventMax", String.class);
-    String codecName = context.get("hdfs.codeC", String.class);
-    String fileType = context.get("hdfs.fileType", String.class);
-    String maxOpenFiles = context.get("hdfs.maxOpenFiles", String.class);
-    String writeFormat = context.get("hdfs.writeFormat", String.class);
-    String appendTimeout = context.get("hdfs.appendTimeout", String.class);
+    String dirpath = context.getString("hdfs.path");
+    String fileName = context.getString("hdfs.filePrefix");
+    String rollInterval = context.getString("hdfs.rollInterval");
+    String rollSize = context.getString("hdfs.rollSize");
+    String rollCount = context.getString("hdfs.rollCount");
+    String batchSize = context.getString("hdfs.batchSize");
+    String txnEventMax = context.getString("hdfs.txnEventMax");
+    String codecName = context.getString("hdfs.codeC");
+    String fileType = context.getString("hdfs.fileType");
+    String maxOpenFiles = context.getString("hdfs.maxOpenFiles");
+    String writeFormat = context.getString("hdfs.writeFormat");
+    String appendTimeout = context.getString("hdfs.appendTimeout");
 
     if (fileName == null)
       fileName = defaultFileName;

Modified: incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java?rev=1294969&r1=1294968&r2=1294969&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java Wed Feb 29 04:53:22 2012
@@ -125,18 +125,18 @@ public class IRCSink extends AbstractSin
   }
 
   public void configure(Context context) {
-    hostname = context.get("hostname", String.class);
-    String portStr = context.get("port", String.class);
-    nick = context.get("nick", String.class);
-    password = context.get("password", String.class);
-    user = context.get("user", String.class);
-    name = context.get("name", String.class);
-    chan = context.get("chan", String.class);
-    splitLines = Boolean.parseBoolean(context.get("splitlines", String.class));
-    splitChars = context.get("splitchars", String.class);
-    
+    hostname = context.getString("hostname");
+    String portStr = context.getString("port");
+    nick = context.getString("nick");
+    password = context.getString("password");
+    user = context.getString("user");
+    name = context.getString("name");
+    chan = context.getString("chan");
+    splitLines = context.getBoolean("splitlines");
+    splitChars = context.getString("splitchars");
+
     if (portStr != null) {
-      port = Integer.parseInt(portStr);      
+      port = Integer.parseInt(portStr);
     } else {
       port = DEFAULT_PORT;
     }