You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/09/12 22:19:29 UTC

git commit: FLUME-1491. Support fetching configuration from Zookeeper.

Repository: flume
Updated Branches:
  refs/heads/trunk 59f0b4df9 -> dd466c7e4


FLUME-1491. Support fetching configuration from Zookeeper.

(Ashish Paliwal via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/dd466c7e
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/dd466c7e
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/dd466c7e

Branch: refs/heads/trunk
Commit: dd466c7e4623d9f5fd459b59836274d852f58d36
Parents: 59f0b4d
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Sep 12 13:16:13 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Sep 12 13:19:11 2014 -0700

----------------------------------------------------------------------
 bin/flume-ng                                    |  33 +++--
 flume-ng-node/pom.xml                           |  15 ++
 .../node/AbstractConfigurationProvider.java     |  18 ++-
 .../AbstractZooKeeperConfigurationProvider.java | 104 ++++++++++++++
 .../java/org/apache/flume/node/Application.java | 131 ++++++++++++------
 .../PollingZooKeeperConfigurationProvider.java  | 135 ++++++++++++++++++
 .../PropertiesFileConfigurationProvider.java    |  15 --
 .../StaticZooKeeperConfigurationProvider.java   |  55 ++++++++
 ...tAbstractZooKeeperConfigurationProvider.java | 136 +++++++++++++++++++
 ...stPollingZooKeeperConfigurationProvider.java |  95 +++++++++++++
 ...estStaticZooKeeperConfigurationProvider.java |  44 ++++++
 pom.xml                                         |  20 +++
 12 files changed, 722 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/bin/flume-ng
----------------------------------------------------------------------
diff --git a/bin/flume-ng b/bin/flume-ng
index e09e26b..4b323a6 100755
--- a/bin/flume-ng
+++ b/bin/flume-ng
@@ -174,25 +174,28 @@ display_help() {
 Usage: $0 <command> [options]...
 
 commands:
-  help                  display this help text
-  agent                 run a Flume agent
-  avro-client           run an avro Flume client
-  version               show Flume version info
+  help                      display this help text
+  agent                     run a Flume agent
+  avro-client               run an avro Flume client
+  version                   show Flume version info
 
 global options:
-  --conf,-c <conf>      use configs in <conf> directory
-  --classpath,-C <cp>   append to the classpath
-  --dryrun,-d           do not actually start Flume, just print the command
-  --plugins-path <dirs> colon-separated list of plugins.d directories. See the
-                        plugins.d section in the user guide for more details.
-                        Default: \$FLUME_HOME/plugins.d
-  -Dproperty=value      sets a Java system property value
-  -Xproperty=value      sets a Java -X option
+  --conf,-c <conf>          use configs in <conf> directory
+  --classpath,-C <cp>       append to the classpath
+  --dryrun,-d               do not actually start Flume, just print the command
+  --plugins-path <dirs>     colon-separated list of plugins.d directories. See the
+                            plugins.d section in the user guide for more details.
+                            Default: \$FLUME_HOME/plugins.d
+  -Dproperty=value          sets a Java system property value
+  -Xproperty=value          sets a Java -X option
 
 agent options:
-  --conf-file,-f <file> specify a config file (required)
-  --name,-n <name>      the name of this agent (required)
-  --help,-h             display help text
+  --name,-n <name>          the name of this agent (required)
+  --conf-file,-f <file>     specify a config file (required if -z missing)
+  --zkConnString,-z <str>   specify the ZooKeeper connection to use (required if -f missing)
+  --zkBasePath,-p <path>    specify the base path in ZooKeeper for agent configs
+  --no-reload-conf          do not reload config file if changed
+  --help,-h                 display help text
 
 avro-client options:
   --rpcProps,-P <file>   RPC client properties file with server connection params

http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-node/pom.xml b/flume-ng-node/pom.xml
index dce2527..caf2c11 100644
--- a/flume-ng-node/pom.xml
+++ b/flume-ng-node/pom.xml
@@ -154,6 +154,21 @@
       <artifactId>jackson-mapper-asl</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+    </dependency>
+
+  <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+  </dependency>
+
+  <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+  </dependency>
+
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
index e63c601..40abba2 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
@@ -17,13 +17,8 @@
  */
 package org.apache.flume.node;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
 
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelFactory;
@@ -506,4 +501,15 @@ public abstract class AbstractConfigurationProvider implements
       components = Lists.newArrayList();
     }
   }
