You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by es...@apache.org on 2011/10/20 02:40:29 UTC

svn commit: r1186585 - in /incubator/flume/branches/flume-728: flume-ng-core/src/main/java/org/apache/flume/sink/ flume-ng-core/src/main/java/org/apache/flume/source/ flume-ng-node/src/main/java/org/apache/flume/conf/file/ flume-ng-node/src/main/java/o...

Author: esammer
Date: Thu Oct 20 00:40:28 2011
New Revision: 1186585

URL: http://svn.apache.org/viewvc?rev=1186585&view=rev
Log:
FLUME-802: Complete PropertyFileConfigurationProvider implementation

- The skeletal flume node now uses the property file based configuration provider.
- All Configurables now expect Strings and do proper parsing where necessary. This
  is because one does not yet get rich types from java properties files, at least not
  the way we're using them.
- Reformatted a bunch of things to soothe my OCD (sorry Arvind).
- Added a bunch of accessors to get at some of the internals of FlumeConfiguration.
  We probably need to rename / clean a bunch of this stuff.

Modified:
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
    incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
    incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
    incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java?rev=1186585&r1=1186584&r2=1186585&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AbstractSink.java Thu Oct 20 00:40:28 2011
@@ -5,6 +5,8 @@ import org.apache.flume.Sink;
 import org.apache.flume.lifecycle.LifecycleAware;
 import org.apache.flume.lifecycle.LifecycleState;
 
