You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/20 20:30:14 UTC

[05/15] Adding Helix-task-framework and Yarn integration modules

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnContainerStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnContainerStatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnContainerStatusProvider.java
new file mode 100644
index 0000000..f7e3076
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnContainerStatusProvider.java
@@ -0,0 +1,52 @@
+package org.apache.helix.metamanager.impl.yarn;
+
+import org.apache.helix.metamanager.ContainerStatusProvider;
+import org.apache.helix.metamanager.impl.yarn.ContainerMetadata.ContainerState;
+import org.apache.helix.metamanager.impl.yarn.MetadataService.MetadataServiceException;
+
+/**
+ * {@link ContainerStatusProvider} that answers status queries from container
+ * meta data read through a {@link ZookeeperMetadataService}. Queries issued
+ * while no meta data service is available (i.e. after {@link #stopService()})
+ * report <code>false</code> instead of failing.
+ */
+public class YarnContainerStatusProvider implements ContainerStatusProvider {
+
+	final String metadataAddress;
+	
+	ZookeeperMetadataService metaService;
+	
+	/**
+	 * @param metadataAddress
+	 *            address handed to the meta data service
+	 */
+	public YarnContainerStatusProvider(String metadataAddress) {
+		this.metadataAddress = metadataAddress;
+		this.metaService = new ZookeeperMetadataService(metadataAddress);
+	}
+
+	/**
+	 * @param id
+	 *            container id
+	 * @return true, if the meta data service is available and knows the id
+	 */
+	@Override
+	public boolean exists(String id) {
+		// snapshot the field, stopService() resets it to null
+		ZookeeperMetadataService service = metaService;
+		return service != null && service.exists(id);
+	}
+
+	/**
+	 * @param id
+	 *            container id
+	 * @return true, if the stored state is ACTIVE; false if the service is
+	 *         stopped or the meta data cannot be read
+	 */
+	@Override
+	public boolean isActive(String id) {
+		ZookeeperMetadataService service = metaService;
+		if (service == null) {
+			return false;
+		}
+		try {
+			return service.read(id).state == ContainerState.ACTIVE;
+		} catch (MetadataServiceException e) {
+			return false;
+		}
+	}
+
+	/**
+	 * @param id
+	 *            container id
+	 * @return true, if the stored state is FAILED; false if the service is
+	 *         stopped or the meta data cannot be read
+	 */
+	@Override
+	public boolean isFailed(String id) {
+		ZookeeperMetadataService service = metaService;
+		if (service == null) {
+			return false;
+		}
+		try {
+			return service.read(id).state == ContainerState.FAILED;
+		} catch (Exception e) {
+			return false;
+		}
+	}
+
+	/**
+	 * Creates a fresh meta data service for the configured address and starts
+	 * it.
+	 */
+	public void startService() {
+		metaService = new ZookeeperMetadataService(metadataAddress);
+		metaService.startService();
+	}
+	
+	/**
+	 * Stops the meta data service, if any. Safe to call repeatedly.
+	 */
+	public void stopService() {
+		if(metaService != null) {
+			metaService.stopService();
+			metaService = null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnDataProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnDataProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnDataProvider.java
new file mode 100644
index 0000000..8bd80b5
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnDataProvider.java
@@ -0,0 +1,73 @@
+package org.apache.helix.metamanager.impl.yarn;
+
+import java.util.Collection;
+
+/**
+ * Abstraction for a (remote) repository of yarn container meta data. Meta data
+ * is read and updated by {@link YarnContainerProvider},
+ * {@link YarnMasterProcess} and {@link YarnContainerProcess}.<br/>
+ * <b>NOTE:</b> Each operation is assumed to be atomic.
+ * 
+ */
+interface YarnDataProvider {
+
+    /**
+     * Checks for existence of meta data about a container instance.
+     * 
+     * @param id
+     *            unique container id
+     * @return true, if meta data exists
+     */
+    public boolean exists(String id);
+
+    /**
+     * Create meta data entry. Check for non-existence of meta data for given
+     * container id and create node.
+     * 
+     * @param data
+     *            container meta data with unique id
+     * @throws Exception
+     *             if meta data entry already exists
+     */
+    public void create(YarnContainerData data) throws Exception;
+
+    /**
+     * Read meta data for given container id.
+     * 
+     * @param id
+     *            unique container id
+     * @return yarn container data
+     * @throws Exception
+     *             if meta data entry for given id does not exist
+     */
+    public YarnContainerData read(String id) throws Exception;
+
+    /**
+     * Read all meta data stored for this domain space of yarn providers and
+     * containers.
+     * 
+     * @return collection of meta data entries, empty if none
+     * @throws Exception
+     */
+    public Collection<YarnContainerData> readAll() throws Exception;
+
+    /**
+     * Write meta data entry.
+     * 
+     * @param data
+     *            yarn container meta data
+     * @throws Exception
+     *             if meta data entry for given id does not exist
+     */
+    public void update(YarnContainerData data) throws Exception;
+
+    /**
+     * Delete meta data entry. Frees up unique id to be reused. May throw an
+     * exception on non-existence or be idempotent.
+     * 
+     * @param id
+     *            unique container id
+     * @throws Exception
+     */
+    public void delete(String id) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnMasterProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnMasterProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnMasterProcess.java
new file mode 100644
index 0000000..d4447ee
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnMasterProcess.java
@@ -0,0 +1,144 @@
+package org.apache.helix.metamanager.impl.yarn;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.metamanager.provider.ProviderProcess;
+import org.apache.log4j.Logger;
+
+/**
+ * Host process for {@link YarnContainerProviderProcess}. Hosts the application
+ * master in YARN and the provider participant of the Helix meta cluster.
+ * (Program entry point)
+ * 
+ */
+class YarnMasterProcess {
+
+    static final Logger log = Logger.getLogger(YarnMasterProcess.class);
+
+    /**
+     * Program entry point. Registers the application master with the resource
+     * manager, starts the yarn data service, the yarn master service, the
+     * container provider and the provider process, and installs a shutdown
+     * hook that stops them in reverse order and sends the finish request.
+     * 
+     * @param args
+     *            command line arguments (not used)
+     * @throws Exception
+     *             if the master properties are invalid or a component fails to
+     *             start
+     */
+    public static void main(String[] args) throws Exception {
+        log.trace("BEGIN YarnMaster.main()");
+
+        final ApplicationAttemptId appAttemptId = getApplicationAttemptId();
+        log.info(String.format("Got application attempt id '%s'", appAttemptId.toString()));
+
+        log.debug("Reading master properties");
+        YarnMasterProperties properties = YarnUtils.createMasterProperties(YarnUtils.getPropertiesFromPath(YarnUtils.YARN_MASTER_PROPERTIES));
+
+        if (!properties.isValid())
+            throw new IllegalArgumentException(String.format("master properties not valid: %s", properties.toString()));
+
+        log.debug("Connecting to resource manager");
+        Configuration conf = new YarnConfiguration();
+        conf.set(YarnConfiguration.RM_ADDRESS, properties.getResourceManager());
+        conf.set(YarnConfiguration.RM_SCHEDULER_ADDRESS, properties.getScheduler());
+        conf.set(FileSystem.FS_DEFAULT_NAME_KEY, properties.getHdfs());
+
+        final AMRMProtocol resourceManager = getResourceManager(conf);
+
+        // register the AM with the RM
+        log.debug("Registering application master");
+        RegisterApplicationMasterRequest appMasterRequest = Records.newRecord(RegisterApplicationMasterRequest.class);
+        appMasterRequest.setApplicationAttemptId(appAttemptId);
+        appMasterRequest.setHost("");
+        appMasterRequest.setRpcPort(0);
+        appMasterRequest.setTrackingUrl("");
+
+        resourceManager.registerApplicationMaster(appMasterRequest);
+
+        log.debug("Starting yarndata service");
+        final ZookeeperYarnDataProvider yarnDataService = new ZookeeperYarnDataProvider(properties.getYarnData());
+        yarnDataService.start();
+
+        log.debug("Starting yarn master service");
+        final YarnMasterService service = new YarnMasterService();
+        service.configure(properties);
+        service.setAttemptId(appAttemptId);
+        service.setYarnDataProvider(yarnDataService);
+        service.setProtocol(resourceManager);
+        service.setYarnConfiguration(conf);
+        service.start();
+
+        log.debug("Starting provider");
+        final YarnContainerProvider provider = new YarnContainerProvider();
+        provider.configure(properties);
+        provider.start();
+
+        log.debug("Starting provider process");
+        final ProviderProcess process = new ProviderProcess();
+        process.configure(properties);
+        process.setConteinerProvider(provider);
+        process.start();
+
+        log.debug("Installing shutdown hooks");
+        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+            @Override
+            public void run() {
+                // each step is guarded on its own so that a failure while
+                // stopping one component neither skips the remaining ones nor
+                // the finish request
+                log.debug("Stopping provider process");
+                try {
+                    process.stop();
+                } catch (Exception e) {
+                    log.warn("Error stopping provider process", e);
+                }
+
+                log.debug("Stopping provider");
+                try {
+                    provider.stop();
+                } catch (Exception e) {
+                    log.warn("Error stopping provider", e);
+                }
+
+                log.debug("Stopping yarn master service");
+                try {
+                    service.stop();
+                } catch (Exception e) {
+                    log.warn("Error stopping yarn master service", e);
+                }
+
+                log.debug("Stopping yarndata service");
+                try {
+                    yarnDataService.stop();
+                } catch (Exception e) {
+                    log.warn("Error stopping yarndata service", e);
+                }
+
+                // finish application
+                log.debug("Sending finish request");
+                try {
+                    FinishApplicationMasterRequest finishReq = Records.newRecord(FinishApplicationMasterRequest.class);
+
+                    // reuse the attempt id resolved at startup instead of
+                    // re-reading the environment inside the hook
+                    finishReq.setAppAttemptId(appAttemptId);
+                    finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+                    resourceManager.finishApplicationMaster(finishReq);
+                } catch (Exception e) {
+                    log.warn("Error sending finish request", e);
+                }
+            }
+        }));
+
+        log.trace("END YarnMaster.main()");
+    }
+	
+    /**
+     * Creates an RPC proxy to the scheduler interface of the resource manager.
+     * The address is read from the scheduler address setting of the given
+     * configuration, falling back to the YARN default.
+     * 
+     * @param conf
+     *            configuration holding the scheduler address
+     * @return application master protocol proxy
+     */
+    static AMRMProtocol getResourceManager(Configuration conf) {
+        YarnConfiguration schedulerConf = new YarnConfiguration(conf);
+        YarnRPC yarnRpc = YarnRPC.create(schedulerConf);
+
+        String schedulerAddress = schedulerConf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS);
+        InetSocketAddress schedulerSocket = NetUtils.createSocketAddr(schedulerAddress);
+
+        log.info("Connecting to ResourceManager at " + schedulerSocket);
+        return (AMRMProtocol) yarnRpc.getProxy(AMRMProtocol.class, schedulerSocket, conf);
+    }
+
+    /**
+     * Derives the application attempt id from the application master container
+     * id found in the process environment.
+     * 
+     * @return application attempt id of this master
+     * @throws IllegalArgumentException
+     *             if the container id is not set in the environment
+     */
+    static ApplicationAttemptId getApplicationAttemptId() {
+        String amContainerId = getEnv(ApplicationConstants.AM_CONTAINER_ID_ENV);
+        return ConverterUtils.toContainerId(amContainerId).getApplicationAttemptId();
+    }
+
+    /**
+     * Looks up a variable in the process environment.
+     * 
+     * @param key
+     *            name of the environment variable
+     * @return value of the variable
+     * @throws IllegalArgumentException
+     *             if the variable is not set
+     */
+    static String getEnv(String key) {
+        String value = System.getenv().get(key);
+        if (value != null) {
+            return value;
+        }
+        // container id should always be set in the env by the framework
+        throw new IllegalArgumentException(String.format("%s not set in the environment", key));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnMasterProperties.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnMasterProperties.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnMasterProperties.java
new file mode 100644
index 0000000..abeb461
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnMasterProperties.java
@@ -0,0 +1,13 @@
+package org.apache.helix.metamanager.impl.yarn;
+
+/**
+ * Base configuration for {@link YarnMasterProcess}. 
+ *
+ */
+public class YarnMasterProperties extends YarnContainerProviderProperties {
+	/**
+	 * Serialization version identifier.
+	 */
+	private static final long serialVersionUID = -2209509980239674160L;
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnMasterService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnMasterService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnMasterService.java
new file mode 100644
index 0000000..1e7aec3
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnMasterService.java
@@ -0,0 +1,414 @@
+package org.apache.helix.metamanager.impl.yarn;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.metamanager.Service;
+import org.apache.helix.metamanager.impl.yarn.YarnContainerData.ContainerState;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * Implements YARN application master. Continuously monitors container health in
+ * YARN and yarn meta data updates. Spawns and destroys containers.
+ * 
+ */
+class YarnMasterService implements Service {
+
+    static final Logger                     log                    = Logger.getLogger(YarnMasterService.class);
+
+    static final String                     REQUIRED_TYPE          = "container";
+
+    static final long                       ZOOKEEPER_TIMEOUT      = 5000;
+    static final long                       MASTERSERVICE_INTERVAL = 1000;
+
+    static final String                     CONTAINERS             = "CONTAINERS";
+
+    static final String                     YARN_CONTAINER_COMMAND = "/bin/sh %s 1>%s/stdout 2>%s/stderr";
+
+    YarnMasterProperties                    properties;
+    AMRMProtocol                            protocol;
+    ApplicationAttemptId                    attemptId;
+    Configuration                           yarnConfig;
+    YarnDataProvider                        yarnDataService;
+
+    final Map<ContainerId, Container>       unassignedContainers   = new HashMap<ContainerId, Container>();
+    final Map<ContainerId, Container>       activeContainers       = new HashMap<ContainerId, Container>();
+    final Map<ContainerId, ContainerStatus> completedContainers    = new HashMap<ContainerId, ContainerStatus>();
+    final Map<ContainerId, String>          yarn2meta              = new HashMap<ContainerId, String>();
+
+    ScheduledExecutorService                executor;
+
+    /**
+     * Converts the given properties to master properties and keeps them for
+     * later use.
+     * 
+     * @param properties
+     *            raw configuration
+     * @throws IllegalArgumentException
+     *             if the converted properties are not valid
+     */
+    @Override
+    public void configure(Properties properties) throws Exception {
+        final YarnMasterProperties masterProperties = YarnUtils.createMasterProperties(properties);
+        Preconditions.checkArgument(masterProperties.isValid());
+        this.properties = masterProperties;
+    }
+
+    /**
+     * Sets the resource manager protocol used for the allocate calls of the
+     * update cycle.
+     * 
+     * @param protocol
+     *            resource manager protocol proxy
+     */
+    public void setProtocol(AMRMProtocol protocol) {
+        this.protocol = protocol;
+    }
+
+    /**
+     * Sets the application attempt id sent with allocate requests and used to
+     * build container ids and the HDFS namespace.
+     * 
+     * @param attemptId
+     *            application attempt id of this master
+     */
+    public void setAttemptId(ApplicationAttemptId attemptId) {
+        this.attemptId = attemptId;
+    }
+
+    /**
+     * Sets the configuration used for HDFS uploads and for connecting to
+     * container managers.
+     * 
+     * @param yarnConfig
+     *            yarn configuration
+     */
+    public void setYarnConfiguration(Configuration yarnConfig) {
+        this.yarnConfig = yarnConfig;
+    }
+
+    /**
+     * Sets the provider the container meta data is read from and written to.
+     * 
+     * @param yarnDataService
+     *            yarn container meta data provider
+     */
+    public void setYarnDataProvider(YarnDataProvider yarnDataService) {
+        this.yarnDataService = yarnDataService;
+    }
+
+    /**
+     * Verifies that configuration and all collaborators have been set, then
+     * schedules the update cycle on a single background thread at a fixed rate
+     * of {@link #MASTERSERVICE_INTERVAL} milliseconds.
+     * 
+     * @throws NullPointerException
+     *             if a required collaborator has not been set
+     */
+    @Override
+    public void start() {
+        // same order as the individual checks: the first missing one fails
+        Object[] required = { properties, protocol, attemptId, yarnConfig, yarnDataService };
+        for (Object collaborator : required) {
+            Preconditions.checkNotNull(collaborator);
+        }
+
+        log.debug("starting yarn master service");
+
+        executor = Executors.newSingleThreadScheduledExecutor();
+        executor.scheduleAtFixedRate(new YarnService(), 0, MASTERSERVICE_INTERVAL, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Shuts down the update cycle, waits for a running cycle to finish and
+     * removes the local master files. If the waiting thread is interrupted the
+     * running cycle is cancelled, the interrupt flag is restored and cleanup
+     * proceeds.
+     */
+    @Override
+    public void stop() {
+        log.debug("stopping yarn master service");
+
+        if (executor != null) {
+            executor.shutdown();
+            try {
+                while (!executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
+                    // keep waiting for the running update cycle to finish
+                }
+            } catch (InterruptedException e) {
+                log.warn("Interrupted while waiting for yarn master service to stop, cancelling update cycle");
+                executor.shutdownNow();
+                // preserve the interrupt for the caller
+                Thread.currentThread().interrupt();
+            }
+            executor = null;
+        }
+
+        destroyLocalMasterNamespace();
+    }
+
+    /**
+     * Reads all container meta data and keeps the entries owned by this
+     * master, i.e. those whose owner equals the configured name. Entries
+     * without an owner are skipped.
+     * 
+     * @return meta data entries owned by this master, empty if none
+     * @throws Exception
+     *             if the meta data cannot be read
+     */
+    Collection<YarnContainerData> readOwnedYarnData() throws Exception {
+        log.debug("reading container data");
+
+        final String ownName = properties.getName();
+
+        Collection<YarnContainerData> containers = new ArrayList<YarnContainerData>();
+        for (YarnContainerData meta : yarnDataService.readAll()) {
+            // an entry without owner must not abort the whole update cycle
+            if (meta.owner != null && meta.owner.equals(ownName)) {
+                containers.add(meta);
+                log.debug(String.format("found container node '%s' (state=%s, yarnId=%s, owner=%s)", meta.id, meta.state, meta.yarnId, meta.owner));
+            }
+        }
+        return containers;
+    }
+
+    class YarnService implements Runnable {
+        int responseId = 0;
+
+        /**
+         * Executes one update cycle: reads the meta data owned by this master,
+         * sends an allocate request that asks for missing containers and
+         * releases halted and overstock ones, starts the container command on
+         * newly assigned yarn containers, and marks the meta data of completed
+         * active containers as FINALIZE. Any exception is logged and ends the
+         * current cycle only.
+         */
+        @Override
+        public void run() {
+            try {
+                log.debug("running yarn service update cycle");
+
+                Collection<YarnContainerData> yarndata = readOwnedYarnData();
+
+                // active meta containers
+                int numMetaActive = countActiveMeta(yarndata);
+
+                // newly acquired meta containers
+                int numMetaAcquire = countAcquireMeta(yarndata);
+
+                // destroyed meta containers
+                List<ContainerId> destroyedReleasedIds = createDestroyedReleaseList(yarndata);
+                int numMetaCompleted = destroyedReleasedIds.size();
+
+                int numMeta = numMetaAcquire + numMetaActive + numMetaCompleted;
+
+                // yarn containers
+                int numYarnUnassigned = unassignedContainers.size();
+                int numYarnActive = activeContainers.size();
+                int numYarnCompleted = completedContainers.size();
+                int numYarn = numYarnUnassigned + numYarnActive + numYarnCompleted;
+
+                // positive: containers still missing, negative: overstock
+                int numYarnRequired = numMetaAcquire - numYarnUnassigned;
+
+                // additionally required containers
+                int numRequestAdditional = Math.max(0, numYarnRequired);
+
+                // overstock containers
+                List<ContainerId> unneededReleasedIds = createOverstockReleaseList(numYarnRequired);
+
+                int numReleased = destroyedReleasedIds.size() + unneededReleasedIds.size();
+
+                log.debug(String.format("meta containers (total=%d, acquire=%d, active=%d, completed=%d)", numMeta, numMetaAcquire, numMetaActive, numMetaCompleted));
+                log.debug(String.format("yarn containers (total=%d, unassigned=%d, active=%d, completed=%d)", numYarn, numYarnUnassigned, numYarnActive, numYarnCompleted));
+                log.debug(String.format("requesting %d new containers, releasing %d", numRequestAdditional, numReleased));
+
+                Priority priority = Records.newRecord(Priority.class);
+                priority.setPriority(0);
+
+                Resource resource = Records.newRecord(Resource.class);
+                resource.setMemory(256); // TODO make dynamic
+
+                ResourceRequest resourceRequest = Records.newRecord(ResourceRequest.class);
+                resourceRequest.setHostName("*");
+                resourceRequest.setNumContainers(numRequestAdditional);
+                resourceRequest.setPriority(priority);
+                resourceRequest.setCapability(resource);
+
+                AllocateRequest request = Records.newRecord(AllocateRequest.class);
+                request.setResponseId(responseId);
+                request.setApplicationAttemptId(attemptId);
+                request.addAsk(resourceRequest);
+                request.addAllReleases(destroyedReleasedIds);
+                request.addAllReleases(unneededReleasedIds);
+
+                // incremented before the call, i.e. also when allocate fails
+                responseId++;
+
+                AllocateResponse allocateResponse = null;
+                try {
+                    allocateResponse = protocol.allocate(request);
+                } catch (YarnRemoteException e) {
+                    // ignore
+                    log.error("Error allocating containers", e);
+                    return;
+                }
+
+                AMResponse response = allocateResponse.getAMResponse();
+
+                // remove unassigned container about to be freed
+                for (ContainerId id : unneededReleasedIds) {
+                    log.info(String.format("Unassigned container '%s' about to be freed, removing", id));
+                    unassignedContainers.remove(id);
+                }
+
+                // newly added containers
+                for (Container container : response.getAllocatedContainers()) {
+                    unassignedContainers.put(container.getId(), container);
+                }
+
+                log.info(String.format("%d new containers available, %d required", unassignedContainers.size(), numMetaAcquire));
+
+                // pair unassigned yarn containers with meta entries lacking a yarn id
+                Iterator<Container> itYarn = unassignedContainers.values().iterator();
+                Iterator<YarnContainerData> itMeta = yarndata.iterator();
+                while (itYarn.hasNext() && itMeta.hasNext()) {
+                    YarnContainerData meta = itMeta.next();
+
+                    // entry already carries a yarn id, nothing to assign
+                    if (meta.yarnId >= 0)
+                        continue;
+
+                    Container containerYarn = itYarn.next();
+
+                    log.debug(String.format("assigning yarn container '%s' to container node '%s'", containerYarn.getId(), meta.id));
+
+                    String command = String.format(YARN_CONTAINER_COMMAND, YarnUtils.YARN_CONTAINER_PATH, ApplicationConstants.LOG_DIR_EXPANSION_VAR,
+                            ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+
+                    log.debug(String.format("Running container command \"%s\"", command));
+
+                    // configuration
+                    YarnContainerProcessProperties containerProp = meta.getProperties();
+                    containerProp.setProperty(YarnContainerProcessProperties.ADDRESS, properties.getAddress());
+                    containerProp.setProperty(YarnContainerProcessProperties.CLUSTER, properties.getCluster());
+                    containerProp.setProperty(YarnContainerProcessProperties.YARNDATA, properties.getYarnData());
+                    containerProp.setProperty(YarnContainerProcessProperties.NAME, meta.id);
+
+                    File propertiesFile = YarnUtils.writePropertiesToTemp(containerProp);
+
+                    // HDFS
+                    final String namespace = attemptId.getApplicationId().toString() + "/" + meta.id;
+                    final Path containerArchive = YarnUtils.copyToHdfs(YarnUtils.YARN_CONTAINER_STAGING, YarnUtils.YARN_CONTAINER_STAGING, namespace, yarnConfig);
+                    final Path containerProperties = YarnUtils.copyToHdfs(propertiesFile.getCanonicalPath(), YarnUtils.YARN_CONTAINER_PROPERTIES, namespace, yarnConfig);
+
+                    // local resources
+                    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+                    localResources.put(YarnUtils.YARN_CONTAINER_DESTINATION,
+                            YarnUtils.createHdfsResource(containerArchive, LocalResourceType.ARCHIVE, yarnConfig));
+                    localResources.put(YarnUtils.YARN_CONTAINER_PROPERTIES,
+                            YarnUtils.createHdfsResource(containerProperties, LocalResourceType.FILE, yarnConfig));
+
+                    ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
+                    context.setContainerId(containerYarn.getId());
+                    context.setResource(containerYarn.getResource());
+                    context.setEnvironment(Maps.<String, String> newHashMap());
+                    context.setCommands(Collections.singletonList(command));
+                    context.setLocalResources(localResources);
+                    context.setUser(properties.getUser());
+
+                    log.debug(String.format("container '%s' executing command '%s'", meta.id, command));
+
+                    StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
+                    startReq.setContainerLaunchContext(context);
+
+                    try {
+                        getContainerManager(containerYarn).startContainer(startReq);
+
+                    } catch (YarnRemoteException e) {
+                        log.error(String.format("Error starting container '%s'", meta.id), e);
+                        // NOTE(review): returning here also skips the handling of
+                        // completed containers of this response further below
+                        return;
+                    }
+
+                    log.debug(String.format("container '%s' started, updating container node", meta.id));
+
+                    meta.setProperties(containerProp);
+                    meta.setState(ContainerState.CONNECTING);
+                    meta.setYarnId(containerYarn.getId().getId());
+                    yarnDataService.update(meta);
+
+                    yarn2meta.put(containerYarn.getId(), meta.id);
+
+                    log.debug(String.format("removing '%s' from unassigned yarn containers and adding to active list", containerYarn.getId()));
+
+                    itYarn.remove();
+                    activeContainers.put(containerYarn.getId(), containerYarn);
+
+                    // cleanup
+                    propertiesFile.deleteOnExit();
+
+                }
+
+                // containers reported as completed by the resource manager
+                for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+                    ContainerId id = status.getContainerId();
+
+                    log.info(String.format("Container '%s' completed", id));
+
+                    if (unassignedContainers.containsKey(id)) {
+                        log.info(String.format("Unassigned container '%s' terminated, removing", id));
+                        unassignedContainers.remove(id);
+                    }
+
+                    if (activeContainers.containsKey(id)) {
+                        log.info(String.format("Active container '%s' terminated, removing", id));
+                        activeContainers.remove(id);
+
+                        // NOTE(review): presumably always present for active
+                        // containers since both maps are filled together above
+                        // - confirm read() behavior for a missing mapping
+                        String metaId = yarn2meta.get(id);
+                        YarnContainerData meta = yarnDataService.read(metaId);
+
+                        log.debug(String.format("container '%s' finalized, updating container node", meta.id));
+
+                        yarnDataService.update(meta.setState(ContainerState.FINALIZE));
+                    }
+
+                    completedContainers.put(id, status);
+                }
+
+                log.debug("yarn service update cycle complete");
+
+            } catch (Exception e) {
+                log.error("Error while executing yarn update cycle", e);
+            }
+        }
+
+        /**
+         * Selects unassigned yarn containers that exceed the current demand.
+         * The selected containers stay registered as unassigned; the update
+         * cycle drops them only after the allocate call carrying the release
+         * has succeeded, so a failed call does not lose track of them.
+         * 
+         * @param numYarnRequired
+         *            number of additionally required containers, negative if
+         *            there is overstock
+         * @return ids of the containers to release, empty if there is no
+         *         overstock
+         */
+        private List<ContainerId> createOverstockReleaseList(int numYarnRequired) {
+            List<ContainerId> unneededReleasedIds = new ArrayList<ContainerId>();
+            if (numYarnRequired >= 0) {
+                return unneededReleasedIds;
+            }
+
+            final int numOverstock = -numYarnRequired;
+            Iterator<Container> itUnassigned = unassignedContainers.values().iterator();
+            while (unneededReleasedIds.size() < numOverstock && itUnassigned.hasNext()) {
+                Container container = itUnassigned.next();
+                unneededReleasedIds.add(container.getId());
+                log.debug(String.format("Container '%s' no longer required, releasing", container.getId()));
+            }
+            return unneededReleasedIds;
+        }
+
+        /**
+         * Builds the yarn container ids of all meta data entries in state
+         * HALTED. Entries with a negative yarn id were never assigned a yarn
+         * container (the update cycle treats <code>yarnId &gt;= 0</code> as
+         * assigned) and are skipped, as there is nothing to release for them.
+         * 
+         * @param yarndata
+         *            meta data entries owned by this master
+         * @return ids of the yarn containers to release, empty if none
+         */
+        private List<ContainerId> createDestroyedReleaseList(Collection<YarnContainerData> yarndata) {
+            List<ContainerId> releasedIds = new ArrayList<ContainerId>();
+            for (YarnContainerData meta : yarndata) {
+                if (meta.state != ContainerState.HALTED) {
+                    continue;
+                }
+                if (meta.yarnId < 0) {
+                    log.debug(String.format("container node '%s' halted without yarn container, nothing to release", meta.id));
+                    continue;
+                }
+
+                ContainerId containerId = Records.newRecord(ContainerId.class);
+                containerId.setApplicationAttemptId(attemptId);
+                containerId.setId(meta.yarnId);
+                releasedIds.add(containerId);
+                log.debug(String.format("releasing container '%s'", containerId));
+            }
+            return releasedIds;
+        }
+
+        /**
+         * Counts the meta data entries in state ACQUIRE.
+         * 
+         * @param yarndata
+         *            meta data entries owned by this master
+         * @return number of entries waiting for a yarn container
+         */
+        private int countAcquireMeta(Collection<YarnContainerData> yarndata) {
+            int count = 0;
+            for (YarnContainerData entry : yarndata) {
+                count += (entry.state == ContainerState.ACQUIRE) ? 1 : 0;
+            }
+            return count;
+        }
+
+        /**
+         * Counts the meta data entries that are neither in state ACQUIRE nor
+         * HALTED nor FINALIZE.
+         * 
+         * @param yarndata
+         *            meta data entries owned by this master
+         * @return number of entries considered active
+         */
+        private int countActiveMeta(Collection<YarnContainerData> yarndata) {
+            int count = 0;
+            for (YarnContainerData entry : yarndata) {
+                boolean inactive = entry.state == ContainerState.ACQUIRE || entry.state == ContainerState.HALTED
+                        || entry.state == ContainerState.FINALIZE;
+                if (!inactive) {
+                    count++;
+                }
+            }
+            return count;
+        }
+    }
+
+    /**
+     * Creates an RPC proxy to the container manager of the node hosting the
+     * given container.
+     * 
+     * @param container
+     *            yarn container whose node is contacted
+     * @return container manager proxy
+     */
+    private ContainerManager getContainerManager(Container container) {
+        YarnRPC yarnRpc = YarnRPC.create(new YarnConfiguration(yarnConfig));
+
+        NodeId node = container.getNodeId();
+        String hostAndPort = String.format("%s:%d", node.getHost(), node.getPort());
+        log.info("Connecting to ContainerManager at: " + hostAndPort);
+
+        InetSocketAddress nodeAddress = NetUtils.createSocketAddr(hostAndPort);
+        return (ContainerManager) yarnRpc.getProxy(ContainerManager.class, nodeAddress, yarnConfig);
+    }
+
+    /**
+     * Deletes the local master destination, the master properties and the
+     * container staging location. Deletion failures are ignored.
+     */
+    public static void destroyLocalMasterNamespace() {
+        log.info("cleaning up master directory");
+        String[] localPaths = { YarnUtils.YARN_MASTER_DESTINATION, YarnUtils.YARN_MASTER_PROPERTIES, YarnUtils.YARN_CONTAINER_STAGING };
+        for (String localPath : localPaths) {
+            FileUtils.deleteQuietly(new File(localPath));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnStatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnStatusProvider.java
new file mode 100644
index 0000000..b4a13b9
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnStatusProvider.java
@@ -0,0 +1,67 @@
+package org.apache.helix.metamanager.impl.yarn;
+
+import java.util.Properties;
+
+import org.apache.helix.metamanager.StatusProviderService;
+import org.apache.helix.metamanager.impl.yarn.YarnContainerData.ContainerState;
+import org.apache.log4j.Logger;
+
+/**
+ * StatusProvider for YARN-based containers spawned via
+ * {@link YarnContainerProvider}. Reads container state through a
+ * {@link ZookeeperYarnDataProvider}. Runnable and configurable service.
+ * 
+ */
+public class YarnStatusProvider implements StatusProviderService {
+
+    static final Logger       log = Logger.getLogger(YarnStatusProvider.class);
+
+    /** Address handed to the {@link ZookeeperYarnDataProvider}. */
+    String                    yarndata;
+
+    /** Backing data service; replaced by a fresh instance in {@link #start()}. */
+    ZookeeperYarnDataProvider yarnDataService;
+
+    public YarnStatusProvider() {
+        // left blank
+    }
+
+    /**
+     * @param yarndata
+     *            address handed to the backing data service
+     */
+    public YarnStatusProvider(String yarndata) {
+        this.yarndata = yarndata;
+        this.yarnDataService = new ZookeeperYarnDataProvider(yarndata);
+    }
+
+    /**
+     * Reads the "yarndata" property and creates the backing data service for
+     * it. The service is not started here.
+     */
+    @Override
+    public void configure(Properties properties) throws Exception {
+        this.yarndata = properties.getProperty("yarndata");
+        this.yarnDataService = new ZookeeperYarnDataProvider(yarndata);
+    }
+
+    /**
+     * Creates a fresh data service for the configured address and starts it.
+     */
+    @Override
+    public void start() throws Exception {
+        yarnDataService = new ZookeeperYarnDataProvider(yarndata);
+        yarnDataService.start();
+    }
+
+    /**
+     * Stops and discards the data service, if there is one.
+     */
+    @Override
+    public void stop() throws Exception {
+        if (yarnDataService != null) {
+            yarnDataService.stop();
+            yarnDataService = null;
+        }
+    }
+
+    @Override
+    public boolean exists(String id) {
+        return yarnDataService.exists(id);
+    }
+
+    /**
+     * Reports whether the stored state of the container is ACTIVE.
+     * 
+     * @param id
+     *            container id
+     * @return true if the state could be read and is ACTIVE; false otherwise,
+     *         including when the lookup fails
+     */
+    @Override
+    public boolean isHealthy(String id) {
+        try {
+            return yarnDataService.read(id).state == ContainerState.ACTIVE;
+        } catch (Exception e) {
+            // still best-effort, but keep the cause so failed lookups can be diagnosed
+            log.warn(String.format("Could not get activity data of %s", id), e);
+            return false;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnUtils.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnUtils.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnUtils.java
new file mode 100644
index 0000000..fe093c8
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/YarnUtils.java
@@ -0,0 +1,174 @@
+package org.apache.helix.metamanager.impl.yarn;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Writer;
+import java.nio.charset.Charset;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.metamanager.impl.yarn.YarnContainerData.ContainerState;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.io.Files;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
+
+/**
+ * Utility for writing property files, transferring data via HDFS and
+ * serializing {@link YarnContainerData} for zookeeper.
+ * 
+ */
+class YarnUtils {
+
+    static final Logger log                         = Logger.getLogger(YarnUtils.class);
+
+    // file names and relative paths for the master side
+    static final String YARN_MASTER_ARCHIVE_PATH    = "target/metamanager-assembly.tar.gz";
+    static final String YARN_MASTER_PATH            = "master/metamanager/bin/yarn-master-process.sh";
+    static final String YARN_MASTER_STAGING         = "master.tar.gz";
+    static final String YARN_MASTER_DESTINATION     = "master";
+    static final String YARN_MASTER_PROPERTIES      = "master.properties";
+
+    // file names and relative paths for the container side
+    static final String YARN_CONTAINER_ARCHIVE_PATH = "target/metamanager-assembly.tar.gz";
+    static final String YARN_CONTAINER_STAGING      = "container.tar.gz";
+    static final String YARN_CONTAINER_PATH         = "container/metamanager/bin/yarn-container-process.sh";
+    static final String YARN_CONTAINER_DESTINATION  = "container";
+    static final String YARN_CONTAINER_PROPERTIES   = "container.properties";
+
+    // shared pretty-printing serializer; ContainerState goes through ContainerStateAdapter
+    static Gson         gson;
+    static {
+        gson = new GsonBuilder()
+                .registerTypeAdapter(ContainerState.class, new ContainerStateAdapter())
+                .setPrettyPrinting()
+                .create();
+    }
+
+    /**
+     * Serializes container data to JSON with the shared gson instance.
+     * 
+     * @param meta
+     *            container data to serialize
+     * @return JSON representation
+     */
+    public static String toJson(YarnContainerData meta) {
+        String json = gson.toJson(meta);
+        return json;
+    }
+
+    /**
+     * Parses JSON into container data with the shared gson instance.
+     * 
+     * @param json
+     *            JSON text to parse
+     * @return deserialized container data
+     */
+    public static YarnContainerData fromJson(String json) {
+        YarnContainerData parsed = gson.fromJson(json, YarnContainerData.class);
+        return parsed;
+    }
+
+    /**
+     * Loads a properties file from the local file system.
+     * 
+     * @param path
+     *            path of the properties file
+     * @return properties read from the file
+     * @throws IOException
+     *             if the file cannot be opened or read
+     */
+    public static Properties getPropertiesFromPath(String path) throws IOException {
+        Properties properties = new Properties();
+        InputStreamReader reader = new InputStreamReader(new FileInputStream(path));
+        try {
+            properties.load(reader);
+        } finally {
+            // closing the reader also releases the underlying file handle
+            reader.close();
+        }
+        return properties;
+    }
+
+    /**
+     * Stores the given properties in a newly created temporary file
+     * ("provider*.properties") using the platform default charset.
+     * 
+     * @param properties
+     *            properties to write
+     * @return the temporary file
+     * @throws IOException
+     *             if the file cannot be created or written
+     */
+    public static File writePropertiesToTemp(Properties properties) throws IOException {
+        File tmpFile = File.createTempFile("provider", ".properties");
+        Writer writer = Files.newWriter(tmpFile, Charset.defaultCharset());
+        try {
+            properties.store(writer, null);
+            writer.flush();
+        } finally {
+            // release the file handle even if storing fails
+            writer.close();
+        }
+        return tmpFile;
+    }
+
+    /**
+     * Copies a file to the file system described by the configuration. The
+     * destination is built from the value configured under
+     * {@link FileSystem#FS_DEFAULT_NAME_KEY}, the namespace and the
+     * destination name. The source is kept and an existing destination is
+     * overwritten.
+     * 
+     * @param source
+     *            source path, qualified against the default file context
+     * @param dest
+     *            file name below the namespace
+     * @param namespace
+     *            directory below the file system root
+     * @param conf
+     *            hadoop configuration
+     * @return qualified destination path
+     * @throws IOException
+     *             if the copy fails
+     */
+    public static Path copyToHdfs(String source, String dest, String namespace, Configuration conf) throws IOException {
+        Path sourcePath = makeQualified(source);
+        Path destPath = makeQualified(conf.get(FileSystem.FS_DEFAULT_NAME_KEY) + "/" + namespace + "/" + dest);
+        log.debug(String.format("Copying '%s' to '%s'", sourcePath, destPath));
+
+        // FileSystem.get() hands out a shared, cached instance; closing that
+        // one breaks every other user in this JVM. Work on a private instance
+        // and always release it.
+        FileSystem fs = FileSystem.newInstance(conf);
+        try {
+            fs.copyFromLocalFile(false, true, sourcePath, destPath);
+        } finally {
+            fs.close();
+        }
+        return destPath;
+    }
+
+    /**
+     * Recursively deletes the namespace directory below the location
+     * configured under {@link FileSystem#FS_DEFAULT_NAME_KEY}.
+     * 
+     * @param namespace
+     *            directory below the file system root
+     * @param conf
+     *            hadoop configuration
+     * @throws IOException
+     *             if the delete fails
+     */
+    public static void destroyHdfsNamespace(String namespace, Configuration conf) throws IOException {
+        Path path = makeQualified(conf.get(FileSystem.FS_DEFAULT_NAME_KEY) + "/" + namespace);
+        log.debug(String.format("Deleting '%s'", path));
+
+        // FileSystem.get() hands out a shared, cached instance; closing that
+        // one breaks every other user in this JVM. Work on a private instance
+        // and always release it.
+        FileSystem fs = FileSystem.newInstance(conf);
+        try {
+            fs.delete(path, true);
+        } finally {
+            fs.close();
+        }
+    }
+
+    /**
+     * Builds a YARN local resource record for an existing file. Size and
+     * timestamp are taken from the file status; visibility is APPLICATION.
+     * 
+     * @param path
+     *            path of the file
+     * @param type
+     *            resource type to set on the record
+     * @param conf
+     *            hadoop configuration
+     * @return populated local resource record
+     * @throws IOException
+     *             if the file status cannot be read
+     */
+    public static LocalResource createHdfsResource(Path path, LocalResourceType type, Configuration conf) throws IOException {
+        // FileSystem.get() hands out a shared, cached instance; closing that
+        // one breaks every other user in this JVM. Work on a private instance
+        // and always release it.
+        FileSystem fs = FileSystem.newInstance(conf);
+        try {
+            URL url = ConverterUtils.getYarnUrlFromPath(path);
+
+            FileStatus status = fs.getFileStatus(path);
+
+            LocalResource resource = Records.newRecord(LocalResource.class);
+            resource.setResource(url);
+            resource.setSize(status.getLen());
+            resource.setTimestamp(status.getModificationTime());
+            resource.setType(type);
+            resource.setVisibility(LocalResourceVisibility.APPLICATION);
+
+            return resource;
+        } finally {
+            fs.close();
+        }
+    }
+
+    /**
+     * Qualifies a path string against the default file context.
+     * 
+     * @param path
+     *            path string to qualify
+     * @return qualified path
+     * @throws UnsupportedFileSystemException
+     *             if the default file context cannot be obtained
+     */
+    static Path makeQualified(String path) throws UnsupportedFileSystemException {
+        FileContext context = FileContext.getFileContext();
+        Path unqualified = new Path(path);
+        return context.makeQualified(unqualified);
+    }
+
+    /**
+     * Gson adapter that writes a {@link ContainerState} as its constant name
+     * and reads it back with {@code valueOf}; null is passed through as JSON
+     * null.
+     */
+    static class ContainerStateAdapter extends TypeAdapter<ContainerState> {
+        @Override
+        public ContainerState read(JsonReader reader) throws IOException {
+            if (reader.peek() != JsonToken.NULL) {
+                return ContainerState.valueOf(reader.nextString());
+            }
+            // consume the null token so the reader advances
+            reader.nextNull();
+            return null;
+        }
+
+        @Override
+        public void write(JsonWriter writer, ContainerState value) throws IOException {
+            if (value != null) {
+                writer.value(value.name());
+            } else {
+                writer.nullValue();
+            }
+        }
+    }
+
+    /**
+     * Copies all entries of the given properties into a new
+     * {@link YarnContainerProcessProperties}.
+     * 
+     * @param properties
+     *            source properties, must not be null
+     * @return new instance holding the same entries
+     */
+    static YarnContainerProcessProperties createContainerProcessProperties(Properties properties) {
+        Properties source = Preconditions.checkNotNull(properties);
+        YarnContainerProcessProperties result = new YarnContainerProcessProperties();
+        result.putAll(source);
+        return result;
+    }
+
+    /**
+     * Copies all entries of the given properties into a new
+     * {@link YarnContainerProviderProperties}.
+     * 
+     * @param properties
+     *            source properties, must not be null
+     * @return new instance holding the same entries
+     */
+    static YarnContainerProviderProperties createContainerProviderProperties(Properties properties) {
+        Properties source = Preconditions.checkNotNull(properties);
+        YarnContainerProviderProperties result = new YarnContainerProviderProperties();
+        result.putAll(source);
+        return result;
+    }
+
+    /**
+     * Copies all entries of the given properties into a new
+     * {@link YarnMasterProperties}.
+     * 
+     * @param properties
+     *            source properties, must not be null
+     * @return new instance holding the same entries
+     */
+    static YarnMasterProperties createMasterProperties(Properties properties) {
+        Properties source = Preconditions.checkNotNull(properties);
+        YarnMasterProperties result = new YarnMasterProperties();
+        result.putAll(source);
+        return result;
+    }
+
+    /**
+     * Private constructor: the class only offers static members and is not
+     * meant to be instantiated.
+     */
+    private YarnUtils() {
+        // left blank
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/ZookeeperMetadataProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/ZookeeperMetadataProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/ZookeeperMetadataProvider.java
new file mode 100644
index 0000000..79efd8c
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/ZookeeperMetadataProvider.java
@@ -0,0 +1,116 @@
+package org.apache.helix.metamanager.impl.yarn;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.apache.helix.metamanager.Service;
+import org.apache.log4j.Logger;
+
+/**
+ * Zookeeper-backed {@link MetadataProvider}. Each container record is stored
+ * as a JSON string in an ephemeral node below {@link #BASE_PATH}. The
+ * connection is opened by {@link #start()} and released by {@link #stop()}.
+ */
+public class ZookeeperMetadataProvider implements MetadataProvider, Service {
+
+    static final Logger log                 = Logger.getLogger(ZookeeperMetadataProvider.class);
+
+    static final String CONTAINER_NAMESPACE = "containers";
+
+    static final String BASE_PATH           = "/" + CONTAINER_NAMESPACE;
+
+    static final int    META_TIMEOUT        = 5000;
+    static final long   POLL_INTERVAL       = 100;
+
+    /** Zookeeper address, set via constructor or {@link #configure(Properties)}. */
+    String              metadata;
+
+    /** Open connection; null until started and after being stopped. */
+    ZkClient            client;
+
+    public ZookeeperMetadataProvider() {
+        // left blank
+    }
+
+    public ZookeeperMetadataProvider(String metadataAddress) {
+        this.metadata = metadataAddress;
+    }
+
+    /** Takes the zookeeper address from the "metadata" property. */
+    @Override
+    public void configure(Properties properties) throws Exception {
+        this.metadata = properties.getProperty("metadata");
+    }
+
+    /** Connects to zookeeper and makes sure the base node exists. */
+    @Override
+    public void start() {
+        log.debug(String.format("starting metadata service for '%s'", metadata));
+
+        client = new ZkClient(metadata, META_TIMEOUT, META_TIMEOUT);
+
+        client.createPersistent(BASE_PATH, true);
+    }
+
+    /** Closes the connection, if one is open. */
+    @Override
+    public void stop() {
+        log.debug(String.format("stopping metadata service for '%s'", metadata));
+        if (client == null) {
+            return;
+        }
+        client.close();
+        client = null;
+    }
+
+    @Override
+    public boolean exists(String id) {
+        String path = makePath(id);
+        return client.exists(path);
+    }
+
+    @Override
+    public void create(ContainerMetadata meta) throws MetadataException {
+        try {
+            client.createEphemeral(makePath(meta.id), YarnUtils.toJson(meta));
+        } catch (ZkException zkError) {
+            throw new MetadataException(zkError);
+        }
+    }
+
+    @Override
+    public ContainerMetadata read(String id) throws MetadataException {
+        try {
+            return YarnUtils.fromJson(client.<String> readData(makePath(id)));
+        } catch (ZkException zkError) {
+            throw new MetadataException(zkError);
+        }
+    }
+
+    /** Reads the record of every child node below the base path. */
+    @Override
+    public Collection<ContainerMetadata> readAll() throws MetadataException {
+        try {
+            Collection<ContainerMetadata> records = new ArrayList<ContainerMetadata>();
+            for (String childId : client.getChildren(BASE_PATH)) {
+                records.add(YarnUtils.fromJson(client.<String> readData(makePath(childId))));
+            }
+            return records;
+        } catch (ZkException zkError) {
+            throw new MetadataException(zkError);
+        }
+    }
+
+    @Override
+    public void update(ContainerMetadata meta) throws MetadataException {
+        try {
+            client.writeData(makePath(meta.id), YarnUtils.toJson(meta));
+        } catch (ZkException zkError) {
+            throw new MetadataException(zkError);
+        }
+    }
+
+    @Override
+    public void delete(String id) throws MetadataException {
+        try {
+            client.delete(makePath(id));
+        } catch (ZkException zkError) {
+            throw new MetadataException(zkError);
+        }
+    }
+
+    /** Builds the node path for a container id. */
+    String makePath(String containerId) {
+        return BASE_PATH + "/" + containerId;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/ZookeeperMetadataService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/ZookeeperMetadataService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/ZookeeperMetadataService.java
new file mode 100644
index 0000000..b0e150a
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/ZookeeperMetadataService.java
@@ -0,0 +1,102 @@
+package org.apache.helix.metamanager.impl.yarn;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.apache.log4j.Logger;
+
+/**
+ * Zookeeper-backed {@link MetadataService}. Each container record is stored
+ * as a JSON string in a persistent node below {@link #BASE_PATH}. The
+ * connection is opened by {@link #startService()} and released by
+ * {@link #stopService()}.
+ */
+public class ZookeeperMetadataService implements MetadataService {
+
+    static final Logger log                 = Logger.getLogger(ZookeeperMetadataService.class);
+
+    static final String CONTAINER_NAMESPACE = "containers";
+
+    static final String BASE_PATH           = "/" + CONTAINER_NAMESPACE;
+
+    static final long   POLL_INTERVAL       = 100;
+
+    final String        metadataAddress;
+
+    /** Open connection; null until started and after being stopped. */
+    ZkClient            client;
+
+    public ZookeeperMetadataService(String metadataAddress) {
+        this.metadataAddress = metadataAddress;
+    }
+
+    /** Connects to zookeeper and makes sure the base node exists. */
+    public void startService() {
+        log.debug(String.format("starting metadata service for '%s'", metadataAddress));
+
+        client = new ZkClient(metadataAddress);
+
+        client.createPersistent(BASE_PATH, true);
+    }
+
+    /** Closes the connection, if one is open. */
+    public void stopService() {
+        log.debug(String.format("stopping metadata service for '%s'", metadataAddress));
+        if (client == null) {
+            return;
+        }
+        client.close();
+        client = null;
+    }
+
+    @Override
+    public boolean exists(String id) {
+        String path = makePath(id);
+        return client.exists(path);
+    }
+
+    @Override
+    public void create(ContainerMetadata meta) throws MetadataServiceException {
+        try {
+            client.createPersistent(makePath(meta.id), YarnUtils.toJson(meta));
+        } catch (ZkException zkError) {
+            throw new MetadataServiceException(zkError);
+        }
+    }
+
+    @Override
+    public ContainerMetadata read(String id) throws MetadataServiceException {
+        try {
+            return YarnUtils.fromJson(client.<String> readData(makePath(id)));
+        } catch (ZkException zkError) {
+            throw new MetadataServiceException(zkError);
+        }
+    }
+
+    /** Reads the record of every child node below the base path. */
+    @Override
+    public Collection<ContainerMetadata> readAll() throws MetadataServiceException {
+        try {
+            Collection<ContainerMetadata> records = new ArrayList<ContainerMetadata>();
+            for (String childId : client.getChildren(BASE_PATH)) {
+                records.add(YarnUtils.fromJson(client.<String> readData(makePath(childId))));
+            }
+            return records;
+        } catch (ZkException zkError) {
+            throw new MetadataServiceException(zkError);
+        }
+    }
+
+    @Override
+    public void update(ContainerMetadata meta) throws MetadataServiceException {
+        try {
+            client.writeData(makePath(meta.id), YarnUtils.toJson(meta));
+        } catch (ZkException zkError) {
+            throw new MetadataServiceException(zkError);
+        }
+    }
+
+    @Override
+    public void delete(String id) throws MetadataServiceException {
+        try {
+            client.delete(makePath(id));
+        } catch (ZkException zkError) {
+            throw new MetadataServiceException(zkError);
+        }
+    }
+
+    /** Builds the node path for a container id. */
+    String makePath(String containerId) {
+        return BASE_PATH + "/" + containerId;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/ZookeeperYarnDataProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/ZookeeperYarnDataProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/ZookeeperYarnDataProvider.java
new file mode 100644
index 0000000..32af837
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/impl/yarn/ZookeeperYarnDataProvider.java
@@ -0,0 +1,100 @@
+package org.apache.helix.metamanager.impl.yarn;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Properties;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.helix.metamanager.Service;
+import org.apache.log4j.Logger;
+
+/**
+ * Configurable and runnable service for {@link YarnDataProvider} based on
+ * zookeeper. Each container record is stored as a JSON string in an ephemeral
+ * node below {@link #BASE_PATH}.
+ * 
+ */
+public class ZookeeperYarnDataProvider implements YarnDataProvider, Service {
+
+    static final Logger log                 = Logger.getLogger(ZookeeperYarnDataProvider.class);
+
+    static final String CONTAINER_NAMESPACE = "containers";
+
+    static final String BASE_PATH           = "/" + CONTAINER_NAMESPACE;
+
+    static final int    META_TIMEOUT        = 5000;
+    static final long   POLL_INTERVAL       = 100;
+
+    /** Zookeeper address, set via constructor or {@link #configure(Properties)}. */
+    String              yarndata;
+
+    /** Open connection; null until started and after being stopped. */
+    ZkClient            client;
+
+    public ZookeeperYarnDataProvider() {
+        // left blank
+    }
+
+    public ZookeeperYarnDataProvider(String yarndataAddress) {
+        this.yarndata = yarndataAddress;
+    }
+
+    /** Takes the zookeeper address from the "yarndata" property. */
+    @Override
+    public void configure(Properties properties) throws Exception {
+        this.yarndata = properties.getProperty("yarndata");
+    }
+
+    /** Connects to zookeeper and makes sure the base node exists. */
+    @Override
+    public void start() {
+        log.debug(String.format("starting yarndata service for '%s'", yarndata));
+
+        client = new ZkClient(yarndata, META_TIMEOUT, META_TIMEOUT);
+
+        client.createPersistent(BASE_PATH, true);
+    }
+
+    /** Closes the connection, if one is open. */
+    @Override
+    public void stop() {
+        log.debug(String.format("stopping yarndata service for '%s'", yarndata));
+        if (client == null) {
+            return;
+        }
+        client.close();
+        client = null;
+    }
+
+    @Override
+    public boolean exists(String id) {
+        String path = makePath(id);
+        return client.exists(path);
+    }
+
+    @Override
+    public void create(YarnContainerData meta) throws Exception {
+        client.createEphemeral(makePath(meta.id), YarnUtils.toJson(meta));
+    }
+
+    @Override
+    public YarnContainerData read(String id) throws Exception {
+        String json = client.<String> readData(makePath(id));
+        return YarnUtils.fromJson(json);
+    }
+
+    /** Reads the record of every child node below the base path. */
+    @Override
+    public Collection<YarnContainerData> readAll() throws Exception {
+        Collection<YarnContainerData> records = new ArrayList<YarnContainerData>();
+        for (String childId : client.getChildren(BASE_PATH)) {
+            String json = client.<String> readData(makePath(childId));
+            records.add(YarnUtils.fromJson(json));
+        }
+        return records;
+    }
+
+    @Override
+    public void update(YarnContainerData meta) throws Exception {
+        client.writeData(makePath(meta.id), YarnUtils.toJson(meta));
+    }
+
+    @Override
+    public void delete(String id) throws Exception {
+        client.delete(makePath(id));
+    }
+
+    /** Builds the node path for a container id. */
+    String makePath(String containerId) {
+        return BASE_PATH + "/" + containerId;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ContainerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ContainerProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ContainerProcess.java
new file mode 100644
index 0000000..11fb75d
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ContainerProcess.java
@@ -0,0 +1,85 @@
+package org.apache.helix.metamanager.managed;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.log4j.Logger;
+
+/**
+ * Stand-alone Helix participant. Connects to a cluster under the given
+ * instance name and registers a {@link ManagedFactory} for the "MasterSlave"
+ * state model.
+ */
+public class ContainerProcess
+{
+  static final Logger log = Logger.getLogger(ContainerProcess.class);
+
+  private final String clusterName;
+  private final String zkAddress;
+  private final String instanceName;
+  private HelixManager participantManager;
+
+  /**
+   * @param clusterName name of the cluster to join
+   * @param zkAddress zookeeper address of the cluster
+   * @param instanceName participant instance name
+   */
+  public ContainerProcess(String clusterName, String zkAddress, String instanceName)
+  {
+    this.clusterName = clusterName;
+    this.zkAddress = zkAddress;
+    this.instanceName = instanceName;
+  }
+
+  /**
+   * Creates the participant manager, registers the state model factory and
+   * connects to the cluster.
+   *
+   * @throws Exception if connecting fails
+   */
+  public void start() throws Exception
+  {
+    log.info("STARTING "+ instanceName);
+    participantManager = HelixManagerFactory.getZKHelixManager(clusterName,
+        instanceName, InstanceType.PARTICIPANT, zkAddress);
+    participantManager.getStateMachineEngine().registerStateModelFactory(
+        "MasterSlave", new ManagedFactory());
+    participantManager.connect();
+    log.info("STARTED "+ instanceName);
+  }
+
+  /**
+   * Disconnects the participant manager, if one was created.
+   */
+  public void stop()
+  {
+    if (participantManager != null)
+    {
+      participantManager.disconnect();
+    }
+  }
+
+  /**
+   * Command line entry point.
+   *
+   * @param args zkAddress, clusterName and instanceName, in that order
+   * @throws IllegalArgumentException if fewer than three arguments are given
+   * @throws Exception if the participant cannot be started
+   */
+  public static void main(String[] args) throws Exception
+  {
+    // fail with a usable message instead of an ArrayIndexOutOfBoundsException
+    if (args == null || args.length < 3)
+    {
+      throw new IllegalArgumentException(
+          "Usage: ContainerProcess <zkAddress> <clusterName> <instanceName>");
+    }
+
+    final String zkAddress = args[0];
+    final String clusterName = args[1];
+    final String instanceName = args[2];
+
+    // Give a unique id to each process, most commonly used format hostname_port
+    final ContainerProcess managerProcess = new ContainerProcess(clusterName, zkAddress,
+        instanceName);
+    Runtime.getRuntime().addShutdownHook(new Thread()
+    {
+      @Override
+      public void run()
+      {
+        log.info("Shutting down " + instanceName);
+        managerProcess.stop();
+      }
+    });
+    managerProcess.start();
+
+    // joining the current thread never returns: keeps the process alive until the JVM is shut down
+    Thread.currentThread().join();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/HelixClusterAdmin.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/HelixClusterAdmin.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/HelixClusterAdmin.java
new file mode 100644
index 0000000..f33c09c
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/HelixClusterAdmin.java
@@ -0,0 +1,42 @@
+package org.apache.helix.metamanager.managed;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.metamanager.ClusterAdmin;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+
+/**
+ * {@link ClusterAdmin} that forwards instance management and rebalancing of
+ * one resource in one cluster to a {@link HelixAdmin}.
+ */
+public class HelixClusterAdmin implements ClusterAdmin {
+
+    static final Logger log = Logger.getLogger(HelixClusterAdmin.class);
+
+    final String     clusterName;
+    final String     resourceName;
+    final int        replica;
+    final HelixAdmin admin;
+
+    /**
+     * @param clusterName
+     *            cluster all operations apply to
+     * @param resourceName
+     *            resource passed to rebalance
+     * @param replica
+     *            replica count passed to rebalance
+     * @param admin
+     *            helix admin the calls are forwarded to
+     */
+    public HelixClusterAdmin(String clusterName, String resourceName, int replica, HelixAdmin admin) {
+        this.clusterName = clusterName;
+        this.resourceName = resourceName;
+        this.replica = replica;
+        this.admin = admin;
+    }
+
+    /** Adds an instance with the given name to the cluster. */
+    @Override
+    public synchronized void addInstance(String connection) {
+        log.debug(String.format("injecting instance %s in cluster %s", connection, clusterName));
+        InstanceConfig config = new InstanceConfig(connection);
+        admin.addInstance(clusterName, config);
+    }
+
+    /** Drops the instance with the given name from the cluster. */
+    @Override
+    public synchronized void removeInstance(String connection) {
+        log.debug(String.format("removing instance %s from cluster %s", connection, clusterName));
+        InstanceConfig config = new InstanceConfig(connection);
+        admin.dropInstance(clusterName, config);
+    }
+
+    /** Rebalances the configured resource with the configured replica count. */
+    @Override
+    public void rebalance() {
+        admin.rebalance(clusterName, resourceName, replica);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/LocalClusterManager.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/LocalClusterManager.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/LocalClusterManager.java
new file mode 100644
index 0000000..2bb64de
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/LocalClusterManager.java
@@ -0,0 +1,42 @@
+package org.apache.helix.metamanager.managed;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.metamanager.ClusterAdmin;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+
+/**
+ * {@link ClusterAdmin} that forwards instance management and rebalancing of
+ * one resource in one cluster to a {@link HelixAdmin}.
+ */
+public class LocalClusterManager implements ClusterAdmin {
+
+    static final Logger log = Logger.getLogger(LocalClusterManager.class);
+
+    final String     clusterName;
+    final String     resourceName;
+    final int        replica;
+    final HelixAdmin admin;
+
+    /**
+     * @param clusterName
+     *            cluster all operations apply to
+     * @param resourceName
+     *            resource passed to rebalance
+     * @param replica
+     *            replica count passed to rebalance
+     * @param admin
+     *            helix admin the calls are forwarded to
+     */
+    public LocalClusterManager(String clusterName, String resourceName, int replica, HelixAdmin admin) {
+        this.clusterName = clusterName;
+        this.resourceName = resourceName;
+        this.replica = replica;
+        this.admin = admin;
+    }
+
+    /** Adds an instance with the given name to the cluster. */
+    @Override
+    public synchronized void addInstance(String connection) {
+        log.debug(String.format("injecting instance %s in cluster %s", connection, clusterName));
+        InstanceConfig config = new InstanceConfig(connection);
+        admin.addInstance(clusterName, config);
+    }
+
+    /** Drops the instance with the given name from the cluster. */
+    @Override
+    public synchronized void removeInstance(String connection) {
+        log.debug(String.format("removing instance %s from cluster %s", connection, clusterName));
+        InstanceConfig config = new InstanceConfig(connection);
+        admin.dropInstance(clusterName, config);
+    }
+
+    /** Rebalances the configured resource with the configured replica count. */
+    @Override
+    public void rebalance() {
+        admin.rebalance(clusterName, resourceName, replica);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/LocalContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/LocalContainerProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/LocalContainerProvider.java
new file mode 100644
index 0000000..6c8eec0
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/LocalContainerProvider.java
@@ -0,0 +1,87 @@
+package org.apache.helix.metamanager.managed;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.helix.metamanager.ClusterContainerProvider;
+import org.apache.log4j.Logger;
+
+/**
+ * {@link ClusterContainerProvider} that runs containers as
+ * {@link ManagedProcess} instances inside the current JVM. All provider
+ * instances share one static process registry, guarded by a static lock.
+ */
+public class LocalContainerProvider implements ClusterContainerProvider {
+
+    static final Logger log = Logger.getLogger(LocalContainerProvider.class);
+
+    static final String REQUIRED_TYPE = "container";
+
+    // global view of processes required
+    static final Object staticLock = new Object();
+    static final Map<String, LocalProcess> processes = new HashMap<String, LocalProcess>();
+
+    int connectionCounter = 0;
+
+    final String zkAddress;
+    final String clusterName;
+    final String providerName;
+
+    /**
+     * @param zkAddress
+     *            zookeeper address handed to each created process
+     * @param clusterName
+     *            cluster name handed to each created process
+     * @param providerName
+     *            recorded as owner of the processes this provider creates
+     */
+    public LocalContainerProvider(String zkAddress, String clusterName, String providerName) {
+        this.zkAddress = zkAddress;
+        this.clusterName = clusterName;
+        this.providerName = providerName;
+    }
+
+    /**
+     * Starts a new process under the given id and registers it.
+     * 
+     * @param id
+     *            process id, must not be registered yet
+     * @param type
+     *            container type, must equal {@link #REQUIRED_TYPE}
+     * @throws IllegalArgumentException
+     *             if the id is already registered or the type is not supported
+     * @throws Exception
+     *             if the process fails to start
+     */
+    @Override
+    public void create(String id, String type) throws Exception {
+        synchronized (staticLock) {
+            if (processes.containsKey(id)) {
+                throw new IllegalArgumentException(String.format("Process '%s' already exists", id));
+            }
+
+            // constant-first comparison: a null type is rejected as unsupported
+            // instead of failing with a NullPointerException
+            if (!REQUIRED_TYPE.equals(type)) {
+                throw new IllegalArgumentException(String.format("Type '%s' not supported", type));
+            }
+
+            log.info(String.format("Running container '%s' (zkAddress='%s', clusterName='%s')", id, zkAddress, clusterName));
+
+            ManagedProcess process = new ManagedProcess(clusterName, zkAddress, id);
+            process.start();
+
+            processes.put(id, new LocalProcess(id, providerName, process));
+        }
+    }
+
+    /**
+     * Unregisters the process with the given id and stops it.
+     * 
+     * @param id
+     *            id of a registered process
+     * @throws IllegalArgumentException
+     *             if no process is registered under the id
+     * @throws Exception
+     *             if stopping the process fails; the process stays unregistered
+     */
+    @Override
+    public void destroy(String id) throws Exception {
+        synchronized (staticLock) {
+            if (!processes.containsKey(id)) {
+                throw new IllegalArgumentException(String.format("Process '%s' does not exists", id));
+            }
+
+            log.info(String.format("Destroying container '%s'", id));
+
+            LocalProcess local = processes.remove(id);
+
+            local.process.stop();
+        }
+    }
+
+    /**
+     * Destroys every process in the shared registry, regardless of which
+     * provider created it. A failure for one process is logged and does not
+     * stop the remaining ones from being destroyed.
+     */
+    @Override
+    public void destroyAll() {
+        synchronized (staticLock) {
+            log.info("Destroying all processes");
+            // iterate over a copy, destroy() removes entries from the registry
+            for (String id : new HashSet<String>(processes.keySet())) {
+                try {
+                    destroy(id);
+                } catch (Exception e) {
+                    // best effort: keep going, but do not hide the failure
+                    log.warn(String.format("Could not destroy process '%s'", id), e);
+                }
+            }
+        }
+    }
+
+    /** Registry entry: a process together with its id and owning provider name. */
+    static class LocalProcess {
+        final String id;
+        final String owner;
+        final ManagedProcess process;
+
+        public LocalProcess(String id, String owner, ManagedProcess process) {
+            this.id = id;
+            this.owner = owner;
+            this.process = process;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/LocalProcessProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/LocalProcessProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/LocalProcessProvider.java
new file mode 100644
index 0000000..01e3ab6
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/LocalProcessProvider.java
@@ -0,0 +1,100 @@
+package org.apache.helix.metamanager.managed;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.helix.metamanager.ClusterContainerProvider;
+import org.apache.log4j.Logger;
+
+/**
+ * Container provider that runs containers as {@link ManagedProcess} instances
+ * inside the current JVM.
+ *
+ * Each created container is assigned a connection string of the form
+ * {@code localhost_<basePort + n>}, where {@code n} counts the containers
+ * created by this provider so far. All public methods are synchronized on the
+ * provider instance.
+ */
+public class LocalProcessProvider implements ClusterContainerProvider {
+
+	static final Logger log = Logger.getLogger(LocalProcessProvider.class);
+
+	static final String REQUIRED_TYPE = "container";
+
+	// container id -> process handle / assigned connection string
+	Map<String, ManagedProcess> processes = new HashMap<String, ManagedProcess>();
+	Map<String, String> id2connection = new HashMap<String, String>();
+
+	// number of containers created so far, used as port offset
+	int connectionCounter = 0;
+
+	final String zkAddress;
+	final String clusterName;
+	final int basePort;
+
+	/**
+	 * @param zkAddress   zookeeper address handed to every created process
+	 * @param clusterName cluster name handed to every created process
+	 * @param basePort    first port number used for generated connection strings
+	 */
+	public LocalProcessProvider(String zkAddress, String clusterName, int basePort) {
+		this.zkAddress = zkAddress;
+		this.clusterName = clusterName;
+		this.basePort = basePort;
+	}
+
+	/**
+	 * Registers a new container without starting it.
+	 *
+	 * @param id   unique container id
+	 * @param type container type, must equal {@link #REQUIRED_TYPE}
+	 * @return the connection string assigned to the container
+	 * @throws IllegalArgumentException if the id is already in use or the type
+	 *                                  is not supported
+	 */
+	@Override
+	public synchronized String create(String id, String type) throws Exception {
+		if(processes.containsKey(id))
+			throw new IllegalArgumentException(String.format("Process '%s' already exists", id));
+
+		if(!type.equals(REQUIRED_TYPE))
+			throw new IllegalArgumentException(String.format("Type '%s' not supported", type));
+
+		String connection = "localhost_" + (basePort + connectionCounter);
+		connectionCounter++;
+
+		log.info(String.format("Running container '%s' (zkAddress='%s', clusterName='%s', connection='%s')", id, zkAddress, clusterName, connection));
+
+		ManagedProcess p = new ManagedProcess(clusterName, zkAddress, connection);
+
+		processes.put(id, p);
+		id2connection.put(id, connection);
+
+		return connection;
+	}
+
+	/**
+	 * Starts a previously created container.
+	 *
+	 * @param id container id
+	 * @throws IllegalArgumentException if the container does not exist
+	 */
+	public synchronized void start(String id) throws Exception {
+		if(!processes.containsKey(id))
+			throw new IllegalArgumentException(String.format("Process '%s' does not exists", id));
+
+		log.info(String.format("Starting container '%s'", id));
+
+		ManagedProcess p = processes.get(id);
+
+		p.start();
+	}
+
+	/**
+	 * Stops a container, leaving it registered with this provider.
+	 *
+	 * @param id container id
+	 * @throws IllegalArgumentException if the container does not exist
+	 */
+	public synchronized void stop(String id) throws Exception {
+		if(!processes.containsKey(id))
+			throw new IllegalArgumentException(String.format("Process '%s' does not exists", id));
+
+		log.info(String.format("Stopping container '%s'", id));
+
+		ManagedProcess p = processes.get(id);
+
+		p.stop();
+	}
+
+	/**
+	 * Unregisters a container and stops its process.
+	 *
+	 * @param id container id
+	 * @return the connection string that was assigned to the container
+	 * @throws IllegalArgumentException if the container does not exist
+	 */
+	@Override
+	public synchronized String destroy(String id) throws Exception {
+		if(!processes.containsKey(id))
+			throw new IllegalArgumentException(String.format("Process '%s' does not exists", id));
+
+		log.info(String.format("Destroying container '%s'", id));
+
+		// unregister first so the bookkeeping is clean even if stop() fails
+		ManagedProcess p = processes.remove(id);
+		String connection = id2connection.remove(id);
+
+		// a container destroyed while running must not keep its participant
+		// connected; ManagedProcess.stop() does nothing if it was never started
+		p.stop();
+
+		return connection;
+	}
+
+	/**
+	 * Destroys all registered containers. Best effort: a failure for one
+	 * container is logged and the remaining ones are still attempted.
+	 */
+	public synchronized void destroyAll() {
+		log.info("Destroying all processes");
+		// iterate over a snapshot, destroy() removes entries from the map
+		for(String id : new HashSet<String>(processes.keySet())) {
+			try {
+				destroy(id);
+			} catch (Exception e) {
+				log.warn(String.format("Failed to destroy container '%s'", id), e);
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/LocalStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/LocalStatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/LocalStatusProvider.java
new file mode 100644
index 0000000..54f040f
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/LocalStatusProvider.java
@@ -0,0 +1,22 @@
+package org.apache.helix.metamanager.managed;
+
+import org.apache.helix.metamanager.ClusterStatusProvider;
+
+/**
+ * Status provider that reports one manually configured target container
+ * count, whatever container type is asked for.
+ */
+public class LocalStatusProvider implements ClusterStatusProvider {
+
+	int targetContainerCount;
+
+	/**
+	 * @param targetContainerCount initial target count to report
+	 */
+	public LocalStatusProvider(int targetContainerCount) {
+		this.targetContainerCount = targetContainerCount;
+	}
+
+	/**
+	 * Replaces the target count reported by subsequent queries.
+	 *
+	 * @param targetContainerCount new target count
+	 */
+	public void setTargetContainerCount(int targetContainerCount) {
+		this.targetContainerCount = targetContainerCount;
+	}
+
+	/**
+	 * @param type container type; not used by this implementation
+	 * @return the currently configured target count
+	 */
+	@Override
+	public int getTargetContainerCount(String type) {
+		return targetContainerCount;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/Managed.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/Managed.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/Managed.java
new file mode 100644
index 0000000..1e03103
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/Managed.java
@@ -0,0 +1,64 @@
+package org.apache.helix.metamanager.managed;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * State model with the states OFFLINE, SLAVE, MASTER and DROPPED whose
+ * transition handlers only emit a trace-level log line naming the instance
+ * and the transition.
+ */
+@StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "SLAVE", "MASTER", "DROPPED" })
+public class Managed extends StateModel {
+
+	static final Logger log = Logger.getLogger(Managed.class);
+
+	@Transition(from = "OFFLINE", to = "SLAVE")
+	public void offlineToSlave(Message m, NotificationContext context) {
+		log.trace(describe(context, "OFFLINE", "SLAVE"));
+	}
+
+	@Transition(from = "SLAVE", to = "OFFLINE")
+	public void slaveToOffline(Message m, NotificationContext context) {
+		log.trace(describe(context, "SLAVE", "OFFLINE"));
+	}
+
+	@Transition(from = "SLAVE", to = "MASTER")
+	public void slaveToMaster(Message m, NotificationContext context) {
+		log.trace(describe(context, "SLAVE", "MASTER"));
+	}
+
+	@Transition(from = "MASTER", to = "SLAVE")
+	public void masterToSlave(Message m, NotificationContext context) {
+		log.trace(describe(context, "MASTER", "SLAVE"));
+	}
+
+	@Transition(from = "OFFLINE", to = "DROPPED")
+	public void offlineToDropped(Message m, NotificationContext context) {
+		log.trace(describe(context, "OFFLINE", "DROPPED"));
+	}
+
+	// builds "<instance> transitioning from <from> to <to>" for the handlers above
+	private static String describe(NotificationContext context, String from, String to) {
+		final String instanceName = context.getManager().getInstanceName();
+		return String.format("%s transitioning from %s to %s", instanceName, from, to);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ManagedFactory.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ManagedFactory.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ManagedFactory.java
new file mode 100644
index 0000000..f51d9c0
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ManagedFactory.java
@@ -0,0 +1,30 @@
+package org.apache.helix.metamanager.managed;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * Factory handing out a fresh {@link Managed} state model per request.
+ */
+public class ManagedFactory extends StateModelFactory<Managed> {
+
+	/**
+	 * @param partitionName name of the partition; not used by this factory
+	 * @return a newly constructed {@link Managed} instance
+	 */
+	@Override
+	public Managed createNewStateModel(String partitionName) {
+		final Managed stateModel = new Managed();
+		return stateModel;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ManagedProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ManagedProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ManagedProcess.java
new file mode 100644
index 0000000..387c459
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ManagedProcess.java
@@ -0,0 +1,85 @@
+package org.apache.helix.metamanager.managed;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.log4j.Logger;
+
+/**
+ * A Helix participant for a given cluster, zookeeper address and instance
+ * name. {@link #start()} creates the participant manager, registers a
+ * {@link ManagedFactory} under the "MasterSlave" state model and connects;
+ * {@link #stop()} disconnects it again.
+ */
+public class ManagedProcess
+{
+  static final Logger log = Logger.getLogger(ManagedProcess.class);
+
+  private final String clusterName;
+  private final String zkAddress;
+  private final String instanceName;
+  // created by start(); null until then
+  private HelixManager participantManager;
+
+  /**
+   * @param clusterName  name of the cluster to join
+   * @param zkAddress    zookeeper address used to connect
+   * @param instanceName name under which this participant joins the cluster
+   */
+  public ManagedProcess(String clusterName, String zkAddress, String instanceName)
+  {
+    this.clusterName = clusterName;
+    this.zkAddress = zkAddress;
+    this.instanceName = instanceName;
+  }
+
+  /**
+   * Creates the participant manager, registers the state model factory and
+   * connects to the cluster.
+   *
+   * @throws Exception if creating or connecting the manager fails
+   */
+  public void start() throws Exception
+  {
+    log.info("STARTING "+ instanceName);
+    participantManager = HelixManagerFactory.getZKHelixManager(clusterName,
+        instanceName, InstanceType.PARTICIPANT, zkAddress);
+    participantManager.getStateMachineEngine().registerStateModelFactory(
+        "MasterSlave", new ManagedFactory());
+    participantManager.connect();
+    log.info("STARTED "+ instanceName);
+  }
+
+  /**
+   * Disconnects the participant manager. Does nothing if {@link #start()} was
+   * never called.
+   */
+  public void stop()
+  {
+    if (participantManager != null)
+    {
+      participantManager.disconnect();
+    }
+  }
+
+  /**
+   * Runs a single participant until the JVM is shut down.
+   *
+   * @param args {@code <zkAddress> <clusterName> <instanceName>}
+   * @throws IllegalArgumentException if fewer than three arguments are given
+   * @throws Exception if the participant cannot be started
+   */
+  public static void main(String[] args) throws Exception
+  {
+    // fail with a usage hint instead of a bare ArrayIndexOutOfBoundsException
+    if (args == null || args.length < 3)
+    {
+      throw new IllegalArgumentException(
+          "Usage: ManagedProcess <zkAddress> <clusterName> <instanceName>");
+    }
+
+    final String zkAddress = args[0];
+    final String clusterName = args[1];
+    final String instanceName = args[2];
+
+    // Give a unique id to each process, most commonly used format hostname_port
+    final ManagedProcess managerProcess = new ManagedProcess(clusterName, zkAddress,
+        instanceName);
+    Runtime.getRuntime().addShutdownHook(new Thread()
+    {
+      @Override
+      public void run()
+      {
+        log.info("Shutting down " + instanceName);
+        managerProcess.stop();
+      }
+    });
+    managerProcess.start();
+    // joining the current thread blocks forever; the shutdown hook does the cleanup
+    Thread.currentThread().join();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ShellContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ShellContainerProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ShellContainerProvider.java
new file mode 100644
index 0000000..107f2c6
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ShellContainerProvider.java
@@ -0,0 +1,85 @@
+package org.apache.helix.metamanager.managed;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.helix.metamanager.ClusterContainerProvider;
+import org.apache.log4j.Logger;
+
+/**
+ * Container provider that launches each container as an external process via
+ * {@code /bin/sh <command> <zkAddress> <clusterName> <id>}.
+ *
+ * The process table is static, i.e. shared by all provider instances in the
+ * JVM, and every access to it is guarded by {@link #staticLock}.
+ */
+public class ShellContainerProvider implements ClusterContainerProvider {
+
+	static final Logger log = Logger.getLogger(ShellContainerProvider.class);
+
+	static final String REQUIRED_TYPE = "container";
+	static final String RUN_COMMAND = "/bin/sh";
+
+	// global view of processes required
+	static final Object staticLock = new Object();
+	static final Map<String, ShellProcess> processes = new HashMap<String, ShellProcess>();
+
+	final String zkAddress;
+	final String clusterName;
+	final String command;
+	final String providerName;
+
+	/**
+	 * @param zkAddress   zookeeper address passed to the launched script
+	 * @param clusterName cluster name passed to the launched script
+	 * @param owner       name of this provider, recorded with each process it creates
+	 * @param command     script handed to {@code /bin/sh}
+	 */
+	public ShellContainerProvider(String zkAddress, String clusterName, String owner, String command) {
+		this.zkAddress = zkAddress;
+		this.clusterName = clusterName;
+		this.command = command;
+		this.providerName = owner;
+	}
+
+	/**
+	 * Launches a new container process and registers it under the given id.
+	 *
+	 * @param id   unique container id, also passed to the script as last argument
+	 * @param type container type, must equal {@link #REQUIRED_TYPE}
+	 * @throws IllegalArgumentException if the id is already in use or the type
+	 *                                  is not supported
+	 */
+	@Override
+	public void create(String id, String type) throws Exception {
+		synchronized (staticLock) {
+			if(processes.containsKey(id))
+				throw new IllegalArgumentException(String.format("Process '%s' already exists", id));
+
+			if(!type.equals(REQUIRED_TYPE))
+				throw new IllegalArgumentException(String.format("Type '%s' not supported", type));
+
+			log.info(String.format("Running container '%s' (zkAddress='%s', clusterName='%s', command='%s')", id, zkAddress, clusterName, command));
+
+			// NOTE(review): the child's stdout/stderr are never read here; a script that
+			// prints a lot may block once the pipe buffer fills up - confirm and redirect if needed
+			ProcessBuilder builder = new ProcessBuilder(RUN_COMMAND, command, zkAddress, clusterName, id);
+			Process process = builder.start();
+
+			processes.put(id, new ShellProcess(id, providerName, process));
+		}
+	}
+
+	/**
+	 * Unregisters the container and kills its process.
+	 *
+	 * @param id container id
+	 * @throws IllegalArgumentException if no container is registered under {@code id}
+	 */
+	@Override
+	public void destroy(String id) throws Exception {
+		synchronized (staticLock) {
+			if(!processes.containsKey(id))
+				throw new IllegalArgumentException(String.format("Process '%s' does not exists", id));
+
+			log.info(String.format("Destroying container '%s'", id));
+
+			ShellProcess shell = processes.remove(id);
+			shell.process.destroy();
+		}
+	}
+
+	/**
+	 * Destroys every process in the shared process table, regardless of which
+	 * provider instance created it. Best effort: a failure for one process is
+	 * logged and the remaining ones are still attempted.
+	 */
+	@Override
+	public void destroyAll() {
+		synchronized (staticLock) {
+			log.info("Destroying all processes");
+			// iterate over a snapshot, destroy() removes entries from the map
+			for(ShellProcess process : new HashSet<ShellProcess>(processes.values())) {
+				try {
+					destroy(process.id);
+				} catch (Exception e) {
+					log.warn(String.format("Failed to destroy container '%s'", process.id), e);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Holder tying a container id and the name of the creating provider to the
+	 * operating system process.
+	 */
+	static class ShellProcess {
+		final String id;
+		final String owner;
+		final Process process;
+
+		public ShellProcess(String id, String owner, Process process) {
+			this.id = id;
+			this.owner = owner;
+			this.process = process;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ShellProcessProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ShellProcessProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ShellProcessProvider.java
new file mode 100644
index 0000000..0def0f5
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/ShellProcessProvider.java
@@ -0,0 +1,148 @@
+package org.apache.helix.metamanager.managed;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.helix.metamanager.ClusterContainerProvider;
+import org.apache.log4j.Logger;
+
+/**
+ * Container provider that runs containers as external processes via
+ * {@code /bin/sh <command> <zkAddress> <clusterName> <connection>}.
+ *
+ * A container is first created (a {@link ProcessBuilder} is prepared and a
+ * connection string of the form {@code localhost_<basePort + n>} assigned),
+ * then started and stopped explicitly, and finally destroyed. All public
+ * methods are synchronized on the provider instance.
+ */
+public class ShellProcessProvider implements ClusterContainerProvider {
+
+	static final Logger log = Logger.getLogger(ShellProcessProvider.class);
+
+	static final String REQUIRED_TYPE = "container";
+	static final String RUN_COMMAND = "/bin/sh";
+	static final String KILL_COMMAND = "kill -s SIGINT %d";
+
+	// every created container has a builder; only running ones have a process
+	Map<String, ProcessBuilder> builders = new HashMap<String, ProcessBuilder>();
+	Map<String, Process> processes = new HashMap<String, Process>();
+	Map<String, String> id2connection = new HashMap<String, String>();
+
+	// number of containers created so far, used as port offset
+	int connectionCounter = 0;
+
+	final String zkAddress;
+	final String clusterName;
+	final int basePort;
+	final String command;
+
+	/**
+	 * @param zkAddress   zookeeper address passed to the launched script
+	 * @param clusterName cluster name passed to the launched script
+	 * @param basePort    first port number used for generated connection strings
+	 * @param command     script handed to {@code /bin/sh}
+	 */
+	public ShellProcessProvider(String zkAddress, String clusterName, int basePort, String command) {
+		this.zkAddress = zkAddress;
+		this.clusterName = clusterName;
+		this.basePort = basePort;
+		this.command = command;
+	}
+
+	/**
+	 * Registers a new container without starting it.
+	 *
+	 * @param id   unique container id
+	 * @param type container type, must equal {@link #REQUIRED_TYPE}
+	 * @return the connection string assigned to the container
+	 * @throws IllegalArgumentException if the id is already in use or the type
+	 *                                  is not supported
+	 */
+	@Override
+	public synchronized String create(String id, String type) throws Exception {
+		if(builders.containsKey(id))
+			throw new IllegalArgumentException(String.format("Process '%s' already exists", id));
+
+		if(!type.equals(REQUIRED_TYPE))
+			throw new IllegalArgumentException(String.format("Type '%s' not supported", type));
+
+		String connection = "localhost_" + (basePort + connectionCounter);
+		connectionCounter++;
+
+		log.info(String.format("Running container '%s' (zkAddress='%s', clusterName='%s', connection='%s', command='%s')", id, zkAddress, clusterName, connection, command));
+
+		ProcessBuilder builder = new ProcessBuilder(RUN_COMMAND, command, zkAddress, clusterName, connection);
+
+		builders.put(id, builder);
+		id2connection.put(id, connection);
+
+		return connection;
+	}
+
+	/**
+	 * Launches the process of a previously created container.
+	 *
+	 * @param id container id
+	 * @throws IllegalArgumentException if the container does not exist or is
+	 *                                  already running
+	 */
+	public synchronized void start(String id) throws Exception {
+		if(!builders.containsKey(id))
+			throw new IllegalArgumentException(String.format("Process '%s' does not exists", id));
+
+		if(processes.containsKey(id))
+			throw new IllegalArgumentException(String.format("Process '%s' already running", id));
+
+		log.info(String.format("Starting container '%s'", id));
+
+		Process p = builders.get(id).start();
+
+		processes.put(id, p);
+	}
+
+	/**
+	 * Sends SIGINT to a running container and waits for the process to exit.
+	 * The container stays registered and can be started again.
+	 *
+	 * @param id container id
+	 * @throws IllegalArgumentException if the container does not exist or is
+	 *                                  not running
+	 */
+	public synchronized void stop(String id) throws Exception {
+		if(!builders.containsKey(id))
+			throw new IllegalArgumentException(String.format("Process '%s' does not exists", id));
+
+		if(!processes.containsKey(id))
+			throw new IllegalArgumentException(String.format("Process '%s' not running", id));
+
+		log.info(String.format("Stopping container '%s'", id));
+
+		Process p = processes.get(id);
+
+		int pid = getUnixPID(p);
+		Runtime.getRuntime().exec(String.format(KILL_COMMAND, pid));
+
+		// blocks until the child has terminated
+		int retVal = p.waitFor();
+		if(retVal != 130) {
+			log.warn(String.format("Process %d returned %d (should be 130, SIGINT)", pid, retVal));
+		}
+
+		processes.remove(id);
+
+	}
+
+	/**
+	 * Unregisters a container. If it is still running, its process is killed
+	 * first.
+	 *
+	 * @param id container id
+	 * @return the connection string that was assigned to the container
+	 * @throws IllegalArgumentException if the container does not exist
+	 */
+	@Override
+	public synchronized String destroy(String id) throws Exception {
+		if(!builders.containsKey(id))
+			throw new IllegalArgumentException(String.format("Process '%s' does not exists", id));
+
+		log.info(String.format("Destroying container '%s'", id));
+
+		if(processes.containsKey(id)) {
+			log.warn(String.format("Forcibly terminating running container '%s'", id));
+			processes.get(id).destroy();
+			processes.remove(id);
+		}
+
+		String connection = id2connection.get(id);
+
+		builders.remove(id);
+		id2connection.remove(id);
+
+		return connection;
+	}
+
+	/**
+	 * Destroys all containers known to this provider, whether running or not.
+	 * Best effort: a failure for one container is logged and the remaining
+	 * ones are still attempted.
+	 */
+	public synchronized void destroyAll() {
+		log.info("Destroying all processes");
+		// builders has an entry for every created container, processes only for
+		// the running ones; iterate over a snapshot since destroy() mutates the map
+		for(String id : new HashSet<String>(builders.keySet())) {
+			try {
+				destroy(id);
+			} catch (Exception e) {
+				log.warn(String.format("Failed to destroy container '%s'", id), e);
+			}
+		}
+	}
+
+	/**
+	 * Reads the process id out of a {@code java.lang.UNIXProcess} via reflection.
+	 *
+	 * @param process process handle
+	 * @return value of the private {@code pid} field
+	 * @throws IllegalArgumentException if the handle is not a {@code java.lang.UNIXProcess}
+	 */
+	// TODO get PID independently of platform
+	static int getUnixPID(Process process) throws IllegalArgumentException, IllegalAccessException, NoSuchFieldException {
+		if (process.getClass().getName().equals("java.lang.UNIXProcess")) {
+			Class<?> proc = process.getClass();
+			Field field = proc.getDeclaredField("pid");
+			Object value = getFieldValue(field, process);
+			return ((Integer) value).intValue();
+		} else {
+			throw new IllegalArgumentException("Not a UNIXProcess");
+		}
+	}
+
+	/**
+	 * Reads a field's value regardless of its declared visibility and restores
+	 * the field's previous accessibility flag afterwards, also on failure.
+	 *
+	 * @param field  field to read
+	 * @param object instance to read the field from
+	 * @return the field's value
+	 */
+	static Object getFieldValue(Field field, Object object) throws IllegalArgumentException, IllegalAccessException {
+		boolean accessible = field.isAccessible();
+		field.setAccessible(true);
+		try {
+			return field.get(object);
+		} finally {
+			field.setAccessible(accessible);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/YarnContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/YarnContainerProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/YarnContainerProvider.java
new file mode 100644
index 0000000..629788e
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/managed/YarnContainerProvider.java
@@ -0,0 +1,37 @@
+package org.apache.helix.metamanager.managed;
+
+import org.apache.helix.metamanager.ClusterContainerProvider;
+
+/**
+ * Placeholder {@link ClusterContainerProvider} implementation. None of the
+ * operations is implemented yet: the methods returning a value return
+ * {@code null} and the others do nothing.
+ */
+public class YarnContainerProvider implements ClusterContainerProvider {
+
+	/**
+	 * Not implemented.
+	 *
+	 * @return always {@code null}
+	 */
+	@Override
+	public String create(String id, String type) throws Exception {
+		// TODO not implemented yet
+		return null;
+	}
+
+	/**
+	 * Not implemented.
+	 *
+	 * @return always {@code null}
+	 */
+	@Override
+	public String destroy(String id) throws Exception {
+		// TODO not implemented yet
+		return null;
+	}
+
+	/** Not implemented; currently a no-op. */
+	@Override
+	public void start(String id) throws Exception {
+		// TODO not implemented yet
+
+	}
+
+	/** Not implemented; currently a no-op. */
+	@Override
+	public void stop(String id) throws Exception {
+		// TODO not implemented yet
+
+	}
+
+	/** Not implemented; currently a no-op. */
+	@Override
+	public void destroyAll() {
+		// TODO not implemented yet
+
+	}
+
+}