You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flume.apache.org by hs...@apache.org on 2014/09/12 22:19:29 UTC
git commit: FLUME-1491. Support fetching configuration from Zookeeper.
Repository: flume
Updated Branches:
refs/heads/trunk 59f0b4df9 -> dd466c7e4
FLUME-1491. Support fetching configuration from Zookeeper.
(Ashish Paliwal via Hari)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/dd466c7e
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/dd466c7e
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/dd466c7e
Branch: refs/heads/trunk
Commit: dd466c7e4623d9f5fd459b59836274d852f58d36
Parents: 59f0b4d
Author: Hari Shreedharan <hs...@apache.org>
Authored: Fri Sep 12 13:16:13 2014 -0700
Committer: Hari Shreedharan <hs...@apache.org>
Committed: Fri Sep 12 13:19:11 2014 -0700
----------------------------------------------------------------------
bin/flume-ng | 33 +++--
flume-ng-node/pom.xml | 15 ++
.../node/AbstractConfigurationProvider.java | 18 ++-
.../AbstractZooKeeperConfigurationProvider.java | 104 ++++++++++++++
.../java/org/apache/flume/node/Application.java | 131 ++++++++++++------
.../PollingZooKeeperConfigurationProvider.java | 135 ++++++++++++++++++
.../PropertiesFileConfigurationProvider.java | 15 --
.../StaticZooKeeperConfigurationProvider.java | 55 ++++++++
...tAbstractZooKeeperConfigurationProvider.java | 136 +++++++++++++++++++
...stPollingZooKeeperConfigurationProvider.java | 95 +++++++++++++
...estStaticZooKeeperConfigurationProvider.java | 44 ++++++
pom.xml | 20 +++
12 files changed, 722 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/bin/flume-ng
----------------------------------------------------------------------
diff --git a/bin/flume-ng b/bin/flume-ng
index e09e26b..4b323a6 100755
--- a/bin/flume-ng
+++ b/bin/flume-ng
@@ -174,25 +174,28 @@ display_help() {
Usage: $0 <command> [options]...
commands:
- help display this help text
- agent run a Flume agent
- avro-client run an avro Flume client
- version show Flume version info
+ help display this help text
+ agent run a Flume agent
+ avro-client run an avro Flume client
+ version show Flume version info
global options:
- --conf,-c <conf> use configs in <conf> directory
- --classpath,-C <cp> append to the classpath
- --dryrun,-d do not actually start Flume, just print the command
- --plugins-path <dirs> colon-separated list of plugins.d directories. See the
- plugins.d section in the user guide for more details.
- Default: \$FLUME_HOME/plugins.d
- -Dproperty=value sets a Java system property value
- -Xproperty=value sets a Java -X option
+ --conf,-c <conf> use configs in <conf> directory
+ --classpath,-C <cp> append to the classpath
+ --dryrun,-d do not actually start Flume, just print the command
+ --plugins-path <dirs> colon-separated list of plugins.d directories. See the
+ plugins.d section in the user guide for more details.
+ Default: \$FLUME_HOME/plugins.d
+ -Dproperty=value sets a Java system property value
+ -Xproperty=value sets a Java -X option
agent options:
- --conf-file,-f <file> specify a config file (required)
- --name,-n <name> the name of this agent (required)
- --help,-h display help text
+ --name,-n <name> the name of this agent (required)
+ --conf-file,-f <file> specify a config file (required if -z missing)
+ --zkConnString,-z <str> specify the ZooKeeper connection to use (required if -f missing)
+ --zkBasePath,-p <path> specify the base path in ZooKeeper for agent configs
+ --no-reload-conf do not reload config file if changed
+ --help,-h display help text
avro-client options:
--rpcProps,-P <file> RPC client properties file with server connection params
http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-node/pom.xml b/flume-ng-node/pom.xml
index dce2527..caf2c11 100644
--- a/flume-ng-node/pom.xml
+++ b/flume-ng-node/pom.xml
@@ -154,6 +154,21 @@
<artifactId>jackson-mapper-asl</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ </dependency>
+
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
index e63c601..40abba2 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractConfigurationProvider.java
@@ -17,13 +17,8 @@
*/
package org.apache.flume.node;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import java.util.Set;
import org.apache.flume.Channel;
import org.apache.flume.ChannelFactory;
@@ -506,4 +501,15 @@ public abstract class AbstractConfigurationProvider implements
components = Lists.newArrayList();
}
}
+
+ protected Map<String, String> toMap(Properties properties) {
+ Map<String, String> result = Maps.newHashMap();
+ Enumeration<?> propertyNames = properties.propertyNames();
+ while (propertyNames.hasMoreElements()) {
+ String name = (String) propertyNames.nextElement();
+ String value = properties.getProperty(name);
+ result.put(name, value);
+ }
+ return result;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/main/java/org/apache/flume/node/AbstractZooKeeperConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/AbstractZooKeeperConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..f193f9f
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/AbstractZooKeeperConfigurationProvider.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.node;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.base.Charsets;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.flume.conf.FlumeConfiguration;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+
+/**
+ * ZooKeeper based configuration implementation provider.
+ *
+ * The Agent configuration can be uploaded in ZooKeeper under a base name, which
+ * defaults to /flume
+ *
+ * Currently the agent configuration is stored under the agent name node in
+ * ZooKeeper
+ *
+ * <PRE>
+ * /flume
+ * /a1 [agent config file]
+ * /a2 [agent config file]
+ * /a3 [agent config file]
+ * </PRE>
+ *
+ * Configuration format is same as PropertiesFileConfigurationProvider
+ *
+ * Configuration properties
+ *
+ * agentName - Name of Agent for which configuration needs to be pulled
+ *
+ * zkConnString - Connection string to ZooKeeper Ensemble
+ * (host:port,host1:port1)
+ *
+ * basePath - Base Path where agent configuration needs to be stored. Defaults
+ * to /flume
+ */
+public abstract class AbstractZooKeeperConfigurationProvider extends
+ AbstractConfigurationProvider {
+
+ static final String DEFAULT_ZK_BASE_PATH = "/flume";
+
+ protected final String basePath;
+
+ protected final String zkConnString;
+
+ protected AbstractZooKeeperConfigurationProvider(String agentName,
+ String zkConnString, String basePath) {
+ super(agentName);
+ Preconditions.checkArgument(!Strings.isNullOrEmpty(zkConnString),
+ "Invalid Zookeeper Connection String %s", zkConnString);
+ this.zkConnString = zkConnString;
+ if (basePath == null || basePath.isEmpty()) {
+ this.basePath = DEFAULT_ZK_BASE_PATH;
+ } else {
+ this.basePath = basePath;
+ }
+ }
+
+ protected CuratorFramework createClient() {
+ return CuratorFrameworkFactory.newClient(zkConnString,
+ new ExponentialBackoffRetry(1000, 1));
+ }
+
+ protected FlumeConfiguration configFromBytes(byte[] configData)
+ throws IOException {
+ Map<String, String> configMap;
+ if (configData == null || configData.length == 0) {
+ configMap = Collections.emptyMap();
+ } else {
+ String fileContent = new String(configData, Charsets.UTF_8);
+ Properties properties = new Properties();
+ properties.load(new StringReader(fileContent));
+ configMap = toMap(properties);
+ }
+ return new FlumeConfiguration(configMap);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
index 5250139..832285a 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/Application.java
@@ -53,7 +53,7 @@ import com.google.common.collect.Lists;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
-public class Application {
+public class Application {
private static final Logger logger = LoggerFactory
.getLogger(Application.class);
@@ -69,6 +69,7 @@ public class Application {
public Application() {
this(new ArrayList<LifecycleAware>(0));
}
+
public Application(List<LifecycleAware> components) {
this.components = components;
supervisor = new LifecycleSupervisor();
@@ -81,7 +82,6 @@ public class Application {
}
}
-
@Subscribe
public synchronized void handleConfigurationEvent(MaterializedConfiguration conf) {
stopAllComponents();
@@ -95,7 +95,6 @@ public class Application {
}
}
-
private void stopAllComponents() {
if (this.materializedConfiguration != null) {
logger.info("Shutting down configuration: {}", this.materializedConfiguration);
@@ -192,7 +191,6 @@ public class Application {
this.loadMonitoring();
}
-
@SuppressWarnings("unchecked")
private void loadMonitoring() {
Properties systemProps = System.getProperties();
@@ -231,18 +229,32 @@ public class Application {
try {
+ boolean isZkConfigured = false;
+
Options options = new Options();
Option option = new Option("n", "name", true, "the name of this agent");
option.setRequired(true);
options.addOption(option);
- option = new Option("f", "conf-file", true, "specify a conf file");
- option.setRequired(true);
+ option = new Option("f", "conf-file", true,
+ "specify a config file (required if -z missing)");
+ option.setRequired(false);
options.addOption(option);
- option = new Option(null, "no-reload-conf", false, "do not reload " +
- "conf file if changed");
+ option = new Option(null, "no-reload-conf", false,
+ "do not reload config file if changed");
+ options.addOption(option);
+
+ // Options for Zookeeper
+ option = new Option("z", "zkConnString", true,
+ "specify the ZooKeeper connection to use (required if -f missing)");
+ option.setRequired(false);
+ options.addOption(option);
+
+ option = new Option("p", "zkBasePath", true,
+ "specify the base path in ZooKeeper for agent configs");
+ option.setRequired(false);
options.addOption(option);
option = new Option("h", "help", false, "display help text");
@@ -251,47 +263,80 @@ public class Application {
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(options, args);
- File configurationFile = new File(commandLine.getOptionValue('f'));
- String agentName = commandLine.getOptionValue('n');
- boolean reload = !commandLine.hasOption("no-reload-conf");
-
if (commandLine.hasOption('h')) {
new HelpFormatter().printHelp("flume-ng agent", options, true);
return;
}
- /*
- * The following is to ensure that by default the agent
- * will fail on startup if the file does not exist.
- */
- if (!configurationFile.exists()) {
- // If command line invocation, then need to fail fast
- if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) {
- String path = configurationFile.getPath();
- try {
- path = configurationFile.getCanonicalPath();
- } catch (IOException ex) {
- logger.error("Failed to read canonical path for file: " + path, ex);
- }
- throw new ParseException(
- "The specified configuration file does not exist: " + path);
- }
+
+ String agentName = commandLine.getOptionValue('n');
+ boolean reload = !commandLine.hasOption("no-reload-conf");
+
+ if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
+ isZkConfigured = true;
}
- List<LifecycleAware> components = Lists.newArrayList();
- Application application;
- if(reload) {
- EventBus eventBus = new EventBus(agentName + "-event-bus");
- PollingPropertiesFileConfigurationProvider configurationProvider =
- new PollingPropertiesFileConfigurationProvider(agentName,
- configurationFile, eventBus, 30);
- components.add(configurationProvider);
- application = new Application(components);
- eventBus.register(application);
+ Application application = null;
+ if (isZkConfigured) {
+ // get options
+ String zkConnectionStr = commandLine.getOptionValue('z');
+ String baseZkPath = commandLine.getOptionValue('p');
+
+ if (reload) {
+ EventBus eventBus = new EventBus(agentName + "-event-bus");
+ List<LifecycleAware> components = Lists.newArrayList();
+ PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider =
+ new PollingZooKeeperConfigurationProvider(
+ agentName, zkConnectionStr, baseZkPath, eventBus);
+ components.add(zookeeperConfigurationProvider);
+ application = new Application(components);
+ eventBus.register(application);
+ } else {
+ StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider =
+ new StaticZooKeeperConfigurationProvider(
+ agentName, zkConnectionStr, baseZkPath);
+ application = new Application();
+ application.handleConfigurationEvent(zookeeperConfigurationProvider
+ .getConfiguration());
+ }
} else {
- PropertiesFileConfigurationProvider configurationProvider =
- new PropertiesFileConfigurationProvider(agentName,
- configurationFile);
- application = new Application();
- application.handleConfigurationEvent(configurationProvider.getConfiguration());
+ File configurationFile = new File(commandLine.getOptionValue('f'));
+
+ /*
+ * The following is to ensure that by default the agent will fail on
+ * startup if the file does not exist.
+ */
+ if (!configurationFile.exists()) {
+ // If command line invocation, then need to fail fast
+ if (System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) ==
+ null) {
+ String path = configurationFile.getPath();
+ try {
+ path = configurationFile.getCanonicalPath();
+ } catch (IOException ex) {
+ logger.error("Failed to read canonical path for file: " + path,
+ ex);
+ }
+ throw new ParseException(
+ "The specified configuration file does not exist: " + path);
+ }
+ }
+ List<LifecycleAware> components = Lists.newArrayList();
+
+ if (reload) {
+ EventBus eventBus = new EventBus(agentName + "-event-bus");
+ PollingPropertiesFileConfigurationProvider configurationProvider =
+ new PollingPropertiesFileConfigurationProvider(
+ agentName, configurationFile, eventBus, 30);
+ components.add(configurationProvider);
+ application = new Application(components);
+ eventBus.register(application);
+ } else {
+ PropertiesFileConfigurationProvider configurationProvider =
+ new PropertiesFileConfigurationProvider(
+ agentName, configurationFile);
+ application = new Application();
+ application.handleConfigurationEvent(configurationProvider
+ .getConfiguration());
+ }
}
application.start();
http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/main/java/org/apache/flume/node/PollingZooKeeperConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PollingZooKeeperConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PollingZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..b950b3d
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/PollingZooKeeperConfigurationProvider.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.node;
+
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.lifecycle.LifecycleAware;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.eventbus.EventBus;
+
+public class PollingZooKeeperConfigurationProvider extends
+ AbstractZooKeeperConfigurationProvider implements LifecycleAware {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(PollingZooKeeperConfigurationProvider.class);
+
+ private final EventBus eventBus;
+
+ private final CuratorFramework client;
+
+ private NodeCache agentNodeCache;
+
+ private FlumeConfiguration flumeConfiguration;
+
+ private LifecycleState lifecycleState;
+
+ public PollingZooKeeperConfigurationProvider(String agentName,
+ String zkConnString, String basePath, EventBus eventBus) {
+ super(agentName, zkConnString, basePath);
+ this.eventBus = eventBus;
+ client = createClient();
+ agentNodeCache = null;
+ flumeConfiguration = null;
+ lifecycleState = LifecycleState.IDLE;
+ }
+
+ @Override
+ protected FlumeConfiguration getFlumeConfiguration() {
+ return flumeConfiguration;
+ }
+
+ @Override
+ public void start() {
+ LOGGER.debug("Starting...");
+ try {
+ client.start();
+ try {
+ agentNodeCache = new NodeCache(client, basePath + "/" + getAgentName());
+ agentNodeCache.start();
+ agentNodeCache.getListenable().addListener(new NodeCacheListener() {
+ @Override
+ public void nodeChanged() throws Exception {
+ refreshConfiguration();
+ }
+ });
+ } catch (Exception e) {
+ client.close();
+ throw e;
+ }
+ } catch (Exception e) {
+ lifecycleState = LifecycleState.ERROR;
+ if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ } else {
+ throw new FlumeException(e);
+ }
+ }
+ lifecycleState = LifecycleState.START;
+ }
+
+ private void refreshConfiguration() throws IOException {
+ LOGGER.info("Refreshing configuration from ZooKeeper");
+ byte[] data = null;
+ ChildData childData = agentNodeCache.getCurrentData();
+ if (childData != null) {
+ data = childData.getData();
+ }
+ flumeConfiguration = configFromBytes(data);
+ eventBus.post(getConfiguration());
+ }
+
+ @Override
+ public void stop() {
+ LOGGER.debug("Stopping...");
+ if (agentNodeCache != null) {
+ try {
+ agentNodeCache.close();
+ } catch (IOException e) {
+ LOGGER.warn("Encountered exception while stopping", e);
+ lifecycleState = LifecycleState.ERROR;
+ }
+ }
+
+ try {
+ client.close();
+ } catch (Exception e) {
+ LOGGER.warn("Error stopping Curator client", e);
+ lifecycleState = LifecycleState.ERROR;
+ }
+
+ if (lifecycleState != LifecycleState.ERROR) {
+ lifecycleState = LifecycleState.STOP;
+ }
+ }
+
+ @Override
+ public LifecycleState getLifecycleState() {
+ return lifecycleState;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
index d7438d9..bc5438a 100644
--- a/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/PropertiesFileConfigurationProvider.java
@@ -21,17 +21,13 @@ import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
-import java.util.Enumeration;
import java.util.HashMap;
-import java.util.Map;
import java.util.Properties;
import org.apache.flume.conf.FlumeConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Maps;
-
/**
* <p>
* A configuration provider that uses properties file for specifying
@@ -206,15 +202,4 @@ public class PropertiesFileConfigurationProvider extends
}
return new FlumeConfiguration(new HashMap<String, String>());
}
-
- private Map<String, String> toMap(Properties properties) {
- Map<String, String> result = Maps.newHashMap();
- Enumeration<?> propertyNames = properties.propertyNames();
- while (propertyNames.hasMoreElements()) {
- String name = (String) propertyNames.nextElement();
- String value = properties.getProperty(name);
- result.put(name, value);
- }
- return result;
- }
}
http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/main/java/org/apache/flume/node/StaticZooKeeperConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/main/java/org/apache/flume/node/StaticZooKeeperConfigurationProvider.java b/flume-ng-node/src/main/java/org/apache/flume/node/StaticZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..551e9dd
--- /dev/null
+++ b/flume-ng-node/src/main/java/org/apache/flume/node/StaticZooKeeperConfigurationProvider.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.node;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flume.FlumeException;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class StaticZooKeeperConfigurationProvider extends
+ AbstractZooKeeperConfigurationProvider {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(StaticZooKeeperConfigurationProvider.class);
+
+ public StaticZooKeeperConfigurationProvider(String agentName,
+ String zkConnString, String basePath) {
+ super(agentName, zkConnString, basePath);
+ }
+
+ @Override
+ protected FlumeConfiguration getFlumeConfiguration() {
+ try {
+ CuratorFramework cf = createClient();
+ cf.start();
+ try {
+ byte[] data = cf.getData().forPath(basePath + "/" + getAgentName());
+ return configFromBytes(data);
+ } finally {
+ cf.close();
+ }
+ } catch (Exception e) {
+ LOGGER.error("Error getting configuration info from Zookeeper", e);
+ throw new FlumeException(e);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..1ab4127
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestAbstractZooKeeperConfigurationProvider.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.node;
+
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import com.google.common.base.Charsets;
+import junit.framework.Assert;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.utils.EnsurePath;
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfigurationError;
+import org.junit.After;
+import org.junit.Before;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public abstract class TestAbstractZooKeeperConfigurationProvider {
+
+ private static final String FLUME_CONF_FILE = "flume-conf.properties";
+
+ protected static final String AGENT_NAME = "a1";
+
+ protected static final String AGENT_PATH =
+ AbstractZooKeeperConfigurationProvider.DEFAULT_ZK_BASE_PATH
+ + "/" + AGENT_NAME;
+
+ protected TestingServer zkServer;
+ protected CuratorFramework client;
+
+ @Before
+ public void setUp() throws Exception {
+ zkServer = new TestingServer();
+ client = CuratorFrameworkFactory
+ .newClient("localhost:" + zkServer.getPort(),
+ new ExponentialBackoffRetry(1000, 3));
+ client.start();
+
+ EnsurePath ensurePath = new EnsurePath(AGENT_PATH);
+ ensurePath.ensure(client.getZookeeperClient());
+ doSetUp();
+ }
+
+ protected abstract void doSetUp() throws Exception;
+
+ @After
+ public void tearDown() throws Exception {
+ doTearDown();
+ zkServer.close();
+ client.close();
+ }
+
+ protected abstract void doTearDown() throws Exception;
+
+ protected void addData() throws Exception {
+ Reader in = new InputStreamReader(getClass().getClassLoader()
+ .getResourceAsStream(FLUME_CONF_FILE), Charsets.UTF_8);
+ try {
+ String config = IOUtils.toString(in);
+ client.setData().forPath(AGENT_PATH, config.getBytes());
+ } finally {
+ in.close();
+ }
+ }
+
+ protected void verifyProperties(AbstractConfigurationProvider cp) {
+ FlumeConfiguration configuration = cp.getFlumeConfiguration();
+ Assert.assertNotNull(configuration);
+
+ /*
+ * Test the known errors in the file
+ */
+ List<String> expected = Lists.newArrayList();
+ expected.add("host5 CONFIG_ERROR");
+ expected.add("host5 INVALID_PROPERTY");
+ expected.add("host4 CONFIG_ERROR");
+ expected.add("host4 CONFIG_ERROR");
+ expected.add("host4 PROPERTY_VALUE_NULL");
+ expected.add("host4 PROPERTY_VALUE_NULL");
+ expected.add("host4 PROPERTY_VALUE_NULL");
+ expected.add("host4 AGENT_CONFIGURATION_INVALID");
+ expected.add("ch2 ATTRS_MISSING");
+ expected.add("host3 CONFIG_ERROR");
+ expected.add("host3 PROPERTY_VALUE_NULL");
+ expected.add("host3 AGENT_CONFIGURATION_INVALID");
+ expected.add("host2 PROPERTY_VALUE_NULL");
+ expected.add("host2 AGENT_CONFIGURATION_INVALID");
+ List<String> actual = Lists.newArrayList();
+ for (FlumeConfigurationError error : configuration
+ .getConfigurationErrors()) {
+ actual.add(error.getComponentName() + " "
+ + error.getErrorType().toString());
+ }
+ Collections.sort(expected);
+ Collections.sort(actual);
+ Assert.assertEquals(expected, actual);
+
+ FlumeConfiguration.AgentConfiguration agentConfiguration = configuration
+ .getConfigurationFor("host1");
+ Assert.assertNotNull(agentConfiguration);
+
+ Set<String> sources = Sets.newHashSet("source1");
+ Set<String> sinks = Sets.newHashSet("sink1");
+ Set<String> channels = Sets.newHashSet("channel1");
+
+ Assert.assertEquals(sources, agentConfiguration.getSourceSet());
+ Assert.assertEquals(sinks, agentConfiguration.getSinkSet());
+ Assert.assertEquals(channels, agentConfiguration.getChannelSet());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingZooKeeperConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingZooKeeperConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..e59a438
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestPollingZooKeeperConfigurationProvider.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.node;
+
+import junit.framework.Assert;
+
+import org.apache.flume.conf.FlumeConfiguration;
+import org.apache.flume.conf.FlumeConfiguration.AgentConfiguration;
+import org.apache.flume.lifecycle.LifecycleController;
+import org.apache.flume.lifecycle.LifecycleState;
+import org.junit.Test;
+
+import com.google.common.eventbus.EventBus;
+import com.google.common.eventbus.Subscribe;
+
+public class TestPollingZooKeeperConfigurationProvider extends
+ TestAbstractZooKeeperConfigurationProvider {
+
+ private EventBus eb;
+
+ private EventSync es;
+
+ private PollingZooKeeperConfigurationProvider cp;
+
+ private class EventSync {
+
+ private boolean notified;
+
+ @Subscribe
+ public synchronized void notifyEvent(MaterializedConfiguration mConfig) {
+ notified = true;
+ notifyAll();
+ }
+
+ public synchronized void awaitEvent() throws InterruptedException {
+ while (!notified) {
+ wait();
+ }
+ }
+
+ public synchronized void reset() {
+ notified = false;
+ }
+ }
+
+ @Override
+ protected void doSetUp() throws Exception {
+ eb = new EventBus("test");
+ es = new EventSync();
+ es.reset();
+ eb.register(es);
+ cp = new PollingZooKeeperConfigurationProvider(AGENT_NAME, "localhost:"
+ + zkServer.getPort(), null, eb);
+ cp.start();
+ LifecycleController.waitForOneOf(cp, LifecycleState.START_OR_ERROR);
+ }
+
+ @Override
+ protected void doTearDown() throws Exception {
+ // do nothing
+ }
+
+ @Test
+ public void testPolling() throws Exception {
+ es.awaitEvent();
+ es.reset();
+
+ FlumeConfiguration fc = cp.getFlumeConfiguration();
+ Assert.assertTrue(fc.getConfigurationErrors().isEmpty());
+ AgentConfiguration ac = fc.getConfigurationFor(AGENT_NAME);
+ Assert.assertNull(ac);
+
+ addData();
+ es.awaitEvent();
+ es.reset();
+
+ verifyProperties(cp);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/flume-ng-node/src/test/java/org/apache/flume/node/TestStaticZooKeeperConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/flume-ng-node/src/test/java/org/apache/flume/node/TestStaticZooKeeperConfigurationProvider.java b/flume-ng-node/src/test/java/org/apache/flume/node/TestStaticZooKeeperConfigurationProvider.java
new file mode 100644
index 0000000..dddcffe
--- /dev/null
+++ b/flume-ng-node/src/test/java/org/apache/flume/node/TestStaticZooKeeperConfigurationProvider.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flume.node;
+
+import org.junit.Test;
+
+public class TestStaticZooKeeperConfigurationProvider extends
+ TestAbstractZooKeeperConfigurationProvider {
+
+ private StaticZooKeeperConfigurationProvider configurationProvider;
+
+ @Override
+ protected void doSetUp() throws Exception {
+ addData();
+ configurationProvider = new StaticZooKeeperConfigurationProvider(
+ AGENT_NAME, "localhost:" + zkServer.getPort(), null);
+ }
+
+ @Override
+ protected void doTearDown() throws Exception {
+ // do nothing
+ }
+
+ @Test
+ public void testPropertyRead() throws Exception {
+ verifyProperties(configurationProvider);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flume/blob/dd466c7e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4bdfcac..150db2e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1276,6 +1276,26 @@ limitations under the License.
<version>1.1.0</version>
</dependency>
+ <!-- Dependency for Zk provider -->
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ <version>2.3.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ <version>2.3.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>2.3.0</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</dependencyManagement>