+import com.google.common.base.Preconditions;
+
 abstract public class AbstractSink implements Sink, LifecycleAware {
 
   private Channel channel;
@@ -17,6 +19,8 @@ abstract public class AbstractSink imple
 
   @Override
   public synchronized void start() {
+    Preconditions.checkState(channel != null, "No channel configured");
+
     lifecycleState = LifecycleState.START;
   }
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java?rev=1186585&r1=1186584&r2=1186585&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/sink/AvroSink.java Thu Oct 20 00:40:28 2011
@@ -113,8 +113,8 @@ public class AvroSink extends AbstractSi
   @Override
   public void configure(Context context) {
     hostname = context.get("hostname", String.class);
-    port = context.get("port", Integer.class);
-    batchSize = context.get("batch-size", Integer.class);
+    port = Integer.parseInt(context.get("port", String.class));
+    batchSize = Integer.parseInt(context.get("batch-size", String.class));
 
     if (batchSize == null) {
       batchSize = defaultBatchSize;

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java?rev=1186585&r1=1186584&r2=1186585&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AbstractSource.java Thu Oct 20 00:40:28 2011
@@ -4,6 +4,8 @@ import org.apache.flume.Channel;
 import org.apache.flume.Source;
 import org.apache.flume.lifecycle.LifecycleState;
 
+import com.google.common.base.Preconditions;
+
 abstract public class AbstractSource implements Source {
 
   private Channel channel;
@@ -16,6 +18,8 @@ abstract public class AbstractSource imp
 
   @Override
   public void start() {
+    Preconditions.checkState(channel != null, "No channel configured");
+
     lifecycleState = LifecycleState.START;
   }
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java?rev=1186585&r1=1186584&r2=1186585&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/AvroSource.java Thu Oct 20 00:40:28 2011
@@ -44,7 +44,7 @@ public class AvroSource extends Abstract
 
   @Override
   public void configure(Context context) {
-    port = context.get("port", Integer.class);
+    port = Integer.parseInt(context.get("port", String.class));
     bindAddress = context.get("bind", String.class);
   }
 

Modified: incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java?rev=1186585&r1=1186584&r2=1186585&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-core/src/main/java/org/apache/flume/source/NetcatSource.java Thu Oct 20 00:40:28 2011
@@ -55,7 +55,7 @@ public class NetcatSource extends Abstra
   public void configure(Context context) {
     Configurables.ensureRequiredNonNull(context, "name", "port");
 
-    port = context.get("port", Integer.class);
+    port = Integer.parseInt(context.get("port", String.class));
   }
 
   @Override

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java?rev=1186585&r1=1186584&r2=1186585&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/file/AbstractFileConfigurationProvider.java Thu Oct 20 00:40:28 2011
@@ -35,8 +35,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
-public abstract class AbstractFileConfigurationProvider
-      implements ConfigurationProvider {
+public abstract class AbstractFileConfigurationProvider implements
+    ConfigurationProvider {
 
   private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -44,6 +44,7 @@ public abstract class AbstractFileConfig
   private ChannelFactory channelFactory;
   private SourceFactory sourceFactory;
   private SinkFactory sinkFactory;
+  private String nodeName;
   private NodeConfigurationAware configurationAware;
 
   private LifecycleState lifecycleState;
@@ -58,7 +59,7 @@ public abstract class AbstractFileConfig
   @Override
   public String toString() {
     return "{ file:" + file + " counterGroup:" + counterGroup + "  provider:"
-            + getClass().getCanonicalName() + " }";
+        + getClass().getCanonicalName() + " nodeName:" + nodeName + " }";
   }
 
   @Override
@@ -111,6 +112,15 @@ public abstract class AbstractFileConfig
 
   // Synchronized wrapper to call the load function
   private synchronized void doLoad() {
+    Preconditions
+        .checkState(nodeName != null,
+            "No node name specified - Unable to determine what part of the config to load");
+    Preconditions.checkState(channelFactory != null,
+        "No channel factory configured");
+    Preconditions.checkState(sourceFactory != null,
+        "No source factory configured");
+    Preconditions.checkState(sinkFactory != null, "No sink factory configured");
+
     load();
   }
 
@@ -159,6 +169,14 @@ public abstract class AbstractFileConfig
     this.configurationAware = configurationAware;
   }
 
+  public String getNodeName() {
+    return nodeName;
+  }
+
+  public void setNodeName(String nodeName) {
+    this.nodeName = nodeName;
+  }
+
   public class FileWatcherRunnable implements Runnable {
 
     private File file;

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java?rev=1186585&r1=1186584&r2=1186585&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/FlumeConfiguration.java Thu Oct 20 00:40:28 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.flume.conf.properties;
 
+import java.util.Collection;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -35,20 +36,20 @@ import org.slf4j.LoggerFactory;
  * configuration namespace required by the PropertiesFileConfgurationProvider.
  * This class is instantiated with a properties object which is parsed to
  * construct the hierarchy in memory. Once the entire set of properties have
- * been parsed and populated, a validation routine is run that identifies
- * and removes invalid components.
+ * been parsed and populated, a validation routine is run that identifies and
+ * removes invalid components.
  * </p>
- *
+ * 
  * @see org.apache.flume.conf.properties.PropertiesFileConfigurationProvider
- *
+ * 
  */
 public class FlumeConfiguration {
 
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(FlumeConfiguration.class);
+  private static final Logger logger = LoggerFactory
+      .getLogger(FlumeConfiguration.class);
 
-  private static final String NEWLINE =
-      System.getProperty("line.separator", "\n");
+  private static final String NEWLINE = System.getProperty("line.separator",
+      "\n");
   private static final String INDENTSTEP = "  ";
 
   private static final String SOURCES = "sources";
@@ -76,13 +77,13 @@ public class FlumeConfiguration {
 
     // Construct the in-memory component hierarchy
     Enumeration<?> propertyNames = properties.propertyNames();
+
     while (propertyNames.hasMoreElements()) {
       String name = (String) propertyNames.nextElement();
       String value = properties.getProperty(name);
 
       if (!addRawProperty(name, value)) {
-        LOGGER.warn("Configuration property ignored: "
-            + name + " = " + value);
+        logger.warn("Configuration property ignored: " + name + " = " + value);
       }
     }
 
@@ -90,19 +91,26 @@ public class FlumeConfiguration {
     validateConfiguration();
   }
 
+  public AgentConfiguration getConfigurationFor(String hostname) {
+    return agentConfigMap.get(hostname);
+  }
+
   private void validateConfiguration() {
     Iterator<String> it = agentConfigMap.keySet().iterator();
+
     while (it.hasNext()) {
       String agentName = it.next();
       AgentConfiguration aconf = agentConfigMap.get(agentName);
+
       if (!aconf.isValid()) {
-        LOGGER.warn("Agent configuration invalid for agent '"
-                  + agentName + "'. It will be removed.");
+        logger.warn("Agent configuration invalid for agent '" + agentName
+            + "'. It will be removed.");
+
         it.remove();
       }
     }
 
-    LOGGER.info("Post-validation flume configuration contains configuation "
+    logger.info("Post-validation flume configuration contains configuation "
         + " for agents: " + agentConfigMap.keySet());
   }
 
@@ -135,7 +143,7 @@ public class FlumeConfiguration {
       return false;
     }
 
-    String configKey = name.substring(index+1);
+    String configKey = name.substring(index + 1);
 
     // Configuration key must be specified for every property
     if (configKey.length() == 0) {
@@ -143,17 +151,17 @@ public class FlumeConfiguration {
     }
 
     AgentConfiguration aconf = agentConfigMap.get(agentName);
+
     if (aconf == null) {
       aconf = new AgentConfiguration(agentName);
       agentConfigMap.put(agentName, aconf);
     }
+
     // Each configuration key must begin with one of the three prefixes:
     // sources, sinks, or channels.
-
     return aconf.addProperty(configKey, value);
   }
 
-
   public static class AgentConfiguration {
 
     private final String agentName;
@@ -165,14 +173,26 @@ public class FlumeConfiguration {
     private final Map<String, ComponentConfiguration> sinkConfigMap;
     private final Map<String, ComponentConfiguration> channelConfigMap;
 
-
     private AgentConfiguration(String agentName) {
       this.agentName = agentName;
+
       sourceConfigMap = new HashMap<String, ComponentConfiguration>();
       sinkConfigMap = new HashMap<String, ComponentConfiguration>();
       channelConfigMap = new HashMap<String, ComponentConfiguration>();
     }
 
+    public Collection<ComponentConfiguration> getChannels() {
+      return channelConfigMap.values();
+    }
+
+    public Collection<ComponentConfiguration> getSources() {
+      return sourceConfigMap.values();
+    }
+
+    public Collection<ComponentConfiguration> getSinks() {
+      return sinkConfigMap.values();
+    }
+
     /**
      * <p>
      * Checks the validity of the agent configuration. This method assumes that
@@ -185,21 +205,23 @@ public class FlumeConfiguration {
      * components are not available, the configuration itself will be considered
      * invalid.
      * </p>
+     * 
      * @return true if the configuration is valid, false otherwise
      */
     private boolean isValid() {
-      LOGGER.debug("Starting validation of configuration for agent: "
+      logger.debug("Starting validation of configuration for agent: "
           + agentName + ", initial-configuration: " + this);
 
       // Make sure that at least one channel is specified
       if (channels == null || channels.trim().length() == 0) {
-        LOGGER.warn("Agent configuration for '" + agentName
+        logger.warn("Agent configuration for '" + agentName
             + "' does not contain any channels. Marking it as invalid.");
         return false;
       }
 
       Set<String> channelSet = new HashSet<String>();
       StringTokenizer channelTok = new StringTokenizer(channels, " \t");
+
       while (channelTok.hasMoreTokens()) {
         channelSet.add(channelTok.nextToken());
       }
@@ -207,7 +229,7 @@ public class FlumeConfiguration {
       validateComponent(channelSet, channelConfigMap, CLASS_CHANNEL, ATTR_TYPE);
 
       if (channelSet.size() == 0) {
-        LOGGER.warn("Agent configuration for '" + agentName
+        logger.warn("Agent configuration for '" + agentName
             + "' does not contain any valid channels. Marking it as invalid.");
 
         return false;
@@ -219,25 +241,30 @@ public class FlumeConfiguration {
 
       if (sources != null && sources.trim().length() > 0) {
         StringTokenizer sourceTok = new StringTokenizer(sources, " \t");
+
         while (sourceTok.hasMoreTokens()) {
           sourceSet.add(sourceTok.nextToken());
         }
 
         // Filter out any sources that have invalid channels
         Iterator<String> srcIt = sourceConfigMap.keySet().iterator();
+
         while (srcIt.hasNext()) {
           String nextSource = srcIt.next();
           ComponentConfiguration sourceConfig = sourceConfigMap.get(nextSource);
           Set<String> srcChannelSet = new HashSet<String>();
+
           if (sourceConfig.hasAttribute(ATTR_CHANNELS)) {
             String srcChannels = sourceConfig.getAttribute(ATTR_CHANNELS);
             StringTokenizer srcChTok = new StringTokenizer(srcChannels, " \t");
+
             while (srcChTok.hasMoreTokens()) {
               String nextSrcCh = srcChTok.nextToken();
+
               if (channelSet.contains(nextSrcCh)) {
                 srcChannelSet.add(nextSrcCh);
               } else {
-                LOGGER.warn("Agent configuration for '" + agentName
+                logger.warn("Agent configuration for '" + agentName
                     + "' source '" + sourceConfig.getComponentName()
                     + "' contains invalid channel: '" + nextSrcCh
                     + "'. Will be removed.");
@@ -246,8 +273,8 @@ public class FlumeConfiguration {
           }
 
           if (srcChannelSet.size() == 0) {
-            LOGGER.warn("Agent configuration for '" + agentName
-                + "' source '" + sourceConfig.getComponentName()
+            logger.warn("Agent configuration for '" + agentName + "' source '"
+                + sourceConfig.getComponentName()
                 + "' has no valid channels. Removing.");
 
             srcIt.remove();
@@ -256,44 +283,50 @@ public class FlumeConfiguration {
 
           // Override the source configuration to reset channels
           StringBuilder validSrcChannelBuilder = new StringBuilder("");
+
           for (String validSrcCh : srcChannelSet) {
             validSrcChannelBuilder.append(" ").append(validSrcCh);
           }
 
-          sourceConfig.setAttribute(ATTR_CHANNELS,
-              validSrcChannelBuilder.toString().trim());
+          sourceConfig.setAttribute(ATTR_CHANNELS, validSrcChannelBuilder
+              .toString().trim());
         }
       }
 
-      validateComponent(sourceSet, sourceConfigMap, CLASS_SOURCE,
-          ATTR_TYPE, ATTR_CHANNELS);
+      validateComponent(sourceSet, sourceConfigMap, CLASS_SOURCE, ATTR_TYPE,
+          ATTR_CHANNELS);
 
       Set<String> sinkSet = new HashSet<String>();
+
       if (sinks != null && sinks.trim().length() > 0) {
         StringTokenizer sinkTok = new StringTokenizer(sinks, " \t");
+
         while (sinkTok.hasMoreTokens()) {
           sinkSet.add(sinkTok.nextToken());
         }
 
         // Filter out any sinks that have invalid channel
         Iterator<String> sinkIt = sinkConfigMap.keySet().iterator();
+
         while (sinkIt.hasNext()) {
           String nextSink = sinkIt.next();
           ComponentConfiguration sinkConfig = sinkConfigMap.get(nextSink);
+
           if (sinkConfig.hasAttribute(ATTR_CHANNEL)) {
             String sinkCh = sinkConfig.getAttribute(ATTR_CHANNEL);
+
             if (!channelSet.contains(sinkCh)) {
-              LOGGER.warn("Agent configuration for '" + agentName
-                  + "' sink '" + sinkConfig.getComponentName()
-                  + "' has invalid channel '" + sinkCh
-                  + "' specified. Removing.");
+              logger.warn("Agent configuration for '" + agentName + "' sink '"
+                  + sinkConfig.getComponentName() + "' has invalid channel '"
+                  + sinkCh + "' specified. Removing.");
               sinkIt.remove();
               continue;
             }
           } else {
-            LOGGER.warn("Agent configuration for '" + agentName
-                  + "' sink '" + sinkConfig.getComponentName()
-                  + "' has no channels. Removing.");
+            logger.warn("Agent configuration for '" + agentName + "' sink '"
+                + sinkConfig.getComponentName()
+                + "' has no channels. Removing.");
+
             sinkIt.remove();
             continue;
           }
@@ -305,7 +338,7 @@ public class FlumeConfiguration {
 
       // If no sources or sinks are present, then this is invalid
       if (sourceSet.size() == 0 && sinkSet.size() == 0) {
-        LOGGER.warn("Agent configuration for '" + agentName
+        logger.warn("Agent configuration for '" + agentName
             + "' has no sources or sinks. Will be marked invalid.");
         return false;
       }
@@ -323,7 +356,9 @@ public class FlumeConfiguration {
       if (entries.size() == 0) {
         return null;
       }
+
       StringBuilder sb = new StringBuilder("");
+
       for (String entry : entries) {
         sb.append(" ").append(entry);
       }
@@ -332,30 +367,37 @@ public class FlumeConfiguration {
     }
 
     /**
-     * <p>Utility method to iterate over the component configuration to
-     * validate them based on the criteria as follows:
+     * <p>
+     * Utility method to iterate over the component configuration to validate
+     * them based on the criteria as follows:
      * <ol>
-     * <li>Each component in the configuredMap must be present in the
-     * given activeSet</li>
-     * <li>Each component in activeSet must be configured in the
-     * configuredMap</li>
+     * <li>Each component in the configuredMap must be present in the given
+     * activeSet</li>
+     * <li>Each component in activeSet must be configured in the configuredMap</li>
      * <li>Each component must have requiredAttributes set correctly.</li>
      * </ol>
-     * @param activeSet the active set of components
-     * @param configuredMap the components picked from configuration
-     * @param componentClass the component class - source, sink, etc
-     * @param requiredAttributes the required attributes for the component
+     * 
+     * @param activeSet
+     *          the active set of components
+     * @param configuredMap
+     *          the components picked from configuration
+     * @param componentClass
+     *          the component class - source, sink, etc
+     * @param requiredAttributes
+     *          the required attributes for the component
      */
     private void validateComponent(Set<String> activeSet,
         Map<String, ComponentConfiguration> configuredMap,
         String componentClass, String... requiredAttributes) {
 
       Iterator<String> it = configuredMap.keySet().iterator();
+
       while (it.hasNext()) {
         String componentName = it.next();
+
         if (!activeSet.contains(componentName)) {
-          LOGGER.warn("Agent configuration for '" + agentName
-              + "': " + componentClass + " '" + componentName
+          logger.warn("Agent configuration for '" + agentName + "': "
+              + componentClass + " '" + componentName
               + "' not in active list. Removing.");
 
           it.remove();
@@ -365,19 +407,21 @@ public class FlumeConfiguration {
         // Every component must have a required attribute
         ComponentConfiguration config = configuredMap.get(componentName);
         boolean missingRequiredAttributes = false;
+
         for (String attrName : requiredAttributes) {
           if (!config.hasAttribute(attrName)) {
-            LOGGER.warn("Agent configuration for '" + agentName
-                + "': " + componentClass + " '" + componentName
-                + "' does not have '" + attrName + "' specified.");
+            logger.warn("Agent configuration for '" + agentName + "': "
+                + componentClass + " '" + componentName + "' does not have '"
+                + attrName + "' specified.");
             missingRequiredAttributes = true;
           }
         }
 
         if (missingRequiredAttributes) {
-          LOGGER.warn("Agent configuration for '" + agentName
-                + "': " + componentClass + " '" + componentName
-                + "' has some required attributes missing. Removing.");
+          logger.warn("Agent configuration for '" + agentName + "': "
+              + componentClass + " '" + componentName
+              + "' has some required attributes missing. Removing.");
+
           it.remove();
           continue;
         }
@@ -385,18 +429,20 @@ public class FlumeConfiguration {
 
       // Remove the active channels that are not configured
       Iterator<String> activeIt = activeSet.iterator();
+
       while (activeIt.hasNext()) {
         String componentName = activeIt.next();
+
         if (!configuredMap.containsKey(componentName)) {
-          LOGGER.warn("Agent configuration for '" + agentName
-              + "': " + componentClass + " '" + componentName
+          logger.warn("Agent configuration for '" + agentName + "': "
+              + componentClass + " '" + componentName
               + "' is not configured. Removing.");
+
           activeIt.remove();
         }
       }
     }
 
-
     @Override
     public String toString() {
       StringBuilder sb = new StringBuilder("AgentConfiguration[");
@@ -404,6 +450,7 @@ public class FlumeConfiguration {
       sb.append(sourceConfigMap).append(NEWLINE).append("CHANNELS: ");
       sb.append(channelConfigMap).append(NEWLINE).append("SINKS: ");
       sb.append(sinkConfigMap);
+
       return sb.toString();
     }
 
@@ -414,8 +461,8 @@ public class FlumeConfiguration {
           sources = value;
           return true;
         } else {
-          LOGGER.warn("Duplicate source list specified for agent: "
-              + agentName);
+          logger
+              .warn("Duplicate source list specified for agent: " + agentName);
           return false;
         }
       }
@@ -426,25 +473,27 @@ public class FlumeConfiguration {
           sinks = value;
           return true;
         } else {
-          LOGGER.warn("Duplicate sink list specfied for agent: "
-              + agentName);
+          logger.warn("Duplicate sink list specfied for agent: " + agentName);
           return false;
         }
       }
 
       // Check for channels
       if (key.equals(CHANNELS)) {
-        if(channels == null) {
+        if (channels == null) {
           channels = value;
+
           return true;
         } else {
-          LOGGER.warn("Duplicate channel list specified for agent: "
+          logger.warn("Duplicate channel list specified for agent: "
               + agentName);
+
           return false;
         }
       }
 
       ComponentNameAndConfigKey cnck = parseConfigKey(key, SOURCES_PREFIX);
+
       if (cnck != null) {
         // it is a source
         String name = cnck.getComponentName();
@@ -459,6 +508,7 @@ public class FlumeConfiguration {
       }
 
       cnck = parseConfigKey(key, CHANNELS_PREFIX);
+
       if (cnck != null) {
         // it is a channel
         String name = cnck.getComponentName();
@@ -473,6 +523,7 @@ public class FlumeConfiguration {
       }
 
       cnck = parseConfigKey(key, SINKS_PREFIX);
+
       if (cnck != null) {
         // it is a sink
         String name = cnck.getComponentName();
@@ -486,12 +537,11 @@ public class FlumeConfiguration {
         return sinkConf.addProperty(cnck.getConfigKey(), value);
       }
 
-      LOGGER.warn("Invalid property specified: " + key);
+      logger.warn("Invalid property specified: " + key);
       return false;
     }
 
-    private ComponentNameAndConfigKey parseConfigKey(
-        String key, String prefix) {
+    private ComponentNameAndConfigKey parseConfigKey(String key, String prefix) {
       // key must start with prefix
       if (!key.startsWith(prefix)) {
         return null;
@@ -500,6 +550,7 @@ public class FlumeConfiguration {
       // key must have a component name part after the prefix of the format:
       // <prefix><component-name>.<config-key>
       int index = key.indexOf('.', prefix.length() + 1);
+
       if (index == -1) {
         return null;
       }
@@ -516,94 +567,108 @@ public class FlumeConfiguration {
     }
   }
 
-
   public static class ComponentConfiguration {
-     private final String componentName;
-     private final boolean hasRunner;
-     private final ComponentConfiguration runnerConfig;
-
-     private final Map<String, String> configuration;
-
-     private ComponentConfiguration(String componentName, boolean hasRunner) {
-       this.componentName = componentName;
-       this.hasRunner = hasRunner;
-       if (hasRunner) {
-         runnerConfig = new ComponentConfiguration(RUNNER, false);
-       } else {
-         runnerConfig = null;
-       }
-       this.configuration = new HashMap<String, String>();
-     }
-
-     @Override
-     public String toString() {
-       return toString(0);
-     }
-
-     public String getComponentName() {
-       return componentName;
-     }
-
-     private boolean hasAttribute(String attributeName) {
-       return configuration.containsKey(attributeName);
-     }
-
-     private String getAttribute(String attriubteName) {
-       return configuration.get(attriubteName);
-     }
-
-     private void setAttribute(String attributeName, String value) {
-       configuration.put(attributeName, value);
-     }
-
-     private String toString(int indentCount) {
-       StringBuilder indentSb = new StringBuilder("");
-       for (int i = 0; i<indentCount; i++) {
-         indentSb.append(INDENTSTEP);
-       }
-       String indent = indentSb.toString();
-       StringBuilder sb = new StringBuilder(indent);
-       sb.append("ComponentConfiguration[").append(componentName).append("]");
-       sb.append(NEWLINE).append(indent).append(INDENTSTEP).append("CONFIG: ");
-       sb.append(configuration);
-       sb.append(NEWLINE).append(indent).append(INDENTSTEP);
-       if (hasRunner) {
-         sb.append("RUNNER: ").append(runnerConfig.toString(indentCount+1));
-       }
-       sb.append(NEWLINE);
-       return sb.toString();
-     }
-
-     private boolean addProperty(String key, String value) {
-       // see if the key belongs to the runner
-       if (hasRunner && key.startsWith(RUNNER_PREFIX)) {
-         String subKey = key.substring(RUNNER_PREFIX.length());
-         if (subKey.length() == 0) {
-           LOGGER.warn("Invalid key specified: " + key);
-           return false;
-         }
-         return runnerConfig.addProperty(subKey, value);
-       }
-
-       // do not allow properties of the name "runner"
-       if (hasRunner && key.equals(RUNNER)) {
-         LOGGER.warn("Cannot have property named: "
-                 + key + " for component: " + componentName);
-         return false;
-       }
-
-       if (!configuration.containsKey(key)) {
-         configuration.put(key, value);
-         return true;
-       }
-
-       LOGGER.warn("Duplicate property '" + key + "' specified for " +
-               componentName);
-       return false;
-     }
+
+    private final String componentName;
+    private final boolean hasRunner;
+    private final ComponentConfiguration runnerConfig;
+
+    private final Map<String, String> configuration;
+
+    private ComponentConfiguration(String componentName, boolean hasRunner) {
+      this.componentName = componentName;
+      this.hasRunner = hasRunner;
+
+      if (hasRunner) {
+        runnerConfig = new ComponentConfiguration(RUNNER, false);
+      } else {
+        runnerConfig = null;
+      }
+
+      this.configuration = new HashMap<String, String>();
+    }
+
+    @Override
+    public String toString() {
+      return toString(0);
+    }
+
+    public String getComponentName() {
+      return componentName;
+    }
+
+    private boolean hasAttribute(String attributeName) {
+      return configuration.containsKey(attributeName);
+    }
+
+    private String getAttribute(String attriubteName) {
+      return configuration.get(attriubteName);
+    }
+
+    private void setAttribute(String attributeName, String value) {
+      configuration.put(attributeName, value);
+    }
+
+    private String toString(int indentCount) {
+      StringBuilder indentSb = new StringBuilder("");
+
+      for (int i = 0; i < indentCount; i++) {
+        indentSb.append(INDENTSTEP);
+      }
+
+      String indent = indentSb.toString();
+      StringBuilder sb = new StringBuilder(indent);
+
+      sb.append("ComponentConfiguration[").append(componentName).append("]");
+      sb.append(NEWLINE).append(indent).append(INDENTSTEP).append("CONFIG: ");
+      sb.append(configuration);
+      sb.append(NEWLINE).append(indent).append(INDENTSTEP);
+
+      if (hasRunner) {
+        sb.append("RUNNER: ").append(runnerConfig.toString(indentCount + 1));
+      }
+
+      sb.append(NEWLINE);
+
+      return sb.toString();
+    }
+
+    private boolean addProperty(String key, String value) {
+      // see if the key belongs to the runner
+      if (hasRunner && key.startsWith(RUNNER_PREFIX)) {
+        String subKey = key.substring(RUNNER_PREFIX.length());
+        if (subKey.length() == 0) {
+          logger.warn("Invalid key specified: " + key);
+          return false;
+        }
+        return runnerConfig.addProperty(subKey, value);
+      }
+
+      // do not allow properties of the name "runner"
+      if (hasRunner && key.equals(RUNNER)) {
+        logger.warn("Cannot have property named: " + key + " for component: "
+            + componentName);
+        return false;
+      }
+
+      if (!configuration.containsKey(key)) {
+        configuration.put(key, value);
+        return true;
+      }
+
+      logger.warn("Duplicate property '" + key + "' specified for "
+          + componentName);
+      return false;
+    }
+
+    public Map<String, String> getConfiguration() {
+      return configuration;
+    }
+
   }
 
   public static class ComponentNameAndConfigKey {
+
     private final String componentName;
     private final String configKey;
 

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java?rev=1186585&r1=1186584&r2=1186585&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/conf/properties/PropertiesFileConfigurationProvider.java Thu Oct 20 00:40:28 2011
@@ -21,29 +21,35 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
-import java.util.StringTokenizer;
 
+import org.apache.flume.Channel;
+import org.apache.flume.Context;
+import org.apache.flume.Sink;
+import org.apache.flume.SinkRunner;
+import org.apache.flume.Source;
+import org.apache.flume.SourceRunner;
+import org.apache.flume.conf.Configurables;
 import org.apache.flume.conf.file.AbstractFileConfigurationProvider;
 import org.apache.flume.conf.file.SimpleNodeConfiguration;
+import org.apache.flume.conf.properties.FlumeConfiguration.AgentConfiguration;
+import org.apache.flume.conf.properties.FlumeConfiguration.ComponentConfiguration;
+import org.apache.flume.node.NodeConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * <p>
  * A configuration provider that uses properties file for specifying
- * configuration. The configuration files follow the Java properties file
- * syntax rules specified at {@link java.util.Properties#load(java.io.Reader)}.
- * Every configuration value specified in the properties file is prefixed by
- * an <em>Agent Name</em> which helps isolate an individual agent&apos;s
- * namespace.
+ * configuration. The configuration files follow the Java properties file syntax
+ * rules specified at {@link java.util.Properties#load(java.io.Reader)}. Every
+ * configuration value specified in the properties file is prefixed by an
+ * <em>Agent Name</em> which helps isolate an individual agent&apos;s namespace.
  * </p>
  * <p>
- * Valid configuration files must observe the following rules for every
- * agent namespace.
+ * Valid configuration files must observe the following rules for every agent
+ * namespace.
  * <ul>
  * <li>For every &lt;agent name&gt; there must be three lists specified that
  * include <tt>&lt;agent name&gt;.sources</tt>,
@@ -51,8 +57,8 @@ import org.slf4j.LoggerFactory;
  * Each of these lists must contain a space separated list of names
  * corresponding to that particular entity.</li>
  * <li>For each source named in <tt>&lt;agent name&gt;.sources</tt>, there must
- * be a non-empty <tt>type</tt> attribute specified from the valid set of
- * source types. For example:
+ * be a non-empty <tt>type</tt> attribute specified from the valid set of source
+ * types. For example:
  * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.type = event</tt></li>
  * <li>For each source named in <tt>&lt;agent name&gt;.sources</tt>, there must
  * be a space-separated list of channel names that the source will associate
@@ -66,15 +72,14 @@ import org.slf4j.LoggerFactory;
  * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.runner.type = avro</tt>.
  * This namespace can also be used to configure other configuration of the
  * source runner as needed. For example:
- * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.runner.port = 10101</tt>
- * </li>
+ * <tt>&lt;agent name&gt;.sources.&lt;source name&gt;.runner.port = 10101</tt></li>
  * <li>For each channel named in the <tt>&lt;agent name&gt;.channels</tt>, there
  * must be a non-empty <tt>type</tt> attribute specified from the valid set of
  * channel types. For example:
  * <tt>&lt;agent name&gt;.channels.&lt;channel name&gt;.type = mem</tt></li>
  * <li>For each sink named in the <tt>&lt;agent name&gt;.sinks</tt>, there must
- * be a non-empty <tt>type</tt> attribute specified from the valid set of
- * sink types. For example:
+ * be a non-empty <tt>type</tt> attribute specified from the valid set of sink
+ * types. For example:
  * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.type = hdfs</tt></li>
  * <li>For each sink named in the <tt>&lt;agent name&gt;.sinks</tt>, there must
  * be a non-empty single-valued channel name specified as the value of the
@@ -82,16 +87,16 @@ import org.slf4j.LoggerFactory;
  * specified by <tt>&lt;agent name&gt;.channels</tt>. For example:
  * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.channel =
  * &lt;channel name&gt;</tt></li>
- * <li>For each sink named in the <tt>&lt;agent name&gt;.sinks</tt>, there
- * must be a <tt>runner</tt> namespace of configuration that configures the
+ * <li>For each sink named in the <tt>&lt;agent name&gt;.sinks</tt>, there must
+ * be a <tt>runner</tt> namespace of configuration that configures the
  * associated sink runner. For example:
  * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.runner.type = polling</tt>.
- * This namespace can also be used to configure other configuration of the
- * sink runner as needed. For example:
+ * This namespace can also be used to configure other configuration of the sink
+ * runner as needed. For example:
  * <tt>&lt;agent name&gt;.sinks.&lt;sink name&gt;.runner.polling.interval =
  * 60</tt></li>
  * </ul>
- *
+ * 
  * Apart from the above required configuration values, each source, sink or
  * channel can have its own set of arbitrary configuration as required by the
  * implementation. Each of these configuration values are expressed by fully
@@ -103,46 +108,46 @@ import org.slf4j.LoggerFactory;
  * <p>
  * Any information contained in the configuration file other than what pertains
  * to the configured agents, sources, sinks and channels via the explicitly
- * enumerated list of sources, sinks and channels per agent name are ignored
- * by this provider. Moreover, if any of the required configuration values are
- * not present in the configuration file for the configured entities, that
- * entity and anything that depends upon it is considered invalid and
- * consequently not configured. For example, if a channel is missing its
- * <tt>type</tt> attribute, it is considered misconfigured. Also, any sources
- * or sinks that depend upon this channel are also considered misconfigured and
- * not initialized.
+ * enumerated list of sources, sinks and channels per agent name are ignored by
+ * this provider. Moreover, if any of the required configuration values are not
+ * present in the configuration file for the configured entities, that entity
+ * and anything that depends upon it is considered invalid and consequently not
+ * configured. For example, if a channel is missing its <tt>type</tt> attribute,
+ * it is considered misconfigured. Also, any sources or sinks that depend upon
+ * this channel are also considered misconfigured and not initialized.
  * </p>
  * <p>
  * Example configuration file:
+ * 
  * <pre>
  * #
  * # Flume Configuration
  * # This file contains configuration for one Agent identified as host1.
  * #
- *
+ * 
  * host1.sources = avroSource thriftSource
  * host1.channels = jdbcChannel
  * host1.sinks = hdfsSink
- *
+ * 
  * # avroSource configuration
  * host1.sources.avroSource.type = org.apache.flume.source.AvroSource
  * host1.sources.avroSource.runner.type = avro
  * host1.sources.avroSource.runner.port = 11001
  * host1.sources.avroSource.channels = jdbcChannel
- *
+ * 
  * # thriftSource configuration
  * host1.sources.thriftSource.type = org.apache.flume.source.ThriftSource
  * host1.sources.thriftSource.runner.type = thrift
  * host1.sources.thriftSource.runner.port = 12001
  * host1.sources.thriftSource.channels = jdbcChannel
- *
+ * 
  * # jdbcChannel configuration
  * host1.channels.jdbcChannel.type = jdbc
  * host1.channels.jdbcChannel.jdbc.driver = com.mysql.jdbc.Driver
  * host1.channels.jdbcChannel.jdbc.connect.url = jdbc:mysql://localhost/flumedb
  * host1.channels.jdbcChannel.jdbc.username = flume
  * host1.channels.jdbcChannel.jdbc.password = flume
- *
+ * 
  * # hdfsSink configuration
  * host1.sinks.hdfsSink.type = hdfs
  * host1.sinks.hdfsSink.namenode = hdfs://localhost/
@@ -150,39 +155,116 @@ import org.slf4j.LoggerFactory;
  * host1.sinks.hdfsSink.runner.type = polling
  * host1.sinks.hdfsSink.runner.polling.interval = 60
  * </pre>
+ * 
  * </p>
+ * 
  * @see java.util.Properties#load(java.io.Reader)
  */
-public class PropertiesFileConfigurationProvider
-    extends AbstractFileConfigurationProvider {
+public class PropertiesFileConfigurationProvider extends
+    AbstractFileConfigurationProvider {
 
-  private static final Logger LOGGER =
-      LoggerFactory.getLogger(PropertiesFileConfigurationProvider.class);
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(PropertiesFileConfigurationProvider.class);
 
   @Override
   protected void load() {
-    SimpleNodeConfiguration configuration = new SimpleNodeConfiguration();
-
     File propertiesFile = getFile();
     BufferedReader reader = null;
+
     try {
       reader = new BufferedReader(new FileReader(propertiesFile));
       Properties properties = new Properties();
       properties.load(reader);
 
+      NodeConfiguration conf = new SimpleNodeConfiguration();
       FlumeConfiguration fconfig = new FlumeConfiguration(properties);
+      AgentConfiguration agentConf = fconfig.getConfigurationFor(getNodeName());
 
+      if (agentConf != null) {
+        loadChannels(agentConf, conf);
+        loadSources(agentConf, conf);
+        loadSinks(agentConf, conf);
+
+        getConfigurationAware().onNodeConfigurationChanged(conf);
+      } else {
+        LOGGER.warn("No configuration found for this host:{}", getNodeName());
+      }
     } catch (IOException ex) {
       LOGGER.error("Unable to load file: " + propertiesFile, ex);
+    } catch (InstantiationException ex) {
+      LOGGER.error("Unable to load file:{}", propertiesFile, ex);
     } finally {
       if (reader != null) {
         try {
           reader.close();
         } catch (IOException ex) {
-          LOGGER.warn("Unable to close file reader for file: "
-              + propertiesFile, ex);
+          LOGGER.warn(
+              "Unable to close file reader for file: " + propertiesFile, ex);
         }
       }
     }
   }
+
+  private void loadChannels(AgentConfiguration agentConf, NodeConfiguration conf)
+      throws InstantiationException {
+
+    for (ComponentConfiguration comp : agentConf.getChannels()) {
+      Context context = new Context();
+
+      Channel channel = getChannelFactory().create(
+          comp.getConfiguration().get("type"));
+
+      for (Entry<String, String> entry : comp.getConfiguration().entrySet()) {
+        context.put(entry.getKey(), entry.getValue());
+      }
+
+      Configurables.configure(channel, context);
+
+      conf.getChannels().put(comp.getComponentName(), channel);
+    }
+  }
+
+  private void loadSources(AgentConfiguration agentConf, NodeConfiguration conf)
+      throws InstantiationException {
+
+    for (ComponentConfiguration comp : agentConf.getSources()) {
+      Context context = new Context();
+
+      Source source = getSourceFactory().create(
+          comp.getConfiguration().get("type"));
+
+      for (Entry<String, String> entry : comp.getConfiguration().entrySet()) {
+        context.put(entry.getKey(), entry.getValue());
+      }
+
+      Configurables.configure(source, context);
+
+      source.setChannel(conf.getChannels().get(
+          comp.getConfiguration().get("channels")));
+      conf.getSourceRunners().put(comp.getComponentName(),
+          SourceRunner.forSource(source));
+    }
+  }
+
+  private void loadSinks(AgentConfiguration agentConf, NodeConfiguration conf)
+      throws InstantiationException {
+
+    for (ComponentConfiguration comp : agentConf.getSinks()) {
+      Context context = new Context();
+
+      Sink sink = getSinkFactory().create(comp.getConfiguration().get("type"));
+
+      for (Entry<String, String> entry : comp.getConfiguration().entrySet()) {
+        context.put(entry.getKey(), entry.getValue());
+      }
+
+      Configurables.configure(sink, context);
+
+      sink.setChannel(conf.getChannels().get(
+          comp.getConfiguration().get("channel")));
+      conf.getSinkRunners().put(comp.getComponentName(),
+          SinkRunner.forSink(sink));
+    }
+  }
+
 }

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java?rev=1186585&r1=1186584&r2=1186585&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/main/java/org/apache/flume/node/Application.java Thu Oct 20 00:40:28 2011
@@ -14,7 +14,8 @@ import org.apache.flume.SinkFactory;
 import org.apache.flume.SourceFactory;
 import org.apache.flume.channel.DefaultChannelFactory;
 import org.apache.flume.channel.MemoryChannel;
-import org.apache.flume.conf.file.JsonFileConfigurationProvider;
+import org.apache.flume.conf.file.AbstractFileConfigurationProvider;
+import org.apache.flume.conf.properties.PropertiesFileConfigurationProvider;
 import org.apache.flume.lifecycle.LifecycleController;
 import org.apache.flume.lifecycle.LifecycleException;
 import org.apache.flume.lifecycle.LifecycleState;
@@ -125,19 +126,20 @@ public class Application {
 
     final FlumeNode node = new FlumeNode();
     DefaultLogicalNodeManager nodeManager = new DefaultLogicalNodeManager();
-    JsonFileConfigurationProvider configurationProvider = new JsonFileConfigurationProvider();
+    AbstractFileConfigurationProvider configurationProvider = new PropertiesFileConfigurationProvider();
 
     configurationProvider.setChannelFactory(channelFactory);
     configurationProvider.setSourceFactory(sourceFactory);
     configurationProvider.setSinkFactory(sinkFactory);
 
+    configurationProvider.setNodeName(nodeName);
+    configurationProvider.setConfigurationAware(nodeManager);
+    configurationProvider.setFile(configurationFile);
+
     Preconditions.checkState(configurationFile != null,
         "Configuration file not specified");
     Preconditions.checkState(nodeName != null, "Node name not specified");
 
-    configurationProvider.setConfigurationAware(nodeManager);
-    configurationProvider.setFile(configurationFile);
-
     node.setName(nodeName);
     node.setNodeManager(nodeManager);
     node.setConfigurationProvider(configurationProvider);

Modified: incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java
URL: http://svn.apache.org/viewvc/incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java?rev=1186585&r1=1186584&r2=1186585&view=diff
==============================================================================
--- incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java (original)
+++ incubator/flume/branches/flume-728/flume-ng-node/src/test/java/org/apache/flume/conf/properties/TestPropertiesFileConfigurationProvider.java Thu Oct 20 00:40:28 2011
@@ -19,7 +19,10 @@ package org.apache.flume.conf.properties
 
 import java.io.File;
 
+import org.apache.flume.channel.DefaultChannelFactory;
 import org.apache.flume.conf.file.TestJsonFileConfigurationProvider;
+import org.apache.flume.sink.DefaultSinkFactory;
+import org.apache.flume.source.DefaultSourceFactory;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,8 +38,11 @@ public class TestPropertiesFileConfigura
 
   @Test
   public void testPropertyRead() throws Exception {
-    PropertiesFileConfigurationProvider provider =
-        new PropertiesFileConfigurationProvider();
+    PropertiesFileConfigurationProvider provider = new PropertiesFileConfigurationProvider();
+
+    provider.setChannelFactory(new DefaultChannelFactory());
+    provider.setSourceFactory(new DefaultSourceFactory());
+    provider.setSinkFactory(new DefaultSinkFactory());
 
     provider.setFile(TESTFILE);
     provider.load();