+
+  protected Map<String, String> toMap(Properties properties) {
+    Map<String, String> result = Maps.newHashMap();
+    Enumeration<?> propertyNames = properties.propertyNames();
+    while (propertyNames.hasMoreElements()) {
+      String name = (String) propertyNames.nextElement();
+      String value = properties.getProperty(name);
+      result.put(name, value);
+    }
+    return result;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/main/java/org/apache/flume/node/AbstractZooKeeperConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractZooKeeperConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..f193f9f
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractZooKeeperConfigurationProvider.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.node;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.base.Charsets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flume.conf.FlumeConfiguration;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+/**
+ * ZooKeeper based configuration implementation provider.
+ * 
+ * The Agent configuration can be uploaded in ZooKeeper under a base name, which
+ * defaults to /flume
+ * 
+ * Currently the agent configuration is stored under the agent name node in
+ * ZooKeeper
+ * 
+ * <PRE>
+ *   /flume
+ *       /a1 [agent config file]
+ *       /a2 [agent config file]
+ *       /a3 [agent config file]
+ * </PRE>
+ * 
+ * Configuration format is same as PropertiesFileConfigurationProvider
+ * 
+ * Configuration properties
+ * 
+ * agentName - Name of Agent for which configuration needs to be pulled
+ * 
+ * zkConnString - Connection string to ZooKeeper Ensemble
+ * (host:port,host1:port1)
+ * 
+ * basePath - Base Path where agent configuration needs to be stored. Defaults
+ * to /flume
+ */
+public abstract class AbstractZooKeeperConfigurationProvider extends
+    AbstractConfigurationProvider {
+
+  static final String DEFAULT_ZK_BASE_PATH = "/flume";
+
+  protected final String basePath;
+
+  protected final String zkConnString;
+
+  protected AbstractZooKeeperConfigurationProvider(String agentName,
+      String zkConnString, String basePath) {
+    super(agentName);
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(zkConnString),
+        "Invalid Zookeeper Connection String %s", zkConnString);
+    this.zkConnString = zkConnString;
+    if (basePath == null || basePath.isEmpty()) {
+      this.basePath = DEFAULT_ZK_BASE_PATH;
+    } else {
+      this.basePath = basePath;
+    }
+  }
+
+  protected CuratorFramework createClient() {
+    return CuratorFrameworkFactory.newClient(zkConnString,
+        new ExponentialBackoffRetry(1000, 1));
+  }
+
+  protected FlumeConfiguration configFromBytes(byte[] configData)
+      throws IOException {
+    Map<String, String> configMap;
+    if (configData == null || configData.length == 0) {
+      configMap = Collections.emptyMap();
+    } else {
+      String fileContent = new String(configData, Charsets.UTF_8);
+      Properties properties = new Properties();
+      properties.load(new StringReader(fileContent));
+      configMap = toMap(properties);
+    }
+    return new FlumeConfiguration(configMap);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
index 5250139..832285a 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
@@ -53,7 +53,7 @@ import com.google.common.collect.Lists;
 import com.google.common.eventbus.EventBus;
 import com.google.common.eventbus.Subscribe;
 
-public class Application  {
+public class Application {
 
   private static final Logger logger = LoggerFactory
       .getLogger(Application.class);
@@ -69,6 +69,7 @@ public class Application  {
   public Application() {
     this(new ArrayList<LifecycleAware>(0));
   }
+
   public Application(List<LifecycleAware> components) {
     this.components = components;
     supervisor = new LifecycleSupervisor();
@@ -81,7 +82,6 @@ public class Application  {
     }
   }
 
-
   @Subscribe
   public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
     stopAllComponents();
@@ -95,7 +95,6 @@ public class Application  {
     }
   }
 
-
   private void stopAllComponents() {
     if (this.materializedConfiguration != null) {
       logger.info("Shutting down configuration: {}", this.materializedConfiguration);
@@ -192,7 +191,6 @@ public class Application  {
     this.loadMonitoring();
   }
 
-
   @SuppressWarnings("unchecked")
   private void loadMonitoring() {
     Properties systemProps = System.getProperties();
@@ -231,18 +229,32 @@ public class Application  {
 
     try {
 
+      boolean isZkConfigured = false;
+
       Options options = new Options();
 
       Option option = new Option("n", "name", true, "the name of this agent");
       option.setRequired(true);
       options.addOption(option);
 
-      option = new Option("f", "conf-file", true, "specify a conf file");
-      option.setRequired(true);
+      option = new Option("f", "conf-file", true,
+          "specify a config file (required if -z missing)");
+      option.setRequired(false);
       options.addOption(option);
 
-      option = new Option(null, "no-reload-conf", false, "do not reload " +
-        "conf file if changed");
+      option = new Option(null, "no-reload-conf", false,
+          "do not reload config file if changed");
+      options.addOption(option);
+
+      // Options for Zookeeper
+      option = new Option("z", "zkConnString", true,
+          "specify the ZooKeeper connection to use (required if -f missing)");
+      option.setRequired(false);
+      options.addOption(option);
+
+      option = new Option("p", "zkBasePath", true,
+          "specify the base path in ZooKeeper for agent configs");
+      option.setRequired(false);
       options.addOption(option);
 
       option = new Option("h", "help", false, "display help text");
@@ -251,47 +263,80 @@ public class Application  {
       CommandLineParser parser = new GnuParser();
       CommandLine commandLine = parser.parse(options, args);
 
-      File configurationFile = new File(commandLine.getOptionValue('f'));
-      String agentName = commandLine.getOptionValue('n');
-      boolean reload = !commandLine.hasOption("no-reload-conf");
-
       if (commandLine.hasOption('h')) {
         new HelpFormatter().printHelp("flume-ng agent", options, true);
         return;
       }
-      /*
-       * The following is to ensure that by default the agent
-       * will fail on startup if the file does not exist.
-       */
-      if (!configurationFile.exists()) {
-        // If command line invocation, then need to fail fast
-        if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) {
-          String path = configurationFile.getPath();
-          try {
-            path = configurationFile.getCanonicalPath();
-          } catch (IOException ex) {
-            logger.error("Failed to read canonical path for file: " + path, ex);
-          }
-          throw new ParseException(
-              "The specified configuration file does not exist: " + path);
-        }
+
+      String agentName = commandLine.getOptionValue('n');
+      boolean reload = !commandLine.hasOption("no-reload-conf");
+
+      if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
+        isZkConfigured = true;
       }
-      List<LifecycleAware> components = Lists.newArrayList();
-      Application application;
-      if(reload) {
-        EventBus eventBus = new EventBus(agentName + "-event-bus");
-        PollingPropertiesFileConfigurationProvider configurationProvider =
-            new PollingPropertiesFileConfigurationProvider(agentName,
-                configurationFile, eventBus, 30);
-        components.add(configurationProvider);
-        application = new Application(components);
-        eventBus.register(application);
+      Application application = null;
+      if (isZkConfigured) {
+        // get options
+        String zkConnectionStr = commandLine.getOptionValue('z');
+        String baseZkPath = commandLine.getOptionValue('p');
+
+        if (reload) {
+          EventBus eventBus = new EventBus(agentName + "-event-bus");
+          List<LifecycleAware> components = Lists.newArrayList();
+          PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider =
+            new PollingZooKeeperConfigurationProvider(
+              agentName, zkConnectionStr, baseZkPath, eventBus);
+          components.add(zookeeperConfigurationProvider);
+          application = new Application(components);
+          eventBus.register(application);
+        } else {
+          StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider =
+            new StaticZooKeeperConfigurationProvider(
+              agentName, zkConnectionStr, baseZkPath);
+          application = new Application();
+          application.handleConfigurationEvent(zookeeperConfigurationProvider
+            .getConfiguration());
+        }
       } else {
-        PropertiesFileConfigurationProvider configurationProvider =
-            new PropertiesFileConfigurationProvider(agentName,
-                configurationFile);
-        application = new Application();
-        application.handleConfigurationEvent(configurationProvider.getConfiguration());
+        File configurationFile = new File(commandLine.getOptionValue('f'));
+
+        /*
+         * The following is to ensure that by default the agent will fail on
+         * startup if the file does not exist.
+         */
+        if (!configurationFile.exists()) {
+          // If command line invocation, then need to fail fast
+          if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) ==
+            null) {
+            String path = configurationFile.getPath();
+            try {
+              path = configurationFile.getCanonicalPath();
+            } catch (IOException ex) {
+              logger.error("Failed to read canonical path for file: " + path,
+                ex);
+            }
+            throw new ParseException(
+              "The specified configuration file does not exist: " + path);
+          }
+        }
+        List<LifecycleAware> components = Lists.newArrayList();
+
+        if (reload) {
+          EventBus eventBus = new EventBus(agentName + "-event-bus");
+          PollingPropertiesFileConfigurationProvider configurationProvider =
+            new PollingPropertiesFileConfigurationProvider(
+              agentName, configurationFile, eventBus, 30);
+          components.add(configurationProvider);
+          application = new Application(components);
+          eventBus.register(application);
+        } else {
+          PropertiesFileConfigurationProvider configurationProvider =
+            new PropertiesFileConfigurationProvider(
+              agentName, configurationFile);
+          application = new Application();
+          application.handleConfigurationEvent(configurationProvider
+            .getConfiguration());
+        }
       }
       application.start();
 

http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/main/java/org/apache/flume/node/PollingZooKeeperConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PollingZooKeeperConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PollingZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..b950b3d
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/PollingZooKeeperConfigurationProvider.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.node;
+
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.EventBus;
+
+public class PollingZooKeeperConfigurationProvider extends
+    AbstractZooKeeperConfigurationProvider implements LifecycleAware {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(PollingZooKeeperConfigurationProvider.class);
+
+  private final EventBus eventBus;
+
+  private final CuratorFramework client;
+
+  private NodeCache agentNodeCache;
+
+  private FlumeConfiguration flumeConfiguration;
+
+  private LifecycleState lifecycleState;
+
+  public PollingZooKeeperConfigurationProvider(String agentName,
+      String zkConnString, String basePath, EventBus eventBus) {
+    super(agentName, zkConnString, basePath);
+    this.eventBus = eventBus;
+    client = createClient();
+    agentNodeCache = null;
+    flumeConfiguration = null;
+    lifecycleState = LifecycleState.IDLE;
+  }
+
+  @Override
+  protected FlumeConfiguration getFlumeConfiguration() {
+    return flumeConfiguration;
+  }
+
+  @Override
+  public void start() {
+    LOGGER.debug("Starting...");
+    try {
+      client.start();
+      try {
+        agentNodeCache = new NodeCache(client, basePath + "/" + getAgentName());
+        agentNodeCache.start();
+        agentNodeCache.getListenable().addListener(new NodeCacheListener() {
+          @Override
+          public void nodeChanged() throws Exception {
+            refreshConfiguration();
+          }
+        });
+      } catch (Exception e) {
+        client.close();
+        throw e;
+      }
+    } catch (Exception e) {
+      lifecycleState = LifecycleState.ERROR;
+      if (e instanceof RuntimeException) {
+        throw (RuntimeException) e;
+      } else {
+        throw new FlumeException(e);
+      }
+    }
+    lifecycleState = LifecycleState.START;
+  }
+
+  private void refreshConfiguration() throws IOException {
+    LOGGER.info("Refreshing configuration from ZooKeeper");
+    byte[] data = null;
+    ChildData childData = agentNodeCache.getCurrentData();
+    if (childData != null) {
+      data = childData.getData();
+    }
+    flumeConfiguration = configFromBytes(data);
+    eventBus.post(getConfiguration());
+  }
+
+  @Override
+  public void stop() {
+    LOGGER.debug("Stopping...");
+    if (agentNodeCache != null) {
+      try {
+        agentNodeCache.close();
+      } catch (IOException e) {
+        LOGGER.warn("Encountered exception while stopping", e);
+        lifecycleState = LifecycleState.ERROR;
+      }
+    }
+
+    try {
+      client.close();
+    } catch (Exception e) {
+      LOGGER.warn("Error stopping Curator client", e);
+      lifecycleState = LifecycleState.ERROR;
+    }
+
+    if (lifecycleState != LifecycleState.ERROR) {
+      lifecycleState = LifecycleState.STOP;
+    }
+  }
+
+  @Override
+  public LifecycleState getLifecycleState() {
+    return lifecycleState;
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
index d7438d9..bc5438a 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
@@ -21,17 +21,13 @@ import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
-import java.util.Enumeration;
 import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
 
 import org.apache.flume.conf.FlumeConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
-
 /**
  * <p>
  * A configuration provider that uses properties file for specifying
@@ -206,15 +202,4 @@ public class PropertiesFileConfigurationProvider extends
     }
     return new FlumeConfiguration(new HashMap<String, String>());
   }
-
-  private Map<String, String> toMap(Properties properties) {
-    Map<String, String> result = Maps.newHashMap();
-    Enumeration<?> propertyNames = properties.propertyNames();
-    while (propertyNames.hasMoreElements()) {
-      String name = (String) propertyNames.nextElement();
-      String value = properties.getProperty(name);
-      result.put(name, value);
-    }
-    return result;
-  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/main/java/org/apache/flume/node/StaticZooKeeperConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/StaticZooKeeperConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/StaticZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..551e9dd
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/StaticZooKeeperConfigurationProvider.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.node;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StaticZooKeeperConfigurationProvider extends
+    AbstractZooKeeperConfigurationProvider {
+
+  private static final Logger LOGGER = LoggerFactory
+      .getLogger(StaticZooKeeperConfigurationProvider.class);
+
+  public StaticZooKeeperConfigurationProvider(String agentName,
+      String zkConnString, String basePath) {
+    super(agentName, zkConnString, basePath);
+  }
+
+  @Override
+  protected FlumeConfiguration getFlumeConfiguration() {
+    try {
+      CuratorFramework cf = createClient();
+      cf.start();
+      try {
+        byte[] data = cf.getData().forPath(basePath + "/" + getAgentName());
+        return configFromBytes(data);
+      } finally {
+        cf.close();
+      }
+    } catch (Exception e) {
+      LOGGER.error("Error getting configuration info from Zookeeper", e);
+      throw new FlumeException(e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..1ab4127
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.node;
+
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Charsets;
+import junit.framework.Assert;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfigurationError;
+import org.junit.After;
+import org.junit.Before;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public abstract class TestAbstractZooKeeperConfigurationProvider {
+
+  private static final String FLUME_CONF_FILE = "flume-conf.properties";
+
+  protected static final String AGENT_NAME = "a1";
+
+  protected static final String AGENT_PATH =
+    AbstractZooKeeperConfigurationProvider.DEFAULT_ZK_BASE_PATH
+      + "/" + AGENT_NAME;
+
+  protected TestingServer zkServer;
+  protected CuratorFramework client;
+
+  @Before
+  public void setUp() throws Exception {
+    zkServer = new TestingServer();
+    client = CuratorFrameworkFactory
+        .newClient("localhost:" + zkServer.getPort(),
+            new ExponentialBackoffRetry(1000, 3));
+    client.start();
+
+    EnsurePath ensurePath = new EnsurePath(AGENT_PATH);
+    ensurePath.ensure(client.getZookeeperClient());
+    doSetUp();
+  }
+
+  protected abstract void doSetUp() throws Exception;
+
+  @After
+  public void tearDown() throws Exception {
+    doTearDown();
+    zkServer.close();
+    client.close();
+  }
+
+  protected abstract void doTearDown() throws Exception;
+
+  protected void addData() throws Exception {
+    Reader in = new InputStreamReader(getClass().getClassLoader()
+        .getResourceAsStream(FLUME_CONF_FILE), Charsets.UTF_8);
+    try {
+      String config = IOUtils.toString(in);
+      client.setData().forPath(AGENT_PATH, config.getBytes());
+    } finally {
+      in.close();
+    }
+  }
+
+  protected void verifyProperties(AbstractConfigurationProvider cp) {
+    FlumeConfiguration configuration = cp.getFlumeConfiguration();
+    Assert.assertNotNull(configuration);
+
+    /*
+     * Test the known errors in the file
+     */
+    List<String> expected = Lists.newArrayList();
+    expected.add("host5 CONFIG_ERROR");
+    expected.add("host5 INVALID_PROPERTY");
+    expected.add("host4 CONFIG_ERROR");
+    expected.add("host4 CONFIG_ERROR");
+    expected.add("host4 PROPERTY_VALUE_NULL");
+    expected.add("host4 PROPERTY_VALUE_NULL");
+    expected.add("host4 PROPERTY_VALUE_NULL");
+    expected.add("host4 AGENT_CONFIGURATION_INVALID");
+    expected.add("ch2 ATTRS_MISSING");
+    expected.add("host3 CONFIG_ERROR");
+    expected.add("host3 PROPERTY_VALUE_NULL");
+    expected.add("host3 AGENT_CONFIGURATION_INVALID");
+    expected.add("host2 PROPERTY_VALUE_NULL");
+    expected.add("host2 AGENT_CONFIGURATION_INVALID");
+    List<String> actual = Lists.newArrayList();
+    for (FlumeConfigurationError error : configuration
+      .getConfigurationErrors()) {
+      actual.add(error.getComponentName() + " "
+          + error.getErrorType().toString());
+    }
+    Collections.sort(expected);
+    Collections.sort(actual);
+    Assert.assertEquals(expected, actual);
+
+    FlumeConfiguration.AgentConfiguration agentConfiguration = configuration
+        .getConfigurationFor("host1");
+    Assert.assertNotNull(agentConfiguration);
+
+    Set<String> sources = Sets.newHashSet("source1");
+    Set<String> sinks = Sets.newHashSet("sink1");
+    Set<String> channels = Sets.newHashSet("channel1");
+
+    Assert.assertEquals(sources, agentConfiguration.getSourceSet());
+    Assert.assertEquals(sinks, agentConfiguration.getSinkSet());
+    Assert.assertEquals(channels, agentConfiguration.getChannelSet());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingZooKeeperConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingZooKeeperConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..e59a438
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingZooKeeperConfigurationProvider.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.node;
+
+import junit.framework.Assert;
+
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.Test;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
+public class TestPollingZooKeeperConfigurationProvider extends
+    TestAbstractZooKeeperConfigurationProvider {
+
+  private EventBus eb;
+
+  private EventSync es;
+
+  private PollingZooKeeperConfigurationProvider cp;
+
+  private class EventSync {
+
+    private boolean notified;
+
+    @Subscribe
+    public synchronized void notifyEvent(MaterializedConfiguration mConfig) {
+      notified = true;
+      notifyAll();
+    }
+
+    public synchronized void awaitEvent() throws InterruptedException {
+      while (!notified) {
+        wait();
+      }
+    }
+
+    public synchronized void reset() {
+      notified = false;
+    }
+  }
+
+  @Override
+  protected void doSetUp() throws Exception {
+    eb = new EventBus("test");
+    es = new EventSync();
+    es.reset();
+    eb.register(es);
+    cp = new PollingZooKeeperConfigurationProvider(AGENT_NAME, "localhost:"
+        + zkServer.getPort(), null, eb);
+    cp.start();
+    LifecycleController.waitForOneOf(cp, LifecycleState.START_OR_ERROR);
+  }
+
+  @Override
+  protected void doTearDown() throws Exception {
+    // do nothing
+  }
+
+  @Test
+  public void testPolling() throws Exception {
+    es.awaitEvent();
+    es.reset();
+
+    FlumeConfiguration fc = cp.getFlumeConfiguration();
+    Assert.assertTrue(fc.getConfigurationErrors().isEmpty());
+    AgentConfiguration ac = fc.getConfigurationFor(AGENT_NAME);
+    Assert.assertNull(ac);
+
+    addData();
+    es.awaitEvent();
+    es.reset();
+
+    verifyProperties(cp);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/test/java/org/apache/flume/node/TestStaticZooKeeperConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestStaticZooKeeperConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestStaticZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..dddcffe
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestStaticZooKeeperConfigurationProvider.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.node;
+
+import org.junit.Test;
+
+public class TestStaticZooKeeperConfigurationProvider extends
+    TestAbstractZooKeeperConfigurationProvider {
+
+  private StaticZooKeeperConfigurationProvider configurationProvider;
+
+  @Override
+  protected void doSetUp() throws Exception {
+    addData();
+    configurationProvider = new StaticZooKeeperConfigurationProvider(
+        AGENT_NAME, "localhost:" + zkServer.getPort(), null);
+  }
+
+  @Override
+  protected void doTearDown() throws Exception {
+    // do nothing
+  }
+
+  @Test
+  public void testPropertyRead() throws Exception {
+    verifyProperties(configurationProvider);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4bdfcac..150db2e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1276,6 +1276,26 @@ limitations under the License.
         <version>1.1.0</version>
       </dependency>
 
+      <!-- Dependency for Zk provider -->
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-framework</artifactId>
+        <version>2.3.0</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-recipes</artifactId>
+        <version>2.3.0</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-test</artifactId>
+        <version>2.3.0</version>
+        <scope>test</scope>
+      </dependency>
+
     </dependencies>
   </dependencyManagement>