You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/20 20:30:18 UTC

[09/15] Adding Helix-task-framework and Yarn integration modules

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/assembly/assembly.xml
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/assembly/assembly.xml b/recipes/meta-cluster-manager/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..03b2ca5
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/assembly/assembly.xml
@@ -0,0 +1,32 @@
+<assembly
+  xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" 
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
+  http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+
+  <id>assembly</id>
+  <formats>
+    <format>tar.gz</format>
+  </formats>
+  <baseDirectory>metamanager</baseDirectory>
+  <fileSets>
+    <fileSet>
+      <directory>target/metamanager-pkg/repo</directory>
+      <outputDirectory>repo</outputDirectory>
+      <excludes>
+        <exclude>**/maven-metadata-appassembler.xml</exclude>
+      </excludes>
+      <fileMode>0644</fileMode>
+    </fileSet> 
+    <fileSet>
+      <directory>target/metamanager-pkg/bin</directory>
+      <outputDirectory>bin</outputDirectory>
+      <fileMode>0755</fileMode>
+    </fileSet> 
+    <fileSet>
+      <directory>target/metamanager-pkg/conf</directory>
+      <outputDirectory>conf</outputDirectory>
+      <fileMode>0644</fileMode>
+    </fileSet> 
+  </fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/config/log4j.properties
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/config/log4j.properties b/recipes/meta-cluster-manager/src/main/config/log4j.properties
new file mode 100644
index 0000000..af33e21
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/config/log4j.properties
@@ -0,0 +1,30 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# Set root logger level to ERROR and its only appender to C.
+log4j.rootLogger=ERROR, C
+
+# C is set to be a ConsoleAppender.
+log4j.appender.C=org.apache.log4j.ConsoleAppender
+log4j.appender.C.layout=org.apache.log4j.PatternLayout
+log4j.appender.C.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+log4j.logger.org.I0Itec=ERROR
+log4j.logger.org.apache=ERROR
+
+log4j.logger.org.apache.helix.metamanager=INFO

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterAdmin.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterAdmin.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterAdmin.java
new file mode 100644
index 0000000..9a83f02
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterAdmin.java
@@ -0,0 +1,30 @@
+package org.apache.helix.metamanager;
+
+/**
+ * Injects instance configurations (containers) into the managed cluster and
+ * removes them again.
+ */
+public interface ClusterAdmin {
+
+    /**
+     * Adds an instance configuration to the managed cluster.
+     *
+     * @param instanceId
+     *            id of the instance to add
+     * @param instanceTag
+     *            tag to associate with the instance
+     */
+    void addInstance(String instanceId, String instanceTag);
+
+    /**
+     * Removes an instance configuration from the managed cluster.<br/>
+     * <b>INVARIANT:</b> idempotent
+     *
+     * @param instanceId
+     *            id of the instance to remove
+     */
+    void removeInstance(String instanceId);
+
+    /**
+     * Triggers a rebalance of any affected resource in the managed cluster.
+     */
+    void rebalance();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterContainerProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterContainerProvider.java
new file mode 100644
index 0000000..6aca07a
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterContainerProvider.java
@@ -0,0 +1,32 @@
+package org.apache.helix.metamanager;
+
+public interface ClusterContainerProvider {
+	/**
+	 * Create container of given type.
+	 * 
+	 * @param id
+	 *            unique user-defined container id
+	 * @param type
+	 *            container type
+	 * @throws Exception
+	 *             declared by the signature; the concrete failure conditions
+	 *             depend on the implementation
+	 */
+	public void create(String id, String type) throws Exception;
+
+	/**
+	 * Destroy container.
+	 * 
+	 * @param id
+	 *            unique user-defined container id
+	 * @throws Exception
+	 *             declared by the signature; the concrete failure conditions
+	 *             depend on the implementation
+	 */
+	public void destroy(String id) throws Exception;
+
+	/**
+	 * Stops all running processes and destroys containers. Best-effort for
+	 * cleanup.
+	 * 
+	 */
+	public void destroyAll();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterContainerStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterContainerStatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterContainerStatusProvider.java
new file mode 100644
index 0000000..e68c0ee
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterContainerStatusProvider.java
@@ -0,0 +1,7 @@
+package org.apache.helix.metamanager;
+
+/**
+ * Status queries for a container that is addressed by its id.
+ */
+public interface ClusterContainerStatusProvider {
+
+	/** NOTE(review): presumably tells whether a container with this id is known - confirm against implementations. */
+	boolean exists(String id);
+
+	/** NOTE(review): presumably tells whether the container is up and running - confirm against implementations. */
+	boolean isActive(String id);
+
+	/** NOTE(review): presumably tells whether the container has failed - confirm against implementations. */
+	boolean isFailed(String id);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterInstanceInjector.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterInstanceInjector.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterInstanceInjector.java
new file mode 100644
index 0000000..d29e1c3
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterInstanceInjector.java
@@ -0,0 +1,6 @@
+package org.apache.helix.metamanager;
+
+/**
+ * Adds and removes instances, each identified by a connection string.
+ */
+public interface ClusterInstanceInjector {
+
+	/**
+	 * @param connection
+	 *            connection string identifying the instance to add
+	 */
+	void addInstance(String connection);
+
+	/**
+	 * @param connection
+	 *            connection string identifying the instance to remove
+	 */
+	void removeInstance(String connection);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterStatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterStatusProvider.java
new file mode 100644
index 0000000..1812dc3
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ClusterStatusProvider.java
@@ -0,0 +1,5 @@
+package org.apache.helix.metamanager;
+
+/**
+ * Source of the target number of containers per container type.
+ */
+public interface ClusterStatusProvider {
+
+	/**
+	 * @param containerType
+	 *            container type to look up
+	 * @return target container count for the given type
+	 * @throws Exception
+	 *             declared by the signature; failure conditions depend on the
+	 *             implementation
+	 */
+	int getTargetContainerCount(String containerType) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ConfigTool.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ConfigTool.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ConfigTool.java
new file mode 100644
index 0000000..596743b
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ConfigTool.java
@@ -0,0 +1,47 @@
+package org.apache.helix.metamanager;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Static holder for packaging paths, the container timeout and the shared
+ * target and status providers. Cannot be instantiated.
+ */
+public class ConfigTool {
+
+    static final Logger log = Logger.getLogger(ConfigTool.class);
+
+    // shell container settings
+    public static final String SHELL_CONTAINER_PATH = "target/metamanager-pkg/bin/shell-container-process.sh";
+    public static final String SHELL_CONTAINER_PROPERTIES = "container.properties";
+    public static final String SHELL_CONTAINER_MARKER = "active";
+
+    // yarn master settings
+    public static final String YARN_MASTER_ARCHIVE_PATH = "target/metamanager-assembly.tar.gz";
+    public static final String YARN_MASTER_PATH = "master/metamanager/bin/yarn-master-process.sh";
+    public static final String YARN_MASTER_STAGING = "master.tar.gz";
+    public static final String YARN_MASTER_DESTINATION = "master";
+    public static final String YARN_MASTER_PROPERTIES = "master.properties";
+
+    // yarn container settings
+    public static final String YARN_CONTAINER_ARCHIVE_PATH = "target/metamanager-assembly.tar.gz";
+    public static final String YARN_CONTAINER_STAGING = "container.tar.gz";
+    public static final String YARN_CONTAINER_PATH = "container/metamanager/bin/yarn-container-process.sh";
+    public static final String YARN_CONTAINER_DESTINATION = "container";
+    public static final String YARN_CONTAINER_PROPERTIES = "container.properties";
+
+    // NOTE(review): unit is not stated here; presumably milliseconds - confirm at the usage sites
+    public static final long CONTAINER_TIMEOUT = 60000L;
+
+    // shared providers; unset (null) until the corresponding setter is called
+    static TargetProvider targetProvider;
+    static StatusProvider statusProvider;
+
+    private ConfigTool() {
+        // static access only
+    }
+
+    /** @return the target provider last passed to the setter, or null if none was set */
+    public static TargetProvider getTargetProvider() {
+        return ConfigTool.targetProvider;
+    }
+
+    /** @param targetProvider provider handed out by subsequent getter calls */
+    public static void setTargetProvider(TargetProvider targetProvider) {
+        ConfigTool.targetProvider = targetProvider;
+    }
+
+    /** @return the status provider last passed to the setter, or null if none was set */
+    public static StatusProvider getStatusProvider() {
+        return ConfigTool.statusProvider;
+    }
+
+    /** @param statusProvider provider handed out by subsequent getter calls */
+    public static void setStatusProvider(StatusProvider statusProvider) {
+        ConfigTool.statusProvider = statusProvider;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerProvider.java
new file mode 100644
index 0000000..2483bba
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerProvider.java
@@ -0,0 +1,40 @@
+package org.apache.helix.metamanager;
+
+import org.apache.helix.metamanager.provider.ProviderStateModel;
+
+/**
+ * Abstraction of the container deployment framework: creates and destroys
+ * container instances. ProviderStateModel invokes it, and every call must
+ * block until it is done.
+ *
+ * @see ProviderStateModel
+ */
+public interface ContainerProvider {
+
+    /**
+     * Creates a container of the given type.<br/>
+     * <b>INVARIANT:</b> synchronous invocation
+     *
+     * @param id
+     *            unique user-defined container id
+     * @param containerType
+     *            container type
+     * @throws Exception
+     *             declared by the signature; failure conditions depend on the
+     *             implementation
+     */
+    void create(String id, String containerType) throws Exception;
+
+    /**
+     * Destroys a container.<br/>
+     * <b>INVARIANT:</b> synchronous invocation
+     *
+     * @param id
+     *            unique user-defined container id
+     * @throws Exception
+     *             declared by the signature; failure conditions depend on the
+     *             implementation
+     */
+    void destroy(String id) throws Exception;
+
+    /**
+     * Stops all running processes and destroys all containers. Best-effort,
+     * intended for cleanup.
+     */
+    void destroyAll();
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerProviderService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerProviderService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerProviderService.java
new file mode 100644
index 0000000..a7da053
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerProviderService.java
@@ -0,0 +1,9 @@
+package org.apache.helix.metamanager;
+
+/**
+ * ContainerProvider as configurable service: combines the
+ * {@link ContainerProvider} operations with the {@link Service} contract and
+ * declares no members of its own.
+ * 
+ */
+public interface ContainerProviderService extends ContainerProvider, Service {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerStatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerStatusProvider.java
new file mode 100644
index 0000000..d2853d9
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ContainerStatusProvider.java
@@ -0,0 +1,7 @@
+package org.apache.helix.metamanager;
+
+/**
+ * Status queries for a container that is addressed by its id.
+ */
+public interface ContainerStatusProvider {
+
+	/** NOTE(review): presumably tells whether a container with this id is known - confirm against implementations. */
+	boolean exists(String id);
+
+	/** NOTE(review): presumably tells whether the container is up and running - confirm against implementations. */
+	boolean isActive(String id);
+
+	/** NOTE(review): presumably tells whether the container has failed - confirm against implementations. */
+	boolean isFailed(String id);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/FileStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/FileStatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/FileStatusProvider.java
new file mode 100644
index 0000000..06e2251
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/FileStatusProvider.java
@@ -0,0 +1,27 @@
+package org.apache.helix.metamanager;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+
+/**
+ * {@link ClusterStatusProvider} backed by a properties file that maps each
+ * container type (key) to its target container count (value). The file is
+ * re-read on every call.
+ */
+public class FileStatusProvider implements ClusterStatusProvider {
+
+	final File file;
+
+	/**
+	 * @param file
+	 *            properties file holding one "containerType=count" entry per
+	 *            container type
+	 */
+	public FileStatusProvider(File file) {
+		this.file = file;
+	}
+
+	/**
+	 * Looks up the target container count of a container type.
+	 *
+	 * @param containerType
+	 *            property key to look up
+	 * @return the value stored under that key, parsed as an integer
+	 * @throws FileNotFoundException
+	 *             if the properties file cannot be opened
+	 * @throws IOException
+	 *             if reading the properties file fails
+	 * @throws IllegalArgumentException
+	 *             if the file has no entry for the container type, or the
+	 *             entry is not a valid integer (NumberFormatException)
+	 */
+	@Override
+	public int getTargetContainerCount(String containerType) throws FileNotFoundException, IOException, IllegalArgumentException {
+		Properties properties = new Properties();
+		FileReader reader = new FileReader(file);
+		try {
+			properties.load(reader);
+		} finally {
+			// always release the file handle, even if loading fails
+			reader.close();
+		}
+
+		// Look up by key. Hashtable.contains(Object) tests the *values* of the
+		// table, so it cannot tell whether the container type is configured.
+		String count = properties.getProperty(containerType);
+		if (count == null) {
+			throw new IllegalArgumentException(String.format("container type '%s' not found in '%s'", containerType, file.getCanonicalPath()));
+		}
+
+		// Properties.load keeps trailing whitespace in values; strip it before parsing
+		return Integer.parseInt(count.trim());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/HelixClusterAdmin.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/HelixClusterAdmin.java
new file mode 100644
index 0000000..3dd2f48
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/HelixClusterAdmin.java
@@ -0,0 +1,43 @@
+package org.apache.helix.metamanager;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+
+/**
+ * Implementation of ClusterAdmin based on Helix.
+ * 
+ */
+public class HelixClusterAdmin implements ClusterAdmin {
+
+    static final Logger log = Logger.getLogger(HelixClusterAdmin.class);
+
+    final String        cluster;
+    final HelixAdmin    admin;
+
+    public HelixClusterAdmin(String clusterName, HelixAdmin admin) {
+        this.cluster = clusterName;
+        this.admin = admin;
+    }
+
+    @Override
+    public synchronized void addInstance(String instanceId, String instanceTag) {
+        log.debug(String.format("injecting instance %s (tag=%s) in cluster %s", instanceId, instanceTag, cluster));
+        admin.addInstance(cluster, new InstanceConfig(instanceId));
+        admin.addInstanceTag(cluster, instanceId, instanceTag);
+    }
+
+    @Override
+    public synchronized void removeInstance(String connection) {
+        log.debug(String.format("removing instance %s from cluster %s", connection, cluster));
+        admin.dropInstance(cluster, new InstanceConfig(connection));
+    }
+
+    @Override
+    public void rebalance() {
+        for (String resourceName : admin.getResourcesInCluster(cluster)) {
+            int replica = Integer.parseInt(admin.getResourceIdealState(cluster, resourceName).getReplicas());
+            admin.rebalance(cluster, resourceName, replica);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/Manager.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/Manager.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/Manager.java
new file mode 100644
index 0000000..ab91ae7
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/Manager.java
@@ -0,0 +1,129 @@
+package org.apache.helix.metamanager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Helix state model of a container provider participant. Each partition stands
+ * for one container: the OFFLINE to ONLINE transition registers the container
+ * as an instance through the {@link ClusterAdmin} and creates it through the
+ * {@link ClusterContainerProvider}; the ONLINE to OFFLINE transition tears
+ * both down again.
+ */
+@StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "ONLINE" })
+public class Manager extends StateModel {
+
+	static final Logger log = Logger.getLogger(Manager.class);
+
+	ClusterContainerProvider provider;
+	ClusterAdmin admin;
+
+	/**
+	 * @param provider
+	 *            creates and destroys the containers
+	 * @param admin
+	 *            adds and removes the matching instances and triggers
+	 *            rebalancing
+	 */
+	public Manager(ClusterContainerProvider provider, ClusterAdmin admin) {
+		this.provider = provider;
+		this.admin = admin;
+	}
+
+	/**
+	 * Acquires a container: removes any leftovers with the same id, registers
+	 * the instance, creates the container and triggers a rebalance.
+	 *
+	 * @param m
+	 *            transition message; the resource name is used as container
+	 *            type and the partition name as container id
+	 * @param context
+	 *            notification context, used to obtain the instance name for
+	 *            logging
+	 * @throws Exception
+	 *             if registering the instance or creating the container fails;
+	 *             a failing rebalance is only logged
+	 */
+	@Transition(from = "OFFLINE", to = "ONLINE")
+	public void acquire(Message m, NotificationContext context) throws Exception {
+		String containerType = m.getResourceName();
+		String containerId = m.getPartitionName();
+		String instanceId = context.getManager().getInstanceName();
+
+		log.trace(String.format("%s:%s transitioning from OFFLINE to ONLINE",
+				containerId, instanceId));
+
+		bestEffortRemove(containerId);
+
+		// add instance to cluster; ClusterAdmin requires a tag, and the
+		// container type is the only classification available here
+		admin.addInstance(containerId, containerType);
+
+		// create container
+		provider.create(containerId, containerType);
+
+		bestEffortRebalance();
+
+		log.info(String.format("%s acquired container '%s' (type='%s')",
+				instanceId, containerId, containerType));
+	}
+
+	/**
+	 * Releases a container: destroys it, removes its instance and triggers a
+	 * rebalance. All three steps are best-effort and never throw.
+	 *
+	 * @param m
+	 *            transition message; the partition name is the container id
+	 * @param context
+	 *            notification context, used to obtain the instance name for
+	 *            logging
+	 */
+	@Transition(from = "ONLINE", to = "OFFLINE")
+	public void release(Message m, NotificationContext context) {
+		String containerId = m.getPartitionName();
+		String instanceId = context.getManager().getInstanceName();
+
+		log.trace(String.format("%s:%s transitioning from ONLINE to OFFLINE",
+				containerId, instanceId));
+
+		bestEffortRemove(containerId);
+
+		bestEffortRebalance();
+
+		log.info(String.format("%s destroyed container '%s'",
+				instanceId, containerId));
+
+	}
+
+	/** Logs the ERROR to OFFLINE transition; performs no other work. */
+	@Transition(from = "ERROR", to = "OFFLINE")
+	public void recover(Message m, NotificationContext context) {
+		String containerId = m.getPartitionName();
+		String instanceId = context.getManager().getInstanceName();
+
+		log.trace(String.format("%s:%s transitioning from ERROR to OFFLINE",
+				containerId, instanceId));
+	}
+
+	/** Logs the OFFLINE to DROPPED transition; performs no other work. */
+	@Transition(from = "OFFLINE", to = "DROPPED")
+	public void drop(Message m, NotificationContext context) {
+		String containerId = m.getPartitionName();
+		String instanceId = context.getManager().getInstanceName();
+
+		log.trace(String.format("%s:%s transitioning from OFFLINE to DROPPED",
+				containerId, instanceId));
+	}
+
+	/** Triggers a rebalance; a failure is logged with its cause and otherwise ignored. */
+	private void bestEffortRebalance() {
+		try {
+			admin.rebalance();
+		} catch (Exception e) {
+			// deliberately not propagated: the transition itself has succeeded
+			log.warn(String.format("rebalancing cluster failed (error='%s')", e.getMessage()), e);
+		}
+	}
+
+	/**
+	 * Destroys the container and removes its instance. Failures are expected
+	 * when either does not exist, so they are only logged - together with the
+	 * exception, so that an unexpected cause stays visible.
+	 */
+	private void bestEffortRemove(String containerId) {
+		log.debug(String.format("Best effort removal of container '%s'", containerId));
+
+		try {
+			provider.destroy(containerId);
+			log.debug(String.format("Container '%s' destroyed", containerId));
+		} catch (Exception e) {
+			log.debug(String.format("Container '%s' does not exist", containerId), e);
+		}
+
+		try {
+			admin.removeInstance(containerId);
+			log.debug(String.format("Instance '%s' removed", containerId));
+		} catch (Exception e) {
+			log.debug(String.format("Instance '%s' does not exist", containerId), e);
+		}
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerDemo.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerDemo.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerDemo.java
new file mode 100644
index 0000000..35891f0
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerDemo.java
@@ -0,0 +1,463 @@
+package org.apache.helix.metamanager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.metamanager.managed.HelixClusterAdmin;
+import org.apache.helix.metamanager.managed.LocalStatusProvider;
+import org.apache.helix.metamanager.provider.local.LocalContainerProvider;
+import org.apache.helix.metamanager.provider.local.LocalContainerStatusProvider;
+import org.apache.helix.metamanager.provider.shell.ShellContainerProvider;
+import org.apache.helix.metamanager.provider.shell.ShellContainerStatusProvider;
+import org.apache.helix.metamanager.provider.yarn.ApplicationConfig;
+import org.apache.helix.metamanager.provider.yarn.YarnApplication;
+import org.apache.helix.metamanager.provider.yarn.YarnContainerProvider;
+import org.apache.helix.metamanager.provider.yarn.YarnContainerStatusProvider;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.IdealStateModeProperty;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.log4j.Logger;
+
+public class ManagerDemo
+{
+	static final long TIMESTEP_INTERVAL = 1000;
+	
+	static final String MANAGED_PROCESS_PATH = "target/meta-cluster-manager-pkg/bin/container-process.sh";
+	static final String YARN_PROCESS_PATH    = "/home/apucher/incubator-helix/recipes/meta-cluster-manager/target/meta-cluster-manager-pkg/bin/yarn-container-process.sh"; // NOTE(review): absolute path into one developer's home directory - not portable
+
+	static final String PROVIDER_LOCAL = "LOCAL"; // default provider type when main() gets no argument
+	static final String PROVIDER_SHELL = "SHELL";
+	static final String PROVIDER_YARN  = "YARN";
+	
+	static final Logger log = Logger.getLogger(ManagerDemo.class);
+	
+	static final int zkPort = 2199; 
+	static final String zkAddress = "localhost:" + zkPort;
+	static final String metaClusterName = "meta-cluster";
+	static final String managedClusterName = "managed-cluster";
+	static final String metaResourceName = "container";
+	static final String managedResourceName = "database";
+	
+	static final int numContainerProviders = 3; // number of ManagerProcess instances started by main()
+	static final int numContainerMax = 7; // the scale-up loop in main() stops once the target count reaches this
+	static final int numContainerMin = 3; // initial target container count
+	static final int numContainerStep = 2; // target count increment per scale-up iteration
+	static final int numContainerReplica = 1;
+	
+	static final int numManagedPartitions = 10;
+	static final int numManagedReplica = 2;
+	
+	static List<ClusterContainerProvider> providers = new ArrayList<ClusterContainerProvider>();
+	static int providerCount = 0;
+	    
+	static Collection<YarnContainerProvider> yarnProviders = new ArrayList<YarnContainerProvider>();
+	static Collection<YarnContainerStatusProvider> yarnStatusProviders = new ArrayList<YarnContainerStatusProvider>();
+	static Collection<YarnApplication> yarnApplications = new ArrayList<YarnApplication>();
+	
+  /**
+   * Runs the meta cluster manager demo. Usage: ManagerDemo [containerProviderType]
+   * 
+   * @param args optional; args[0] selects the container provider type (default: LOCAL)
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception
+  {
+	  
+	String containerProviderType = PROVIDER_LOCAL;
+	if(args.length >= 1) {
+		containerProviderType = args[0];
+	}
+	  
+    LocalStatusProvider clusterStatusProvider = null;
+    ManagerProcess[] managerProcesses = new ManagerProcess[numContainerProviders];
+
+    HelixManager metaControllerManager = null;
+    HelixManager managedControllerManager = null;
+    
+    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+		@Override
+		public void run() {
+		      for(ClusterContainerProvider provider : providers) {
+			      log.info("Destroying all containers of provider");
+			      provider.destroyAll();
+		      }
+		      for(YarnContainerProvider provider : yarnProviders) {
+			      log.info("Stopping yarn container provider");
+			      provider.stopService();   
+		      }
+		      for(YarnContainerStatusProvider provider : yarnStatusProviders) {
+			      log.info("Stopping yarn container status provider");
+			      provider.stopService();   
+		      }
+			  for(YarnApplication application: yarnApplications) {
+				  log.info("Stopping yarn application");
+				  try { application.stop(); } catch(Exception ignore) {}
+		      }
+		}
+	}));
+
+    try
+    {
+      log.info("Starting ZooKeeper");
+      startLocalZookeeper();
+      HelixAdmin admin = new ZKHelixAdmin(zkAddress);
+
+      log.info("Create clusters");
+      admin.addCluster(metaClusterName, true);
+      admin.addCluster(managedClusterName, true);
+      
+      log.info("Create providers");
+      clusterStatusProvider = new LocalStatusProvider(numContainerMin);
+      ClusterContainerStatusProvider containerStatusProvider = createContainerStatusProvider(containerProviderType); 
+      
+      log.info("Setup config tool");
+      ConfigTool.setClusterStatusProvider(clusterStatusProvider);
+      ConfigTool.setContainerStatusProvider(containerStatusProvider);
+      
+      // Managed Cluster
+      log.info("Setup managed cluster");
+      admin.addStateModelDef(managedClusterName, "MasterSlave",
+          new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()));
+      admin.addResource(managedClusterName, managedResourceName, numManagedPartitions,
+          "MasterSlave", IdealStateModeProperty.AUTO_REBALANCE.toString());
+      admin.rebalance(managedClusterName, managedResourceName, numManagedReplica);
+      
+      // Meta Cluster
+      log.info("Setup meta cluster");
+      admin.addStateModelDef(metaClusterName, "OnlineOffline",
+          new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline()));
+      admin.addResource(metaClusterName, metaResourceName, clusterStatusProvider.getTargetContainerCount(""),
+          "OnlineOffline", IdealStateModeProperty.AUTO_REBALANCE.toString());
+      
+      IdealState idealState = admin.getResourceIdealState(metaClusterName, metaResourceName);
+      idealState.setRebalancerClassName(ManagerRebalancer.class.getName());
+      //idealState.getRecord().setSimpleField(IdealStateProperty.REBALANCE_TIMER_PERIOD.toString(), "2000"); // Timer trigger creates race condition
+      admin.setResourceIdealState(metaClusterName, metaResourceName, idealState);	  
+      admin.rebalance(metaClusterName, metaResourceName, 1);
+      
+      log.info("Starting meta processes (container providers)");
+      for (int i = 0; i < numContainerProviders; i++)
+      {
+        String instanceName = "provider_" + i;
+        admin.addInstance(metaClusterName, new InstanceConfig(instanceName));
+        
+        ClusterAdmin clusterAdmin = new HelixClusterAdmin(managedClusterName, managedResourceName, numManagedReplica, admin);
+
+        managerProcesses[i] = new ManagerProcess(metaClusterName, zkAddress,
+                instanceName, createContainerProvider(containerProviderType), clusterAdmin);
+        managerProcesses[i].start();
+      }
+      
+      log.info("Starting managed cluster controller");
+      managedControllerManager = HelixControllerMain.startHelixController(zkAddress,
+          managedClusterName, "managedController", HelixControllerMain.STANDALONE);
+      log.info("Starting meta cluster controller");
+      metaControllerManager = HelixControllerMain.startHelixController(zkAddress,
+          metaClusterName, "metaController", HelixControllerMain.STANDALONE);
+          
+      waitUntilRebalancedCount(numContainerMin, admin);
+      printStep("Initial cluster state", admin);
+      
+      while(clusterStatusProvider.getTargetContainerCount("") < numContainerMax) {
+    	  int newCount = clusterStatusProvider.getTargetContainerCount("") + numContainerStep;
+    	  
+          log.info(String.format("Increasing container count to %d", newCount));
+	      clusterStatusProvider.setTargetContainerCount(newCount);
+	      
+	      triggerPipeline(admin);	      
+	      waitUntilRebalancedCount(newCount, admin);
+	      printStep(String.format("Increased container count to %d", newCount), admin);
+      }
+      
+      log.info("Destroying container 0 and container 1");
+	    int currentCount = clusterStatusProvider.getTargetContainerCount("");
+      providers.get(0).destroy("container_0");
+      providers.get(0).destroy("container_1");
+      triggerPipeline(admin);
+      waitUntilRebalancedCount(currentCount, admin);
+      printStep("Destroyed container 0 and container 1", admin);
+      
+      log.info("Destroying container provider 0");
+	  currentCount = clusterStatusProvider.getTargetContainerCount("");
+      managerProcesses[0].stop();
+      waitUntilRebalancedCount(currentCount, admin);
+      printStep("Destroyed container provider 0", admin);
+      
+      while(clusterStatusProvider.getTargetContainerCount("") > numContainerMin) {
+    	  int newCount = clusterStatusProvider.getTargetContainerCount("") - numContainerStep;
+    	  
+          log.info(String.format("Decreasing container count to %d", newCount));
+	      clusterStatusProvider.setTargetContainerCount(newCount);
+	      
+	      triggerPipeline(admin);
+	      waitUntilRebalancedCount(newCount, admin);
+	      printStep(String.format("Decreased container count to %d", clusterStatusProvider.getTargetContainerCount("")), admin);
+      }
+      
+      log.info("Stopping processes");
+      
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+    } finally
+    {
+      if (managedControllerManager != null) {
+	      log.info("Disconnecting managed cluster controller");
+    	  managedControllerManager.disconnect();
+      }
+      if (metaControllerManager != null) {
+	      log.info("Disconnecting meta cluster controller");
+        metaControllerManager.disconnect();
+      }
+	  log.info("Destroying meta processes");
+      for (ManagerProcess process : managerProcesses) {
+    	  process.stop();
+      }
+    }
+    
+    // TODO clean up threads correctly
+    System.exit(0);
+  }
+
+/**
+ * Reads the ideal state of the meta resource and writes it back unchanged.
+ * Judging by the method name, the rewrite is meant to make the meta cluster
+ * controller run its pipeline again.
+ *
+ * @param admin Helix admin used to read and rewrite the ideal state
+ */
+private static void triggerPipeline(HelixAdmin admin) {
+  admin.setResourceIdealState(metaClusterName, metaResourceName,
+      admin.getResourceIdealState(metaClusterName, metaResourceName));
+}
+  
+  /**
+   * Logs a step title framed by two banner lines, prints the status of both
+   * clusters and then blocks until a byte can be read from standard input.
+   *
+   * @param text  title of the demo step
+   * @param admin Helix admin used to look up the cluster status
+   * @throws Exception if printing the status or reading standard input fails
+   */
+  private static void printStep(String text, HelixAdmin admin) throws Exception {
+    final String banner = "********************************************************************************";
+
+    log.info(banner);
+    log.info(text);
+    log.info(banner);
+    printClusterStatus(admin);
+
+    // pause the demo until the user confirms
+    System.out.println("Press ENTER to continue");
+    System.in.read();
+  }
+  
+  /**
+   * Logs the status of the managed cluster followed by the status of the
+   * meta cluster.
+   *
+   * @param admin Helix admin used to look up both external views
+   * @throws Exception declared for callers; propagated from the print helpers
+   */
+  static void printClusterStatus(HelixAdmin admin) throws Exception {
+    // managed cluster first: partition to master/slave assignment
+    log.info("Managed cluster status");
+    printStatusMasterSlave(admin);
+
+    // then the meta cluster: container to provider assignment
+    log.info("Meta cluster status");
+    printMetaClusterStatus(admin);
+  }
+  
+  /**
+   * Blocks until both the meta cluster and the managed cluster report exactly
+   * {@code containerCount} containers, then runs the best-possible/external
+   * view verifier against the managed cluster.
+   * <p>
+   * The counts are polled every {@code TIMESTEP_INTERVAL} milliseconds and
+   * the first check happens only after one full interval has elapsed. There
+   * is no upper bound on the waiting time.
+   *
+   * @param containerCount expected number of containers in both clusters
+   * @param admin          Helix admin used to read the external views
+   * @throws InterruptedException if the thread is interrupted while sleeping
+   */
+  static void waitUntilRebalancedCount(int containerCount, HelixAdmin admin) throws InterruptedException {
+    boolean settled;
+    do {
+      Thread.sleep(TIMESTEP_INTERVAL);
+      // the managed cluster is only queried once the meta cluster matches
+      settled = containerCount == getMetaContainerCount(admin)
+          && containerCount == getManagedContainerCount(admin);
+    } while (!settled);
+
+    ClusterStateVerifier.verifyByPolling(new BestPossAndExtViewZkVerifier(zkAddress, managedClusterName));
+  }
+
+  /**
+   * Counts the partitions (logical containers) of the meta resource that
+   * have at least one instance in state ONLINE in the external view.
+   *
+   * @param admin Helix admin used to read the external view
+   * @return number of partitions with an ONLINE instance; 0 when no external
+   *         view is available yet
+   */
+  static int getMetaContainerCount(HelixAdmin admin) {
+    ExternalView externalView = admin.getResourceExternalView(metaClusterName, metaResourceName);
+    if (externalView == null) {
+      // nothing published yet, so no container can be ONLINE; callers poll again
+      return 0;
+    }
+
+    int onlineContainers = 0;
+    for (String partitionName : externalView.getPartitionSet()) {
+      Map<String, String> stateMap = externalView.getStateMap(partitionName);
+      // partition names are unique, so a plain counter is sufficient
+      if (stateMap != null && stateMap.containsValue("ONLINE")) {
+        onlineContainers++;
+      }
+    }
+
+    return onlineContainers;
+  }
+
+  /**
+   * Counts the distinct instances of the managed cluster that hold at least
+   * one partition of the managed resource in state MASTER or SLAVE.
+   *
+   * @param admin Helix admin used to read the external view
+   * @return number of distinct instances serving a partition; 0 when no
+   *         external view is available yet
+   */
+  static int getManagedContainerCount(HelixAdmin admin) {
+    ExternalView externalView = admin.getResourceExternalView(managedClusterName, managedResourceName);
+    if (externalView == null) {
+      // nothing published yet, so no instance is serving; callers poll again
+      return 0;
+    }
+
+    Set<String> assignedInstances = new HashSet<String>();
+    for (String partitionName : externalView.getPartitionSet()) {
+      Map<String, String> stateMap = externalView.getStateMap(partitionName);
+      if (stateMap == null) {
+        continue;
+      }
+
+      for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+        String state = entry.getValue();
+        if ("MASTER".equals(state) || "SLAVE".equals(state)) {
+          // the set de-duplicates instances that serve several partitions
+          assignedInstances.add(entry.getKey());
+        }
+      }
+    }
+
+    return assignedInstances.size();
+  }
+
+  /**
+   * Logs, for every partition (container) of the meta resource in sorted
+   * order, the first instance found in state ONLINE, or "NONE" if there is
+   * no such instance.
+   *
+   * @param admin Helix admin used to read the external view
+   */
+  static void printMetaClusterStatus(HelixAdmin admin)
+  {
+    ExternalView view = admin.getResourceExternalView(metaClusterName, metaResourceName);
+    TreeSet<String> sortedPartitions = new TreeSet<String>(view.getPartitionSet());
+
+    log.info("container" + "\t" + "acquired by");
+    log.info("======================================");
+
+    for (String partitionName : sortedPartitions)
+    {
+      String owner = null;
+      Map<String, String> stateMap = view.getStateMap(partitionName);
+      if (stateMap != null)
+      {
+        for (Map.Entry<String, String> entry : stateMap.entrySet())
+        {
+          if ("ONLINE".equals(entry.getValue()))
+          {
+            owner = entry.getKey();
+            break;
+          }
+        }
+      }
+      log.info(partitionName + "\t" + ((owner == null) ? "NONE" : owner));
+    }
+  }
+
+  /**
+   * Logs, for every partition of the managed resource in sorted order, one
+   * instance in state MASTER and one in state SLAVE, using "NONE" where no
+   * such instance exists. When several instances share a state, the last one
+   * encountered is shown.
+   *
+   * @param admin Helix admin used to read the external view
+   */
+  static void printStatusMasterSlave(HelixAdmin admin)
+  {
+    ExternalView view = admin.getResourceExternalView(managedClusterName, managedResourceName);
+    TreeSet<String> sortedPartitions = new TreeSet<String>(view.getPartitionSet());
+
+    log.info("partition" + "\t" + "master" + "\t\t" + "slave");
+    log.info("============================================================");
+
+    for (String partitionName : sortedPartitions)
+    {
+      String master = "NONE";
+      String slave = "NONE";
+
+      Map<String, String> stateMap = view.getStateMap(partitionName);
+      if (stateMap != null)
+      {
+        for (Map.Entry<String, String> entry : stateMap.entrySet())
+        {
+          String state = entry.getValue();
+          if ("MASTER".equals(state))
+          {
+            master = entry.getKey();
+          }
+          else if ("SLAVE".equals(state))
+          {
+            slave = entry.getKey();
+          }
+        }
+      }
+      log.info(String.format("%s\t%s\t%s", partitionName, master, slave));
+    }
+  }
+
+  /**
+   * Starts a ZooKeeper server inside this JVM on {@code zkPort}. The data
+   * and log directories below /tmp/IntegrationTest/ are deleted first.
+   *
+   * @throws Exception if the directories cannot be deleted or the server
+   *                   fails to start
+   */
+  public static void startLocalZookeeper() throws Exception
+  {
+    final String baseDir = "/tmp/IntegrationTest/";
+    final String dataDir = baseDir + "zk/dataDir";
+    final String logDir = baseDir + "/tmp/logDir";
+
+    // wipe state left behind by a previous run
+    for (String dir : new String[] { dataDir, logDir })
+    {
+      FileUtils.deleteDirectory(new File(dir));
+    }
+
+    IDefaultNameSpace emptyNameSpace = new IDefaultNameSpace()
+    {
+      @Override
+      public void createDefaultNameSpace(ZkClient zkClient)
+      {
+        // no default znodes are created
+      }
+    };
+
+    new ZkServer(dataDir, logDir, emptyNameSpace, zkPort).start();
+  }
+  
+  /**
+   * Creates a container provider of the requested type, names it
+   * "provider_N" using the running {@code providerCount}, and registers it
+   * in {@code providers}. For the YARN type a {@code YarnApplication} is
+   * started and the provider's service is started as well.
+   * <p>
+   * The provider counter is advanced before the type is checked, so an
+   * unknown type still consumes a provider name.
+   *
+   * @param type provider type, matched case-insensitively against LOCAL,
+   *             SHELL and YARN
+   * @return the newly created provider
+   * @throws IllegalArgumentException if the type is not recognised
+   * @throws Exception if creating or starting the provider fails
+   */
+  private static ClusterContainerProvider createContainerProvider(String type) throws Exception {
+    final String providerName = "provider_" + providerCount;
+    providerCount++;
+
+    if (PROVIDER_LOCAL.equalsIgnoreCase(type)) {
+      log.info("Using VM-local container provider");
+      LocalContainerProvider localProvider = new LocalContainerProvider(zkAddress, managedClusterName, providerName);
+      providers.add(localProvider);
+      return localProvider;
+    }
+
+    if (PROVIDER_SHELL.equalsIgnoreCase(type)) {
+      log.info("Using shell-based container provider");
+      ShellContainerProvider shellProvider = new ShellContainerProvider(zkAddress, managedClusterName, providerName, MANAGED_PROCESS_PATH);
+      providers.add(shellProvider);
+      return shellProvider;
+    }
+
+    if (PROVIDER_YARN.equalsIgnoreCase(type)) {
+      ApplicationConfig appConfig = new ApplicationConfig(zkAddress, managedClusterName, zkAddress, providerName);
+
+      log.info("Using yarn-based container provider");
+      // the application is started before the provider that talks to it
+      YarnApplication application = new YarnApplication(appConfig);
+      application.start();
+      yarnApplications.add(application);
+
+      YarnContainerProvider yarnProvider = new YarnContainerProvider(appConfig, YARN_PROCESS_PATH);
+      yarnProvider.startService();
+      yarnProviders.add(yarnProvider);
+
+      providers.add(yarnProvider);
+      return yarnProvider;
+    }
+
+    throw new IllegalArgumentException(String.format("Unknown container provider type '%s'", type));
+  }
+  
+  /**
+   * Creates the container status provider matching the requested provider
+   * type. For the YARN type the provider's service is started and the
+   * provider is recorded in {@code yarnStatusProviders}.
+   *
+   * @param type provider type, matched case-insensitively against LOCAL,
+   *             SHELL and YARN
+   * @return the newly created status provider
+   * @throws IllegalArgumentException if the type is not recognised
+   * @throws Exception if creating or starting the provider fails
+   */
+  private static ClusterContainerStatusProvider createContainerStatusProvider(String type) throws Exception {
+    if (PROVIDER_LOCAL.equalsIgnoreCase(type)) {
+      log.info("Using VM-local container status provider");
+      return new LocalContainerStatusProvider();
+    }
+
+    if (PROVIDER_SHELL.equalsIgnoreCase(type)) {
+      log.info("Using shell-based container status provider");
+      return new ShellContainerStatusProvider();
+    }
+
+    if (PROVIDER_YARN.equalsIgnoreCase(type)) {
+      log.info("Using yarn-based container status provider");
+      YarnContainerStatusProvider yarnStatus = new YarnContainerStatusProvider(zkAddress);
+      yarnStatus.startService();
+      yarnStatusProviders.add(yarnStatus);
+      return yarnStatus;
+    }
+
+    throw new IllegalArgumentException(String.format("Unknown container status provider type '%s'", type));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerFactory.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerFactory.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerFactory.java
new file mode 100644
index 0000000..44a924e
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerFactory.java
@@ -0,0 +1,39 @@
+package org.apache.helix.metamanager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * State model factory that hands out a new {@link Manager} per request,
+ * each one sharing the container provider and cluster admin supplied at
+ * construction time.
+ */
+public class ManagerFactory extends StateModelFactory<Manager> {
+
+  /** Container provider passed on to every created state model. */
+  final ClusterContainerProvider provider;
+
+  /** Cluster admin passed on to every created state model. */
+  final ClusterAdmin admin;
+
+  /**
+   * @param provider container provider shared by all created state models
+   * @param admin    cluster admin shared by all created state models
+   */
+  public ManagerFactory(ClusterContainerProvider provider, ClusterAdmin admin) {
+    this.provider = provider;
+    this.admin = admin;
+  }
+
+  /**
+   * Creates a state model for a partition. The partition name is not used.
+   *
+   * @param partitionName name of the partition the model is created for
+   * @return a new {@link Manager} backed by the shared provider and admin
+   */
+  @Override
+  public Manager createNewStateModel(String partitionName) {
+    return new Manager(provider, admin);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerProcess.java
new file mode 100644
index 0000000..7812e6f
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerProcess.java
@@ -0,0 +1,67 @@
+package org.apache.helix.metamanager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.log4j.Logger;
+
+/**
+ * Helix participant wrapper: connects an instance to a cluster as a
+ * PARTICIPANT and registers a {@link ManagerFactory} for the
+ * "OnlineOffline" state model.
+ */
+public class ManagerProcess
+{
+  static final Logger log = Logger.getLogger(ManagerProcess.class);
+
+  final String clusterName;
+  final String zkAddress;
+  final String instanceName;
+  final ClusterContainerProvider provider;
+  final ClusterAdmin admin;
+
+  /** Participant connection; stays null until {@link #start()} is called. */
+  HelixManager participantManager;
+
+  /**
+   * @param clusterName  cluster the participant joins
+   * @param zkAddress    ZooKeeper address handed to the Helix manager
+   * @param instanceName participant instance name
+   * @param provider     container provider handed to the state model factory
+   * @param admin        cluster admin handed to the state model factory
+   */
+  ManagerProcess(String clusterName, String zkAddress, String instanceName, ClusterContainerProvider provider, ClusterAdmin admin)
+  {
+    this.clusterName = clusterName;
+    this.zkAddress = zkAddress;
+    this.instanceName = instanceName;
+    this.provider = provider;
+    this.admin = admin;
+  }
+
+  /**
+   * Creates the participant manager, registers the state model factory and
+   * connects to the cluster.
+   *
+   * @throws Exception if the manager cannot be set up or connected
+   */
+  public void start() throws Exception
+  {
+    log.info("STARTING "+ instanceName);
+
+    participantManager = HelixManagerFactory.getZKHelixManager(
+        clusterName, instanceName, InstanceType.PARTICIPANT, zkAddress);
+
+    // the factory is registered before connecting
+    ManagerFactory stateModelFactory = new ManagerFactory(provider, admin);
+    participantManager.getStateMachineEngine()
+        .registerStateModelFactory("OnlineOffline", stateModelFactory);
+
+    participantManager.connect();
+
+    log.info("STARTED "+ instanceName);
+  }
+
+  /**
+   * Disconnects the participant. Does nothing if {@link #start()} has not
+   * been called.
+   */
+  public void stop()
+  {
+    if (participantManager == null)
+    {
+      return;
+    }
+    participantManager.disconnect();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerRebalancer.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerRebalancer.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerRebalancer.java
new file mode 100644
index 0000000..2b2824c
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ManagerRebalancer.java
@@ -0,0 +1,167 @@
+package org.apache.helix.metamanager;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.log4j.Logger;
+
+/**
+ * Rebalancer for cluster state. Uses cluster status provider.<br/>
+ * <br/>
+ * IdealState mapping:<br/>
+ * resource     = tag-name<br/>
+ *   partition  = logical container<br/>
+ *     instance = resource provider<br/>
+ *       status = physical container presence
+ *
+ */
+public class ManagerRebalancer implements Rebalancer {
+
+  static final Logger log = Logger.getLogger(ManagerRebalancer.class);
+
+  // State for rate-limiting computeNewIdealState(). The rate limit itself is
+  // currently not applied, so these members are unused by this class.
+  static final long UPDATE_INTERVAL_MIN = 1500;
+  static final Object lock = new Object();
+  static long nextUpdate = 0;
+
+  ClusterStatusProvider clusterStatusProvider;
+  ClusterContainerStatusProvider containerStatusProvider;
+  HelixManager manager;
+
+  /**
+   * Stores the manager and fetches the cluster and container status
+   * providers from {@code ConfigTool}.
+   *
+   * @param manager Helix manager later used to write the ideal state and to
+   *                enable or disable partitions
+   */
+  @Override
+  public void init(HelixManager manager) {
+    this.clusterStatusProvider = ConfigTool.getClusterStatusProvider();
+    this.containerStatusProvider = ConfigTool.getContainerStatusProvider();
+    this.manager = manager;
+  }
+
+  /**
+   * Compares the number of containers that are ONLINE (in current or pending
+   * state) with the target count reported by the cluster status provider and
+   * looks for ONLINE containers that no longer exist or are marked failed.
+   * <p>
+   * If the counts differ or failures were found, the ideal state is rebuilt:
+   * partitions {@code resourceName_0 .. resourceName_(target-1)} minus the
+   * failed ones become active, partitions from the target index up to the
+   * current count plus the failed ones become passive. The rebuilt ideal
+   * state is written through the cluster management tool, and for every
+   * configured instance the active partitions are enabled and the passive
+   * ones disabled.
+   *
+   * @param resourceName       resource whose ideal state is computed
+   * @param currentIdealState  ideal state to inspect; modified in place when
+   *                           a rebalance is needed
+   * @param currentStateOutput current and pending partition states
+   * @param clusterData        cluster data providing the instance configs
+   * @return {@code currentIdealState}, possibly modified
+   */
+  @Override
+  public IdealState computeNewIdealState(String resourceName,
+      IdealState currentIdealState,
+      CurrentStateOutput currentStateOutput, ClusterDataCache clusterData) {
+
+    // target container count
+    int targetCount = clusterStatusProvider.getTargetContainerCount(resourceName);
+
+    // currently active containers: ONLINE now or about to become ONLINE
+    List<String> currentPartitions = new ArrayList<String>();
+    for (String partitionName : currentIdealState.getPartitionSet()) {
+      Partition partition = new Partition(partitionName);
+      Map<String, String> currentStateMap = currentStateOutput.getCurrentStateMap(resourceName, partition);
+      Map<String, String> pendingStateMap = currentStateOutput.getPendingStateMap(resourceName, partition);
+
+      if (hasOnlineInstance(currentStateMap) || hasOnlineInstance(pendingStateMap)) {
+        currentPartitions.add(partitionName);
+      }
+    }
+    int currentCount = currentPartitions.size();
+
+    // currently failed containers
+    List<String> failedPartitions = new ArrayList<String>();
+    for (String partitionName : currentIdealState.getPartitionSet()) {
+      Map<String, String> currentStateMap = currentStateOutput.getCurrentStateMap(resourceName, new Partition(partitionName));
+
+      if (!hasOnlineInstance(currentStateMap)) {
+        continue;
+      }
+
+      // Query existence once per partition so that a container appearing
+      // between two look-ups cannot be recorded as failed twice.
+      if (!containerStatusProvider.exists(partitionName)) {
+        // container listed online, but does not exist
+        log.warn(String.format("Container '%s' designated ONLINE, but does not exist", partitionName));
+        failedPartitions.add(partitionName);
+      } else if (containerStatusProvider.isFailed(partitionName)) {
+        // container listed online and exists, but in failure state
+        log.warn(String.format("Container '%s' designated ONLINE, but in failure state", partitionName));
+        failedPartitions.add(partitionName);
+      }
+    }
+    int failureCount = failedPartitions.size();
+
+    if (currentCount != targetCount || failureCount != 0) {
+      log.info(String.format("Rebalancing containers (current=%d, target=%d, failures=%d)", currentCount, targetCount, failureCount));
+
+      currentIdealState.setNumPartitions(targetCount);
+
+      // future active containers
+      log.debug("active containers");
+      List<String> activePartitions = new ArrayList<String>();
+      for (int i = 0; i < targetCount; i++) {
+        activePartitions.add(resourceName + "_" + i);
+      }
+      activePartitions.removeAll(failedPartitions);
+
+      // future passive containers
+      log.debug("passive containers");
+      List<String> passivePartitions = new ArrayList<String>();
+      for (int i = targetCount; i < currentCount; i++) {
+        passivePartitions.add(resourceName + "_" + i);
+      }
+      passivePartitions.addAll(failedPartitions);
+
+      log.debug("output");
+      if (log.isDebugEnabled()) {
+        log.debug(String.format("%s: failed partitions %s", resourceName, failedPartitions));
+        log.debug(String.format("%s: active partitions %s", resourceName, activePartitions));
+        log.debug(String.format("%s: passive partitions %s", resourceName, passivePartitions));
+      }
+
+      // every active partition gets an empty preference list and state map
+      log.debug("building ideal state");
+      Map<String, List<String>> listFields = new HashMap<String, List<String>>();
+      Map<String, Map<String, String>> mapFields = new HashMap<String, Map<String, String>>();
+      for (String partitionName : activePartitions) {
+        listFields.put(partitionName, new ArrayList<String>());
+        mapFields.put(partitionName, new HashMap<String, String>());
+      }
+      currentIdealState.getRecord().setListFields(listFields);
+      currentIdealState.getRecord().setMapFields(mapFields);
+
+      log.debug("setting ideal state");
+      String clusterName = manager.getClusterName();
+      manager.getClusterManagmentTool().setResourceIdealState(clusterName, resourceName, currentIdealState);
+
+      log.debug("enable partitions");
+      for (String instanceName : clusterData.getInstanceConfigMap().keySet()) {
+        log.debug(String.format("enable partitions for '%s'", instanceName));
+        manager.getClusterManagmentTool().enablePartition(true, clusterName, instanceName, resourceName, activePartitions);
+        log.debug(String.format("disable partitions for '%s'", instanceName));
+        manager.getClusterManagmentTool().enablePartition(false, clusterName, instanceName, resourceName, passivePartitions);
+      }
+
+      log.debug("done");
+    }
+
+    return currentIdealState;
+  }
+
+  /**
+   * Tells whether any instance in the given state map is in state ONLINE.
+   *
+   * @param stateMap instance name to state mapping; may be null or empty
+   * @return true if at least one value equals "ONLINE"
+   */
+  private boolean hasOnlineInstance(Map<String, String> stateMap) {
+    // containsValue tolerates null values, unlike calling equals on the entry value
+    return stateMap != null && stateMap.containsValue("ONLINE");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/MetaManagerDemo.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/MetaManagerDemo.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/MetaManagerDemo.java
new file mode 100644
index 0000000..d0be313
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/MetaManagerDemo.java
@@ -0,0 +1,457 @@
+package org.apache.helix.metamanager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.metamanager.container.ContainerUtils;
+import org.apache.helix.metamanager.impl.StaticTargetProvider;
+import org.apache.helix.metamanager.impl.local.LocalContainerProvider;
+import org.apache.helix.metamanager.impl.local.LocalStatusProvider;
+import org.apache.helix.metamanager.impl.shell.ShellContainerProvider;
+import org.apache.helix.metamanager.impl.shell.ShellStatusProvider;
+import org.apache.helix.metamanager.impl.yarn.YarnApplicationProperties;
+import org.apache.helix.metamanager.impl.yarn.YarnContainerProvider;
+import org.apache.helix.metamanager.impl.yarn.YarnStatusProvider;
+import org.apache.helix.metamanager.provider.ProviderProcess;
+import org.apache.helix.metamanager.provider.ProviderRebalancer;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.log4j.Logger;
+
+public class MetaManagerDemo
+{
+	static final long TIMESTEP_INTERVAL = 1000;
+	
+	static final String PROVIDER_LOCAL = "LOCAL";
+	static final String PROVIDER_SHELL = "SHELL";
+	static final String PROVIDER_YARN  = "YARN";
+	
+	static final Logger log = Logger.getLogger(MetaManagerDemo.class);
+	
+	static final int zkPort = 2199;
+	static final String zkAddress = "localhost:" + zkPort;
+	static final String metaClusterName = "meta-cluster";
+	static final String managedClusterName = "managed-cluster";
+	static final String metaResourceName = "container";
+	static final String managedResourceName = "database";
+	
+	static final int numContainerProviders = 3;
+	static final int numContainerMax = 7;
+	static final int numContainerMin = 3;
+	static final int numContainerStep = 2;
+	static final int numContainerReplica = 1;
+	
+	static final int numManagedPartitions = 10;
+	static final int numManagedReplica = 2;
+	
+	static List<ContainerProvider> providers = new ArrayList<ContainerProvider>();
+	static int providerCount = 0;
+	
+	static Collection<Service> services = new ArrayList<Service>();
+	
+  /**
+   * MetaManagerDemo [containerProviderType]
+   * 
+   * @param args optional; args[0] selects the container provider type (LOCAL, SHELL or YARN) and defaults to LOCAL
+   * @throws Exception
+   */
+  public static void main(String[] args) throws Exception
+  {
+	  
+	String containerProviderType = PROVIDER_LOCAL;
+	if(args.length >= 1) {
+		containerProviderType = args[0];
+	}
+	  
+    StaticTargetProvider targetProvider = null;
+    ProviderProcess[] managerProcesses = new ProviderProcess[numContainerProviders];
+
+    HelixManager metaControllerManager = null;
+    HelixManager managedControllerManager = null;
+    
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                log.info("Destroying containers");
+                for (ContainerProvider provider : providers) {
+                    provider.destroyAll();
+                }
+                log.info("Stopping services");
+                for (Service service : services) {
+                    try { service.stop(); } catch (Exception ignore) {}
+                }
+            }
+        }));
+
+    try
+    {
+      log.info("Starting ZooKeeper");
+      startLocalZookeeper();
+      HelixAdmin admin = new ZKHelixAdmin(zkAddress);
+
+      log.info("Create clusters");
+      admin.addCluster(metaClusterName, true);
+      admin.addCluster(managedClusterName, true);
+      
+      log.info("Create providers");
+      targetProvider = startService(new StaticTargetProvider(Collections.singletonMap(metaResourceName, numContainerMin)));
+      StatusProvider statusProvider = startService(createContainerStatusProvider(containerProviderType));
+      
+      log.info("Setup config tool");
+      ConfigTool.setClusterStatusProvider(targetProvider);
+      ConfigTool.setContainerStatusProvider(statusProvider);
+      
+      // Managed Cluster
+      log.info("Setup managed cluster");
+      admin.addStateModelDef(managedClusterName, "MasterSlave",
+          new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave()));
+      admin.addResource(managedClusterName, managedResourceName, numManagedPartitions,
+          "MasterSlave", RebalanceMode.FULL_AUTO.toString());
+      
+      IdealState managedIdealState = admin.getResourceIdealState(managedClusterName, managedResourceName);
+      managedIdealState.setInstanceGroupTag(metaResourceName);
+      managedIdealState.setReplicas(String.valueOf(numManagedReplica));
+      admin.setResourceIdealState(managedClusterName, managedResourceName, managedIdealState);	  
+      
+      // Meta Cluster
+      log.info("Setup meta cluster");
+      admin.addStateModelDef(metaClusterName, "OnlineOffline",
+          new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline()));
+      admin.addResource(metaClusterName, metaResourceName, targetProvider.getTargetContainerCount(metaResourceName),
+          "OnlineOffline", RebalanceMode.USER_DEFINED.toString());
+      
+      IdealState metaIdealState = admin.getResourceIdealState(metaClusterName, metaResourceName);
+      metaIdealState.setRebalancerClassName(ProviderRebalancer.class.getName());
+      metaIdealState.setReplicas("1");
+      admin.setResourceIdealState(metaClusterName, metaResourceName, metaIdealState);	  
+      
+      log.info("Starting meta processes (container providers)");
+      for (int i = 0; i < numContainerProviders; i++)
+      {
+        String instanceName = "provider_" + i;
+        admin.addInstance(metaClusterName, new InstanceConfig(instanceName));
+        
+        ClusterAdmin clusterAdmin = new HelixClusterAdmin(managedClusterName, admin);
+
+        managerProcesses[i] = new ProviderProcess(metaClusterName, zkAddress,
+                instanceName, startService(createContainerProvider(containerProviderType)), clusterAdmin);
+        managerProcesses[i].start();
+      }
+      
+      log.info("Starting managed cluster controller");
+      managedControllerManager = HelixControllerMain.startHelixController(zkAddress,
+          managedClusterName, "managedController", HelixControllerMain.STANDALONE);
+      log.info("Starting meta cluster controller");
+      metaControllerManager = HelixControllerMain.startHelixController(zkAddress,
+          metaClusterName, "metaController", HelixControllerMain.STANDALONE);
+          
+      waitUntilRebalancedCount(numContainerMin, admin);
+      printStep("Initial cluster state", admin);
+      
+      while(targetProvider.getTargetContainerCount(metaResourceName) < numContainerMax) {
+    	  int newCount = targetProvider.getTargetContainerCount(metaResourceName) + numContainerStep;
+    	  
+          log.info(String.format("Increasing container count to %d", newCount));
+	      targetProvider.setTargetContainerCount(metaResourceName, newCount);
+	      
+	      triggerPipeline(admin);	      
+	      waitUntilRebalancedCount(newCount, admin);
+	      printStep(String.format("Increased container count to %d", newCount), admin);
+      }
+      
+      log.info("Destroying container 0 and container 1");
+	  int currentCount = targetProvider.getTargetContainerCount(metaResourceName);
+      providers.get(0).destroy("container_0");
+      providers.get(0).destroy("container_1");
+      triggerPipeline(admin);
+      waitUntilRebalancedCount(currentCount, admin);
+      printStep("Destroyed container 0 and container 1", admin);
+      
+      log.info("Destroying container provider 0");
+	  currentCount = targetProvider.getTargetContainerCount(metaResourceName);
+      managerProcesses[0].stop();
+      waitUntilRebalancedCount(currentCount, admin);
+      printStep("Destroyed container provider 0", admin);
+      
+      while(targetProvider.getTargetContainerCount(metaResourceName) > numContainerMin) {
+    	  int newCount = targetProvider.getTargetContainerCount(metaResourceName) - numContainerStep;
+    	  
+          log.info(String.format("Decreasing container count to %d", newCount));
+	      targetProvider.setTargetContainerCount(metaResourceName, newCount);
+	      
+	      triggerPipeline(admin);
+	      waitUntilRebalancedCount(newCount, admin);
+	      printStep(String.format("Decreased container count to %d", targetProvider.getTargetContainerCount(metaResourceName)), admin);
+      }
+      
+      log.info("Stopping processes");
+      
+    } catch (Exception e)
+    {
+      e.printStackTrace();
+    } finally
+    {
+      if (managedControllerManager != null) {
+	      log.info("Disconnecting managed cluster controller");
+    	  managedControllerManager.disconnect();
+      }
+      if (metaControllerManager != null) {
+	      log.info("Disconnecting meta cluster controller");
+        metaControllerManager.disconnect();
+      }
+	  log.info("Destroying meta processes");
+      for (ProviderProcess process : managerProcesses) {
+    	  process.stop();
+      }
+    }
+    
+    // TODO clean up threads correctly
+    System.exit(0);
+  }
+
+/**
+ * Reads the ideal state of the meta resource and writes it back unchanged.
+ * Presumably the write makes the meta cluster controller re-run its
+ * rebalancing pipeline (hence the method name) - confirm against the controller.
+ *
+ * @param admin admin used to read and re-write the ideal state
+ */
+private static void triggerPipeline(HelixAdmin admin) {
+  // the content is not modified; only the write itself matters here
+  admin.setResourceIdealState(metaClusterName, metaResourceName,
+      admin.getResourceIdealState(metaClusterName, metaResourceName));
+}
+  
+  private static void printStep(String text, HelixAdmin admin) throws Exception {
+	  log.info("********************************************************************************");
+      log.info(text);
+      log.info("********************************************************************************");
+      printClusterStatus(admin);
+      
+      System.out.println("Press ENTER to continue");
+      System.in.read();
+  }
+  
+  /**
+   * Logs the status of the managed cluster followed by the status of the
+   * meta cluster.
+   *
+   * @param admin admin used to fetch the external views
+   * @throws Exception declared for callers; the visible callees declare none
+   */
+  static void printClusterStatus(HelixAdmin admin) throws Exception {
+    // managed cluster first: partition -> master / slave
+    log.info("Managed cluster status");
+    printStatusMasterSlave(admin);
+
+    // then the meta cluster: container -> acquiring provider
+    log.info("Meta cluster status");
+    printMetaClusterStatus(admin);
+  }
+  
+  /**
+   * Blocks until both the meta cluster and the managed cluster report exactly
+   * {@code containerCount} containers, polling every {@code TIMESTEP_INTERVAL}
+   * milliseconds, and afterwards runs the best-possible/external-view verifier
+   * on the managed cluster.
+   *
+   * @param containerCount expected number of containers in both clusters
+   * @param admin          admin used to fetch the external views
+   * @throws InterruptedException if the thread is interrupted while sleeping
+   */
+  static void waitUntilRebalancedCount(int containerCount, HelixAdmin admin) throws InterruptedException {
+    // always pause at least once before the first check
+    do {
+      Thread.sleep(TIMESTEP_INTERVAL);
+    } while (!(containerCount == getMetaContainerCount(admin)
+        && containerCount == getManagedContainerCount(admin)));
+
+    // NOTE: the verifier's result is not inspected
+    ClusterStateVerifier.verifyByPolling(new BestPossAndExtViewZkVerifier(zkAddress, managedClusterName));
+  }
+
+  /**
+   * Counts the partitions of the meta resource that have at least one replica
+   * in state ONLINE according to the resource's external view.
+   *
+   * @param admin admin used to fetch the external view of the meta resource
+   * @return number of partitions with an ONLINE replica; 0 if no external view
+   *         is available (yet)
+   */
+  static int getMetaContainerCount(HelixAdmin admin) {
+    ExternalView externalView = admin.getResourceExternalView(metaClusterName, metaResourceName);
+    if (externalView == null) {
+      // The view may not have been published yet while callers are polling;
+      // report "nothing online" instead of failing with a NullPointerException.
+      return 0;
+    }
+
+    int onlinePartitions = 0;
+    for (String partitionName : externalView.getPartitionSet()) {
+      Map<String, String> stateMap = externalView.getStateMap(partitionName);
+      // each partition counts once, no matter how many replicas are ONLINE
+      if (stateMap != null && stateMap.containsValue("ONLINE")) {
+        onlinePartitions++;
+      }
+    }
+
+    return onlinePartitions;
+  }
+
+  /**
+   * Counts the distinct instances that hold at least one partition of the
+   * managed resource in state MASTER or SLAVE according to the external view.
+   *
+   * @param admin admin used to fetch the external view of the managed resource
+   * @return number of distinct instances serving as MASTER or SLAVE; 0 if no
+   *         external view is available (yet)
+   */
+  static int getManagedContainerCount(HelixAdmin admin) {
+    ExternalView externalView = admin.getResourceExternalView(managedClusterName, managedResourceName);
+    if (externalView == null) {
+      // The view may not have been published yet while callers are polling;
+      // report "no instances" instead of failing with a NullPointerException.
+      return 0;
+    }
+
+    Set<String> activeInstances = new HashSet<String>();
+    for (String partitionName : externalView.getPartitionSet()) {
+      Map<String, String> stateMap = externalView.getStateMap(partitionName);
+      if (stateMap == null) {
+        continue;
+      }
+
+      for (Map.Entry<String, String> entry : stateMap.entrySet()) {
+        String state = entry.getValue();
+        if ("MASTER".equals(state) || "SLAVE".equals(state)) {
+          // the set de-duplicates instances that serve several partitions
+          activeInstances.add(entry.getKey());
+        }
+      }
+    }
+
+    return activeInstances.size();
+  }
+
+  /**
+   * Logs one line per partition of the meta resource, in sorted order, naming
+   * the first instance found in state ONLINE for it, or "NONE" if there is none.
+   *
+   * @param admin admin used to fetch the external view of the meta resource
+   */
+  static void printMetaClusterStatus(HelixAdmin admin)
+  {
+    ExternalView view = admin.getResourceExternalView(metaClusterName, metaResourceName);
+    // sorted copy, so the listing follows the natural order of the partition names
+    TreeSet<String> sortedPartitions = new TreeSet<String>(view.getPartitionSet());
+
+    log.info("container\tacquired by");
+    log.info("======================================");
+
+    for (String partition : sortedPartitions)
+    {
+      String owner = null;
+      Map<String, String> states = view.getStateMap(partition);
+      if (states != null)
+      {
+        for (Map.Entry<String, String> entry : states.entrySet())
+        {
+          if ("ONLINE".equals(entry.getValue()))
+          {
+            // the first ONLINE replica decides; remaining entries are skipped
+            owner = entry.getKey();
+            break;
+          }
+        }
+      }
+
+      if (owner == null)
+      {
+        owner = "NONE";
+      }
+      log.info(partition + "\t" + owner);
+    }
+  }
+
+  /**
+   * Logs one line per partition of the managed resource, in sorted order,
+   * naming an instance in state MASTER and one in state SLAVE ("NONE" where
+   * no such instance exists). If several instances share a state, the one
+   * visited last in the state map's iteration order is shown.
+   *
+   * @param admin admin used to fetch the external view of the managed resource
+   */
+  static void printStatusMasterSlave(HelixAdmin admin)
+  {
+    ExternalView view = admin.getResourceExternalView(managedClusterName, managedResourceName);
+    // sorted copy, so the listing follows the natural order of the partition names
+    TreeSet<String> sortedPartitions = new TreeSet<String>(view.getPartitionSet());
+
+    log.info("partition\tmaster\t\tslave");
+    log.info("============================================================");
+
+    for (String partition : sortedPartitions)
+    {
+      String masterName = "NONE";
+      String slaveName = "NONE";
+
+      Map<String, String> states = view.getStateMap(partition);
+      if (states != null)
+      {
+        for (Map.Entry<String, String> entry : states.entrySet())
+        {
+          String state = entry.getValue();
+          if ("MASTER".equals(state))
+          {
+            masterName = entry.getKey();
+          }
+          else if ("SLAVE".equals(state))
+          {
+            slaveName = entry.getKey();
+          }
+        }
+      }
+
+      log.info(String.format("%s\t%s\t%s", partition, masterName, slaveName));
+    }
+  }
+
+  /**
+   * Starts a ZkServer on {@code zkPort} with data and log directories below
+   * {@code /tmp/metamanager/zk}. Both directories are deleted before the
+   * server is created. The server instance is not retained by this method.
+   *
+   * @throws Exception if a directory cannot be deleted or the server fails to start
+   */
+  public static void startLocalZookeeper() throws Exception
+  {
+    final String baseDir = "/tmp/metamanager/";
+    final String dataDir = baseDir + "zk/dataDir";
+    final String logDir = baseDir + "zk/logDir";
+
+    // wipe whatever an earlier run left behind (data first, then logs)
+    for (String staleDir : new String[] { dataDir, logDir })
+    {
+      FileUtils.deleteDirectory(new File(staleDir));
+    }
+
+    // nothing needs to be pre-created in the namespace, hence the empty callback
+    IDefaultNameSpace emptyNameSpace = new IDefaultNameSpace()
+    {
+      @Override
+      public void createDefaultNameSpace(ZkClient zkClient)
+      {
+        // intentionally left blank
+      }
+    };
+
+    new ZkServer(dataDir, logDir, emptyNameSpace, zkPort).start();
+  }
+  
+  private static ContainerProviderService createContainerProvider(String type) throws Exception {
+      String providerName = "provider_" + providerCount;
+      providerCount++;
+      
+	  if(PROVIDER_LOCAL.equalsIgnoreCase(type)) {
+		  log.info("Using VM-local container provider");
+		  LocalContainerProvider provider =  new LocalContainerProvider(zkAddress, managedClusterName, providerName);
+		  provider.registerType("container", ContainerUtils.getPropertiesFromResource("container.properties"));
+		  providers.add(provider);
+		  return provider;
+	  } else if (PROVIDER_SHELL.equalsIgnoreCase(type)) {
+		  log.info("Using shell-based container provider");
+		  ShellContainerProvider provider = new ShellContainerProvider(zkAddress, managedClusterName, providerName);
+          provider.registerType("container", ContainerUtils.getPropertiesFromResource("container.properties"));
+		  providers.add(provider);
+		  return provider;
+	  } else if (PROVIDER_YARN.equalsIgnoreCase(type)) {
+	      YarnApplicationProperties properties = new YarnApplicationProperties();
+	      properties.put(YarnApplicationProperties.HELIX_CLUSTER, managedClusterName);
+	      properties.put(YarnApplicationProperties.HELIX_ZOOKEEPER, zkAddress);
+	      properties.put(YarnApplicationProperties.PROVIDER_METADATA, zkAddress);
+	      properties.put(YarnApplicationProperties.PROVIDER_NAME, providerName);
+		  
+		  log.info("Using yarn-based container provider");
+		  YarnContainerProvider yarnProvider = new YarnContainerProvider(properties);
+          yarnProvider.registerType("container", ContainerUtils.getPropertiesFromResource("container.properties"));
+		  
+		  providers.add(yarnProvider);
+		  return yarnProvider;
+	  } else {
+		  throw new IllegalArgumentException(String.format("Unknown container provider type '%s'", type));
+	  }
+  }
+  
+  /**
+   * Creates the container status provider matching the given provider type.
+   *
+   * @param type provider type, compared case-insensitively against
+   *             PROVIDER_LOCAL, PROVIDER_SHELL and PROVIDER_YARN
+   * @return a new status provider (not started by this method)
+   * @throws IllegalArgumentException if the type is not one of the known types
+   * @throws Exception if the provider cannot be created
+   */
+  private static StatusProviderService createContainerStatusProvider(String type) throws Exception {
+    if (PROVIDER_LOCAL.equalsIgnoreCase(type)) {
+      log.info("Using VM-local container status provider");
+      return new LocalStatusProvider();
+    }
+
+    if (PROVIDER_SHELL.equalsIgnoreCase(type)) {
+      log.info("Using shell-based container status provider");
+      return new ShellStatusProvider();
+    }
+
+    if (PROVIDER_YARN.equalsIgnoreCase(type)) {
+      log.info("Using yarn-based container status provider");
+      // only the yarn variant takes the zookeeper address
+      return new YarnStatusProvider(zkAddress);
+    }
+
+    throw new IllegalArgumentException(String.format("Unknown container status provider type '%s'", type));
+  }
+  
+  /**
+   * Starts the given service and records it in {@code services}.
+   *
+   * @param service service to start
+   * @param <T>     concrete service type, returned unchanged for call chaining
+   * @return the same service instance
+   * @throws Exception if the service fails to start; it is not recorded then
+   */
+  private static <T extends Service> T startService(T service) throws Exception {
+    service.start();
+    // reached only when start() returned normally
+    services.add(service);
+    return service;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/Service.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/Service.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/Service.java
new file mode 100644
index 0000000..c13a62e
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/Service.java
@@ -0,0 +1,38 @@
+package org.apache.helix.metamanager;
+
+import java.util.Properties;
+
+/**
+ * Abstraction for configurable and runnable service. Light-weight dependency
+ * injection and life-cycle management.
+ * <p>
+ * As described by the method contracts below, the intended call order is
+ * {@link #configure(Properties)}, then {@link #start()}, then {@link #stop()}.
+ */
+public interface Service {
+
+    /**
+     * Configure service internals<br/>
+     * <b>INVARIANT:</b> executed only once
+     * 
+     * @param properties
+     *            arbitrary key-value properties, parsed internally
+     * @throws Exception
+     *             if the configuration cannot be applied (implementation-specific)
+     */
+    void configure(Properties properties) throws Exception;
+
+    /**
+     * Start service.<br/>
+     * <b>PRECONDITION:</b> configure() was invoked<br/>
+     * <b>INVARIANT:</b> executed only once
+     * 
+     * @throws Exception
+     *             if the service cannot be started (implementation-specific)
+     */
+    void start() throws Exception;
+
+    /**
+     * Stop service.<br/>
+     * <b>INVARIANT:</b> idempotent
+     * 
+     * @throws Exception
+     *             if the service cannot be stopped (implementation-specific)
+     */
+    void stop() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StaticStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StaticStatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StaticStatusProvider.java
new file mode 100644
index 0000000..249b9b8
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StaticStatusProvider.java
@@ -0,0 +1,28 @@
+package org.apache.helix.metamanager;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+public class StaticStatusProvider implements ClusterStatusProvider {
+
+	final Map<String, Integer> targetCounts = new HashMap<String, Integer>();
+	
+	public StaticStatusProvider() {
+	    // left blank
+	}
+	
+	public StaticStatusProvider(Map<String, Integer> targetCounts) {
+		this.targetCounts.putAll(targetCounts);
+	}
+	
+	@Override
+	public int getTargetContainerCount(String containerType) {
+		return targetCounts.get(containerType);
+	}
+
+	public void setTargetContainerCount(String containerType, int targetCount) {
+		targetCounts.put(containerType, targetCount);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StatusProvider.java
new file mode 100644
index 0000000..841f08d
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StatusProvider.java
@@ -0,0 +1,35 @@
+package org.apache.helix.metamanager;
+
+import org.apache.helix.metamanager.provider.ProviderRebalancer;
+
+/**
+ * Read-only view on the container deployment framework: reports whether a
+ * container physically exists and whether the framework considers it healthy.
+ * Polled by ProviderRebalancer, so implementations should be light-weight and
+ * non-blocking.<br/>
+ * <b>NOTE:</b> The answers reflect the low-level framework only and may differ
+ * from the participant state in Helix. (The Helix participant may not even
+ * exist)
+ *
+ * @see ProviderRebalancer
+ */
+public interface StatusProvider {
+
+    /**
+     * Checks for the physical existence of a container.
+     *
+     * @param id
+     *            unique container id
+     * @return true, if container is present
+     */
+    boolean exists(String id);
+
+    /**
+     * Checks the health of a container as reported by the deployment
+     * framework.
+     *
+     * @param id
+     *            unique container id
+     * @return true, if container is healthy
+     */
+    boolean isHealthy(String id);
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StatusProviderService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StatusProviderService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StatusProviderService.java
new file mode 100644
index 0000000..3c2739d
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/StatusProviderService.java
@@ -0,0 +1,9 @@
+package org.apache.helix.metamanager;
+
+/**
+ * StatusProvider as configurable service. Combines the {@link StatusProvider}
+ * and {@link Service} contracts and declares no methods of its own.
+ */
+public interface StatusProviderService extends StatusProvider, Service {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/TargetProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/TargetProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/TargetProvider.java
new file mode 100644
index 0000000..22524c4
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/TargetProvider.java
@@ -0,0 +1,25 @@
+package org.apache.helix.metamanager;
+
+import org.apache.helix.metamanager.provider.ProviderRebalancer;
+
+/**
+ * Source of the desired number of containers, e.g. computed from collected
+ * statistics. Polled by ProviderRebalancer, so implementations should be
+ * light-weight and non-blocking.<br/>
+ * <b>NOTE:</b> The target count is oblivious of failed containers and can be
+ * obtained in an arbitrary way. See implementations for examples.
+ *
+ * @see ProviderRebalancer
+ */
+public interface TargetProvider {
+
+    /**
+     * Reports how many containers of the given type should exist.
+     *
+     * @param containerType
+     *            meta resource name
+     * @return container count >= 1
+     * @throws Exception
+     *             if the target count cannot be determined
+     */
+    int getTargetContainerCount(String containerType) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/TargetProviderService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/TargetProviderService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/TargetProviderService.java
new file mode 100644
index 0000000..4d6275e
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/TargetProviderService.java
@@ -0,0 +1,9 @@
+package org.apache.helix.metamanager;
+
+/**
+ * TargetProvider as configurable service. Combines the {@link TargetProvider}
+ * and {@link Service} contracts and declares no methods of its own.
+ */
+public interface TargetProviderService extends TargetProvider, Service {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ZookeeperSetter.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ZookeeperSetter.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ZookeeperSetter.java
new file mode 100644
index 0000000..39e20fe
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/ZookeeperSetter.java
@@ -0,0 +1,30 @@
+package org.apache.helix.metamanager;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.log4j.Logger;
+
+/**
+ * Utility for setting String values in the embedded zookeeper service.
+ * (Program entry point)
+ * <p>
+ * Usage: {@code ZookeeperSetter <zkAddress> <path> <value>}
+ */
+public class ZookeeperSetter {
+
+    static Logger log = Logger.getLogger(ZookeeperSetter.class);
+
+    /**
+     * Writes a single String value to a zookeeper node, creating the node
+     * first if necessary.
+     * 
+     * @param args
+     *            zookeeper address, node path and value to write, in this
+     *            order; additional arguments are ignored
+     * @throws IllegalArgumentException
+     *             if fewer than three arguments are given
+     */
+    public static void main(String[] args) {
+        if (args == null || args.length < 3) {
+            // fail with a usage hint instead of an ArrayIndexOutOfBoundsException
+            throw new IllegalArgumentException("Usage: ZookeeperSetter <zkAddress> <path> <value>");
+        }
+
+        String address = args[0];
+        String path = args[1];
+        String value = args[2];
+
+        log.info(String.format("Setting %s:%s to '%s'", address, path, value));
+
+        ZkClient client = new ZkClient(address);
+        try {
+            // second argument: presumably "create missing parents" (zkclient API - confirm)
+            client.createPersistent(path, true);
+            client.writeData(path, value);
+        } finally {
+            // release the zookeeper connection even if the write failed
+            client.close();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/bootstrap/BootUtil.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/bootstrap/BootUtil.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/bootstrap/BootUtil.java
new file mode 100644
index 0000000..004de06
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/bootstrap/BootUtil.java
@@ -0,0 +1,58 @@
+package org.apache.helix.metamanager.bootstrap;
+
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.helix.metamanager.container.ContainerProcessProperties;
+import org.apache.log4j.Logger;
+
+/**
+ * Static helpers for bootstrapping components from {@link Properties}:
+ * extracting a key namespace and instantiating a class named by a property.
+ */
+public class BootUtil {
+
+    /** Name of the property holding the class name used by {@link #createInstance(Properties)}. */
+    public static final String CLASS_PROPERTY = "class";
+    static final Logger log = Logger.getLogger(BootUtil.class);
+
+    /**
+     * Extracts all entries whose key starts with {@code namespace + "."} and
+     * returns them with that prefix removed. Only the entries contained in
+     * {@code source} itself are inspected. Keys and values are cast to String.
+     * 
+     * @param namespace
+     *            key prefix, without the trailing dot
+     * @param source
+     *            properties to filter; not modified
+     * @return new properties holding the matching entries under shortened keys
+     * @throws ClassCastException
+     *             if source contains a non-String key or value
+     */
+    public static Properties getNamespace(String namespace, Properties source) {
+        Properties dest = new Properties();
+        String prefix = namespace + ".";
+
+        for (Map.Entry<Object, Object> rawEntry : source.entrySet()) {
+            String key = (String) rawEntry.getKey();
+            String value = (String) rawEntry.getValue();
+
+            if (key.startsWith(prefix)) {
+                String newKey = key.substring(prefix.length());
+                dest.put(newKey, value);
+            }
+        }
+
+        return dest;
+    }
+
+    /**
+     * Instantiates the class named by the {@link #CLASS_PROPERTY} property.
+     * A public constructor taking ContainerProcessProperties is tried first,
+     * with {@code properties} as argument; if that fails for any reason, the
+     * public default constructor is tried.
+     * 
+     * @param properties
+     *            properties containing the class name; also passed to the
+     *            properties constructor
+     * @param <T>
+     *            type the caller expects; the cast is unchecked
+     * @return new instance of the configured class
+     * @throws IllegalArgumentException
+     *             if the class property is missing
+     * @throws ClassNotFoundException
+     *             if the configured class cannot be loaded
+     * @throws Exception
+     *             if neither constructor could be used; the failure of the
+     *             last attempt is attached as cause
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T createInstance(Properties properties) throws Exception {
+        String className = properties.getProperty(CLASS_PROPERTY);
+        if (className == null) {
+            // Class.forName(null) would fail with a message-less NullPointerException
+            throw new IllegalArgumentException(String.format("missing required property '%s'", CLASS_PROPERTY));
+        }
+
+        Class<?> containerClass = Class.forName(className);
+
+        // remembered so the final exception can report why instantiation failed
+        Exception lastFailure;
+
+        try {
+            log.debug(String.format("checking for properties constructor in class '%s'", className));
+            return (T) containerClass.getConstructor(ContainerProcessProperties.class).newInstance(properties);
+        } catch (Exception e) {
+            // also reached when the constructor exists but rejects the argument or throws
+            log.debug("no properties constructor found", e);
+            lastFailure = e;
+        }
+
+        try {
+            log.debug(String.format("checking for default constructor in class '%s'", className));
+            return (T) containerClass.getConstructor().newInstance();
+        } catch (Exception e) {
+            log.debug("no default constructor found", e);
+            lastFailure = e;
+        }
+
+        throw new Exception(String.format("no suitable constructor for class '%s'", className), lastFailure);
+    }
+
+    private BootUtil() {
+        // left blank
+    }
+
+}