You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/20 20:30:13 UTC

[04/15] Adding Helix-task-framework and Yarn integration modules

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderProcess.java
new file mode 100644
index 0000000..9ea713c
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderProcess.java
@@ -0,0 +1,82 @@
+package org.apache.helix.metamanager.provider;
+
+import java.util.Properties;
+
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.metamanager.ClusterAdmin;
+import org.apache.helix.metamanager.ContainerProvider;
+import org.apache.helix.metamanager.HelixClusterAdmin;
+import org.apache.helix.metamanager.Service;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Helix participant hosting a {@link ContainerProvider}. The process is
+ * configured via {@link ProviderProperties}, registers itself as an instance
+ * of the meta cluster and serves "OnlineOffline" state transitions through a
+ * {@link ProviderStateModelFactory}.
+ * <p>
+ * Lifecycle: {@link #configure(Properties)}, then
+ * {@link #setConteinerProvider(ContainerProvider)}, then {@link #start()} and
+ * finally {@link #stop()}.
+ */
+public class ProviderProcess implements Service {
+    static final Logger log = Logger.getLogger(ProviderProcess.class);
+
+    ClusterAdmin        admin;
+
+    ProviderProperties  properties;
+    ContainerProvider   provider;
+    HelixAdmin          helixAdmin;
+    HelixManager        participantManager;
+
+    /**
+     * Copies the given properties into a {@link ProviderProperties} instance
+     * and validates them.
+     * 
+     * @param properties
+     *            raw configuration, must not be null
+     * @throws NullPointerException
+     *             if properties is null
+     * @throws IllegalArgumentException
+     *             if a required key is missing
+     */
+    @Override
+    public void configure(Properties properties) throws Exception {
+        Preconditions.checkNotNull(properties);
+        ProviderProperties providerProperties = new ProviderProperties();
+        providerProperties.putAll(properties);
+        Preconditions.checkArgument(providerProperties.isValid());
+
+        this.properties = providerProperties;
+    }
+
+    /**
+     * Sets the container provider served by this process. The misspelled name
+     * is kept for backward compatibility; see
+     * {@link #setContainerProvider(ContainerProvider)}.
+     * 
+     * @param provider
+     *            container provider to expose
+     */
+    public void setConteinerProvider(ContainerProvider provider) {
+        this.provider = provider;
+    }
+
+    /**
+     * Correctly spelled alias of
+     * {@link #setConteinerProvider(ContainerProvider)}.
+     * 
+     * @param provider
+     *            container provider to expose
+     */
+    public void setContainerProvider(ContainerProvider provider) {
+        setConteinerProvider(provider);
+    }
+
+    /**
+     * Registers the provider as instance of the meta cluster and connects the
+     * participant manager. Connections opened by a failed start are released
+     * before the failure is propagated.
+     * 
+     * @throws NullPointerException
+     *             if no provider was set or the process was not configured
+     * @throws Exception
+     *             if registration or connecting fails
+     */
+    @Override
+    public void start() throws Exception {
+        Preconditions.checkNotNull(provider);
+        Preconditions.checkNotNull(properties, "configure() must be called before start()");
+
+        log.info(String.format("Registering provider '%s' at '%s/%s'", properties.getName(), properties.getMetaAddress(), properties.getMetaCluster()));
+        HelixAdmin metaHelixAdmin = new ZKHelixAdmin(properties.getMetaAddress());
+        try {
+            metaHelixAdmin.addInstance(properties.getMetaCluster(), new InstanceConfig(properties.getName()));
+        } finally {
+            // close the short-lived admin connection even if registration fails
+            metaHelixAdmin.close();
+        }
+
+        log.info(String.format("Starting provider '%s'", properties.getName()));
+        try {
+            helixAdmin = new ZKHelixAdmin(properties.getAddress());
+            admin = new HelixClusterAdmin(properties.getCluster(), helixAdmin);
+
+            participantManager = HelixManagerFactory.getZKHelixManager(properties.getMetaCluster(), properties.getName(), InstanceType.PARTICIPANT,
+                    properties.getMetaAddress());
+            participantManager.getStateMachineEngine().registerStateModelFactory("OnlineOffline", new ProviderStateModelFactory(provider, admin));
+            participantManager.connect();
+        } catch (Exception e) {
+            // do not leak the admin connection or the manager of a half-started process
+            try {
+                stop();
+            } catch (RuntimeException cleanupFailure) {
+                log.warn(String.format("Cleanup after failed start of provider '%s' failed", properties.getName()), cleanupFailure);
+            }
+            throw e;
+        }
+
+        log.info(String.format("Successfully started provider '%s'", properties.getName()));
+    }
+
+    /**
+     * Disconnects the participant manager and closes the admin connection, if
+     * present. Safe to call repeatedly and before {@link #configure(Properties)}.
+     */
+    @Override
+    public void stop() {
+        // properties is null when stop() is invoked on an unconfigured process
+        String name = (properties != null) ? properties.getName() : "<unconfigured>";
+        log.info(String.format("Stopping provider '%s'", name));
+        if (participantManager != null) {
+            participantManager.disconnect();
+            participantManager = null;
+        }
+        if (helixAdmin != null) {
+            helixAdmin.close();
+            helixAdmin = null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderProperties.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderProperties.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderProperties.java
new file mode 100644
index 0000000..098592a
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderProperties.java
@@ -0,0 +1,97 @@
+package org.apache.helix.metamanager.provider;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.helix.metamanager.bootstrapper.BootUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Base configuration for {@link ProviderProcess}. Besides the required keys
+ * (see {@link #isValid()}) it holds per-container configuration below the
+ * {@link #CONTAINER_NAMESPACE} prefix, keyed as
+ * <code>containers.&lt;id&gt;.&lt;key&gt;</code>.
+ * <p>
+ * Unlike plain {@link Properties}, {@link #get(Object)} and
+ * {@link #getProperty(String)} fail with an {@link IllegalStateException}
+ * when the key is not set instead of returning null.
+ */
+public class ProviderProperties extends Properties {
+    private static final long serialVersionUID = -2209509977839674160L;
+
+    public final static String ADDRESS = "address";
+    public final static String CLUSTER = "cluster";
+    public final static String METAADDRESS = "metaaddress";
+    public final static String METACLUSTER = "metacluster";
+    public final static String NAME = "name";
+
+    public final static String CONTAINER_NAMESPACE = "containers";
+
+    /**
+     * @return true if all required keys (address, cluster, metaaddress,
+     *         metacluster, name) are present
+     */
+    public boolean isValid() {
+        return containsKey(ADDRESS) &&
+               containsKey(CLUSTER) &&
+               containsKey(METAADDRESS) &&
+               containsKey(METACLUSTER) &&
+               containsKey(NAME);
+    }
+
+    public String getAddress() {
+        return getProperty(ADDRESS);
+    }
+
+    public String getCluster() {
+        return getProperty(CLUSTER);
+    }
+
+    public String getMetaAddress() {
+        return getProperty(METAADDRESS);
+    }
+
+    public String getMetaCluster() {
+        return getProperty(METACLUSTER);
+    }
+
+    public String getName() {
+        return getProperty(NAME);
+    }
+
+    /**
+     * @return ids of all configured containers, empty if the container
+     *         namespace is absent
+     */
+    public Set<String> getContainers() {
+        if (!BootUtils.hasNamespace(this, CONTAINER_NAMESPACE))
+            return Collections.emptySet();
+        return BootUtils.getNamespaces(BootUtils.getNamespace(this, CONTAINER_NAMESPACE));
+    }
+
+    /**
+     * @param id
+     *            container id
+     * @return true if a configuration for the container id exists
+     */
+    public boolean hasContainer(String id) {
+        return BootUtils.hasNamespace(this, CONTAINER_NAMESPACE)
+                && BootUtils.hasNamespace(BootUtils.getNamespace(this, CONTAINER_NAMESPACE), id);
+    }
+
+    /**
+     * @param id
+     *            container id
+     * @return configuration of the container
+     * @throws IllegalArgumentException
+     *             if the container namespace or the container id is missing
+     */
+    public Properties getContainer(String id) {
+        Preconditions.checkArgument(BootUtils.hasNamespace(this, CONTAINER_NAMESPACE), "no container namespace");
+        Preconditions.checkArgument(BootUtils.hasNamespace(BootUtils.getNamespace(this, CONTAINER_NAMESPACE), id), "container %s not configured", id);
+        return BootUtils.getNamespace(BootUtils.getNamespace(this, CONTAINER_NAMESPACE), id);
+    }
+
+    /**
+     * Adds the configuration of a container below the container namespace.
+     * 
+     * @param id
+     *            container id, must not be configured yet
+     * @param properties
+     *            container configuration
+     * @throws IllegalArgumentException
+     *             if the container id is already configured
+     */
+    public void addContainer(String id, Properties properties) {
+        Preconditions.checkArgument(!getContainers().contains(id), "Already contains container type %s", id);
+
+        // add container config
+        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+            this.put(CONTAINER_NAMESPACE + "." + id + "." + entry.getKey(), entry.getValue());
+        }
+    }
+
+    /**
+     * @throws IllegalStateException
+     *             if the key is not set; the message names the missing key
+     */
+    @Override
+    public Object get(Object key) {
+        Preconditions.checkState(containsKey(key), "Property '%s' is not set", key);
+        return super.get(key);
+    }
+
+    /**
+     * @throws IllegalStateException
+     *             if the key is not set; the message names the missing key
+     */
+    @Override
+    public String getProperty(String key) {
+        Preconditions.checkState(containsKey(key), "Property '%s' is not set", key);
+        return super.getProperty(key);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderRebalancer.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderRebalancer.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderRebalancer.java
new file mode 100644
index 0000000..4be1a05
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderRebalancer.java
@@ -0,0 +1,352 @@
+package org.apache.helix.metamanager.provider;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.metamanager.StatusProvider;
+import org.apache.helix.metamanager.TargetProvider;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+/**
+ * Rebalancer for meta cluster. Polls {@link TargetProvider} and
+ * {@link StatusProvider} and reads and sets IdealState of meta cluster participants (
+ * {@link ProviderProcess}). The number of active containers is set to the target
+ * count. Failed containers are shut down and restarted on any available
+ * provider. Also, container counts are balanced across multiple providers.<br/>
+ * <b>NOTE:</b> status and target provider are injected via
+ * {@link ProviderRebalancerSingleton}<br/>
+ * <br/>
+ * <b>IdealState mapping:</b><br/>
+ * resource = container type<br/>
+ * partition = logical container instance<br/>
+ * instance = container provider<br/>
+ * status = physical container instance presence<br/>
+ */
+public class ProviderRebalancer implements Rebalancer {
+
+    static final Logger log                 = Logger.getLogger(ProviderRebalancer.class);
+
+    static final long   UPDATE_INTERVAL_MIN = 1500;
+
+    static final Object lock                = new Object();
+    static long         nextUpdate          = 0;
+
+    TargetProvider      targetProvider;
+    StatusProvider      statusProvider;
+    HelixManager        manager;
+
+    /**
+     * Stores the manager and picks up the target and status provider that
+     * were injected via {@link ProviderRebalancerSingleton}.
+     */
+    @Override
+    public void init(HelixManager manager) {
+        this.manager = manager;
+        this.statusProvider = ProviderRebalancerSingleton.getStatusProvider();
+        this.targetProvider = ProviderRebalancerSingleton.getTargetProvider();
+    }
+
+    /**
+     * Computes which provider should host each container of one container
+     * type.
+     * <p>
+     * Containers that already have an ONLINE provider (current or pending)
+     * keep that provider; they are sent OFFLINE if they are reported as failed
+     * or if their provider already holds the per-provider maximum. Remaining
+     * containers are handed to the provider with the lowest count until the
+     * target count is reached. An empty assignment is returned when the target
+     * count cannot be retrieved or when there is no live provider.
+     * 
+     * @param resource
+     *            resource whose name is used as the container type
+     * @param idealState
+     *            ideal state supplying the set of container (partition) names
+     * @param currentStateOutput
+     *            current and pending states of the containers
+     * @param clusterData
+     *            cluster cache supplying the live provider instances
+     * @return the computed assignment for the resource
+     */
+    @Override
+    public ResourceAssignment computeResourceMapping(Resource resource, IdealState idealState, CurrentStateOutput currentStateOutput,
+            ClusterDataCache clusterData) {
+
+        final String resourceName = resource.getResourceName();
+        final String containerType = resourceName;
+
+        final SortedSet<String> allContainers = Sets.newTreeSet(new IndexedNameComparator());
+        allContainers.addAll(idealState.getPartitionSet());
+
+        final SortedSet<String> allProviders = Sets.newTreeSet(new IndexedNameComparator());
+        for (LiveInstance instance : clusterData.getLiveInstances().values()) {
+            allProviders.add(instance.getId());
+        }
+
+        final ResourceState currentState = new ResourceState(resourceName, currentStateOutput);
+
+        // target container count
+        log.debug(String.format("Retrieving target container count for type '%s'", containerType));
+        int targetCount = -1;
+        try {
+            targetCount = targetProvider.getTargetContainerCount(containerType);
+        } catch (Exception e) {
+            log.error(String.format("Could not retrieve target count for '%s'", containerType), e);
+            return new ResourceAssignment(resourceName);
+        }
+
+        // provider sanity check
+        if (allProviders.isEmpty()) {
+            log.warn(String.format("Could not find any providers"));
+            return new ResourceAssignment(resourceName);
+        }
+
+        // all containers
+        SortedSet<String> assignedContainers = getAssignedContainers(currentState, allContainers);
+        SortedSet<String> failedContainers = getFailedContainers(currentState, allContainers);
+
+        log.info(String.format("Rebalancing '%s' (target=%d, active=%d, failures=%d)", resourceName, targetCount, assignedContainers.size(),
+                failedContainers.size()));
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("%s: assigned containers %s", resourceName, assignedContainers));
+            log.debug(String.format("%s: failed containers %s", resourceName, failedContainers));
+        }
+
+        // assignment
+        // upper bound of containers per provider: target count divided by the
+        // number of providers, rounded up
+        int maxCountPerProvider = (int) Math.ceil(targetCount / (float) allProviders.size());
+
+        ResourceAssignment assignment = new ResourceAssignment(resourceName);
+        CountMap counts = new CountMap(allProviders);
+        int assignmentCount = 0;
+
+        // currently assigned
+        for (String containerName : assignedContainers) {
+            String providerName = getProvider(currentState, containerName);
+            Partition partition = new Partition(containerName);
+
+            if (failedContainers.contains(containerName)) {
+                log.warn(String.format("Container '%s:%s' failed, going offline", providerName, containerName));
+                assignment.addReplicaMap(partition, Collections.singletonMap(providerName, "OFFLINE"));
+
+            } else if (counts.get(providerName) >= maxCountPerProvider) {
+                log.warn(String.format("Container '%s:%s' misassigned, going offline", providerName, containerName));
+                assignment.addReplicaMap(partition, Collections.singletonMap(providerName, "OFFLINE"));
+
+            } else {
+                assignment.addReplicaMap(partition, Collections.singletonMap(providerName, "ONLINE"));
+            }
+
+            // counted for the provider and the total even when the container
+            // was sent OFFLINE above
+            counts.increment(providerName);
+            assignmentCount++;
+        }
+
+        // currently unassigned
+        SortedSet<String> unassignedContainers = Sets.newTreeSet(new IndexedNameComparator());
+        unassignedContainers.addAll(allContainers);
+        unassignedContainers.removeAll(assignedContainers);
+
+        for (String containerName : unassignedContainers) {
+            // stop handing out containers once the target count is reached
+            if (assignmentCount >= targetCount)
+                break;
+
+            // provider with the lowest count so far
+            String providerName = counts.getMinKey();
+            Partition partition = new Partition(containerName);
+
+            if (failedContainers.contains(containerName)) {
+                log.warn(String.format("Container '%s:%s' failed and unassigned, going offline", providerName, containerName));
+                assignment.addReplicaMap(partition, Collections.singletonMap(providerName, "OFFLINE"));
+
+            } else {
+                assignment.addReplicaMap(partition, Collections.singletonMap(providerName, "ONLINE"));
+            }
+
+            counts.increment(providerName);
+            assignmentCount++;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("assignment counts: %s", counts));
+            log.debug(String.format("assignment: %s", assignment));
+        }
+
+        return assignment;
+    }
+
+    /**
+     * Tells whether any provider holds the container in state ONLINE, either
+     * as current or as pending state.
+     */
+    boolean hasProvider(ResourceState state, String containerName) {
+        Map<String, String> current = state.getCurrentStateMap(containerName);
+        Map<String, String> pending = state.getPendingStateMap(containerName);
+        if (hasInstance(current, "ONLINE"))
+            return true;
+        return hasInstance(pending, "ONLINE");
+    }
+
+    /**
+     * Returns the provider holding the container ONLINE. The current state is
+     * consulted first, the pending state only if no current ONLINE entry
+     * exists.
+     * 
+     * @throws IllegalArgumentException
+     *             if neither map contains an ONLINE entry
+     */
+    String getProvider(ResourceState state, String containerName) {
+        Map<String, String> current = state.getCurrentStateMap(containerName);
+        Map<String, String> source = hasInstance(current, "ONLINE") ? current : state.getPendingStateMap(containerName);
+        return getInstance(source, "ONLINE");
+    }
+
+    SortedSet<String> getFailedContainers(ResourceState state, Collection<String> containers) {
+        SortedSet<String> failedContainers = Sets.newTreeSet(new IndexedNameComparator());
+        for (String containerName : containers) {
+            Map<String, String> currentStateMap = state.getCurrentStateMap(containerName);
+            Map<String, String> pendingStateMap = state.getPendingStateMap(containerName);
+
+            if (hasInstance(currentStateMap, "ERROR")) {
+                failedContainers.add(containerName);
+                continue;
+            }
+
+            if (!hasInstance(currentStateMap, "ONLINE") || hasInstance(pendingStateMap, "OFFLINE"))
+                continue;
+
+            // container listed online and not in transition, but not active
+            if (!statusProvider.isHealthy(containerName)) {
+                log.warn(String.format("Container '%s' designated ONLINE, but is not active", containerName));
+                failedContainers.add(containerName);
+            }
+        }
+        return failedContainers;
+    }
+
+    /**
+     * Filters the given containers down to those for which
+     * {@link #hasProvider(ResourceState, String)} holds.
+     */
+    SortedSet<String> getAssignedContainers(ResourceState state, Collection<String> containers) {
+        SortedSet<String> assigned = Sets.newTreeSet(new IndexedNameComparator());
+        for (String name : containers) {
+            if (hasProvider(state, name))
+                assigned.add(name);
+        }
+        return assigned;
+    }
+
+    /**
+     * Tells whether at least one instance of the state map is in the given
+     * state.
+     */
+    boolean hasInstance(Map<String, String> stateMap, String state) {
+        for (String instanceState : stateMap.values()) {
+            if (instanceState.equals(state))
+                return true;
+        }
+        return false;
+    }
+
+    /**
+     * Returns the first instance (in map iteration order) that is in the
+     * given state.
+     * 
+     * @throws IllegalArgumentException
+     *             if no instance is in the given state
+     */
+    String getInstance(Map<String, String> stateMap, String state) {
+        for (Map.Entry<String, String> candidate : stateMap.entrySet()) {
+            if (candidate.getValue().equals(state))
+                return candidate.getKey();
+        }
+        throw new IllegalArgumentException(String.format("Could not find instance with state '%s'", state));
+    }
+
+    /**
+     * Orders names of the form <code>&lt;prefix&gt;&lt;index&gt;</code>, e.g.
+     * "container2" before "container10". Names without a trailing index sort
+     * before indexed names; indexed names are ordered by prefix, then
+     * numerically by index, and finally by their full text so that distinct
+     * names (e.g. "a1" and "a01") never compare as equal.
+     */
+    class IndexedNameComparator implements Comparator<String> {
+        // reluctant prefix, so the index group captures ALL trailing digits
+        // (a greedy prefix would leave only the last digit for the index)
+        Pattern pattern = Pattern.compile("^(.*?)([0-9]+)$");
+
+        @Override
+        public int compare(String o1, String o2) {
+            Matcher m1 = pattern.matcher(o1);
+            Matcher m2 = pattern.matcher(o2);
+
+            boolean find1 = m1.find();
+            boolean find2 = m2.find();
+
+            if (!find1 && !find2)
+                return o1.compareTo(o2);
+
+            if (!find1 && find2)
+                return -1;
+
+            if (find1 && !find2)
+                return 1;
+
+            int nameComp = m1.group(1).compareTo(m2.group(1));
+            if (nameComp != 0)
+                return nameComp;
+
+            int indexComp = compareIndex(m1.group(2), m2.group(2));
+            if (indexComp != 0)
+                return indexComp;
+
+            // same prefix and numeric index, different spelling (leading zeros)
+            return o1.compareTo(o2);
+        }
+
+        /**
+         * Compares two non-empty strings of ASCII digits by numeric value
+         * without parsing, so arbitrarily long indices cannot overflow.
+         */
+        private int compareIndex(String digits1, String digits2) {
+            String d1 = stripLeadingZeros(digits1);
+            String d2 = stripLeadingZeros(digits2);
+
+            // without leading zeros, more digits means a larger number
+            if (d1.length() != d2.length())
+                return d1.length() < d2.length() ? -1 : 1;
+
+            return d1.compareTo(d2);
+        }
+
+        /** Removes leading zeros, keeping at least one digit. */
+        private String stripLeadingZeros(String digits) {
+            int start = 0;
+            while (start < digits.length() - 1 && digits.charAt(start) == '0')
+                start++;
+            return digits.substring(start);
+        }
+    }
+
+    /**
+     * Counter per key. All keys are registered with a count of zero on
+     * construction; reading or incrementing an unregistered key fails with an
+     * {@link IllegalArgumentException}.
+     */
+    class CountMap extends HashMap<String, Integer> {
+        private static final long serialVersionUID = 3954138748385337978L;
+
+        /** Registers every given key with an initial count of zero. */
+        public CountMap(Collection<String> keys) {
+            for (String initialKey : keys) {
+                put(initialKey, 0);
+            }
+        }
+
+        /**
+         * @throws IllegalArgumentException
+         *             if the key is not registered
+         */
+        @Override
+        public Integer get(Object key) {
+            Preconditions.checkArgument(containsKey(key), "Key %s not found", key);
+            return super.get(key);
+        }
+
+        /**
+         * Raises the count of the key by one and returns the new count.
+         * 
+         * @throws IllegalArgumentException
+         *             if the key is not registered
+         */
+        public int increment(String key) {
+            // get() already rejects unregistered keys
+            final int updated = get(key) + 1;
+            put(key, updated);
+            return updated;
+        }
+
+        /**
+         * @return a key with the smallest count; among equal counts the first
+         *         one in map iteration order
+         */
+        public String getMinKey() {
+            Preconditions.checkState(size() > 0, "Must contain at least one item");
+
+            String bestKey = null;
+            int bestCount = Integer.MAX_VALUE;
+            for (Map.Entry<String, Integer> entry : entrySet()) {
+                int count = entry.getValue();
+                if (count < bestCount) {
+                    bestCount = count;
+                    bestKey = entry.getKey();
+                }
+            }
+            return bestKey;
+        }
+
+        /**
+         * @return a key with the largest count; among equal counts the first
+         *         one in map iteration order
+         */
+        public String getMaxKey() {
+            Preconditions.checkState(size() > 0, "Must contain at least one item");
+
+            String bestKey = null;
+            int bestCount = Integer.MIN_VALUE;
+            for (Map.Entry<String, Integer> entry : entrySet()) {
+                int count = entry.getValue();
+                if (count > bestCount) {
+                    bestCount = count;
+                    bestKey = entry.getKey();
+                }
+            }
+            return bestKey;
+        }
+    }
+
+    class ResourceState {
+        final String             resourceName;
+        final CurrentStateOutput state;
+
+        public ResourceState(String resourceName, CurrentStateOutput state) {
+            this.resourceName = resourceName;
+            this.state = state;
+        }
+
+        Map<String, String> getCurrentStateMap(String partitionName) {
+            return state.getCurrentStateMap(resourceName, new Partition(partitionName));
+        }
+
+        Map<String, String> getPendingStateMap(String partitionName) {
+            return state.getPendingStateMap(resourceName, new Partition(partitionName));
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderRebalancerSingleton.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderRebalancerSingleton.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderRebalancerSingleton.java
new file mode 100644
index 0000000..c46f5f5
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderRebalancerSingleton.java
@@ -0,0 +1,38 @@
+package org.apache.helix.metamanager.provider;
+
+import org.apache.helix.metamanager.StatusProvider;
+import org.apache.helix.metamanager.TargetProvider;
+import org.apache.log4j.Logger;
+
+/**
+ * Static holder used to hand a {@link TargetProvider} and a
+ * {@link StatusProvider} to ProviderRebalancer, which reads both in its
+ * <code>init</code> method.
+ */
+public class ProviderRebalancerSingleton {
+
+    static final Logger   log = Logger.getLogger(ProviderRebalancerSingleton.class);
+
+    static TargetProvider targetProvider;
+    static StatusProvider statusProvider;
+
+    private ProviderRebalancerSingleton() {
+        // static access only
+    }
+
+    /** @return the target provider last set, or null if none was set */
+    public static TargetProvider getTargetProvider() {
+        return targetProvider;
+    }
+
+    /** Replaces the target provider handed to the rebalancer. */
+    public static void setTargetProvider(TargetProvider provider) {
+        targetProvider = provider;
+    }
+
+    /** @return the status provider last set, or null if none was set */
+    public static StatusProvider getStatusProvider() {
+        return statusProvider;
+    }
+
+    /** Replaces the status provider handed to the rebalancer. */
+    public static void setStatusProvider(StatusProvider provider) {
+        statusProvider = provider;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderStateModel.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderStateModel.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderStateModel.java
new file mode 100644
index 0000000..090f807
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderStateModel.java
@@ -0,0 +1,114 @@
+package org.apache.helix.metamanager.provider;
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.metamanager.ClusterAdmin;
+import org.apache.helix.metamanager.ContainerProvider;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.apache.log4j.Logger;
+
+/**
+ * Helix state model implementation for {@link ContainerProvider}s. On
+ * OFFLINE to ONLINE it registers the container as instance via the
+ * {@link ClusterAdmin} and creates it via the provider; on ONLINE to OFFLINE
+ * it destroys the container and removes the instance. Both transitions
+ * trigger a best-effort rebalance afterwards.
+ */
+@StateModelInfo(initialState = "OFFLINE", states = { "OFFLINE", "ONLINE" })
+public class ProviderStateModel extends StateModel {
+
+    static final Logger log = Logger.getLogger(ProviderStateModel.class);
+
+    ContainerProvider   provider;
+    ClusterAdmin        admin;
+
+    public ProviderStateModel(ContainerProvider provider, ClusterAdmin admin) {
+        this.provider = provider;
+        this.admin = admin;
+    }
+
+    /**
+     * Creates the container named by the message's partition. Any leftover
+     * container or instance of the same id is removed first.
+     * 
+     * @throws Exception
+     *             if adding the instance or creating the container fails
+     */
+    @Transition(from = "OFFLINE", to = "ONLINE")
+    public void acquire(Message m, NotificationContext context) throws Exception {
+        String containerType = m.getResourceName();
+        String containerId = m.getPartitionName();
+        String instanceId = context.getManager().getInstanceName();
+
+        log.trace(String.format("%s:%s transitioning from OFFLINE to ONLINE", containerId, instanceId));
+
+        bestEffortRemove(containerId);
+
+        // add instance to cluster
+        admin.addInstance(containerId, containerType);
+
+        // create container
+        provider.create(containerId, containerType);
+
+        bestEffortRebalance();
+
+        log.info(String.format("%s acquired container '%s' (type='%s')", instanceId, containerId, containerType));
+    }
+
+    /**
+     * Destroys the container named by the message's partition and removes
+     * its instance; failures of either step are logged and ignored.
+     */
+    @Transition(from = "ONLINE", to = "OFFLINE")
+    public void release(Message m, NotificationContext context) {
+        String containerId = m.getPartitionName();
+        String instanceId = context.getManager().getInstanceName();
+
+        log.trace(String.format("%s:%s transitioning from ONLINE to OFFLINE", containerId, instanceId));
+
+        bestEffortRemove(containerId);
+
+        bestEffortRebalance();
+
+        log.info(String.format("%s destroyed container '%s'", instanceId, containerId));
+
+    }
+
+    /** Recovers from ERROR by performing the same cleanup as {@link #release}. */
+    @Transition(from = "ERROR", to = "OFFLINE")
+    public void recover(Message m, NotificationContext context) {
+        String containerId = m.getPartitionName();
+        String instanceId = context.getManager().getInstanceName();
+
+        log.trace(String.format("%s:%s transitioning from ERROR to OFFLINE", containerId, instanceId));
+
+        release(m, context);
+    }
+
+    /** Only logs the transition; no resources are touched. */
+    @Transition(from = "OFFLINE", to = "DROPPED")
+    public void drop(Message m, NotificationContext context) {
+        String containerId = m.getPartitionName();
+        String instanceId = context.getManager().getInstanceName();
+
+        log.trace(String.format("%s:%s transitioning from OFFLINE to DROPPED", containerId, instanceId));
+    }
+
+    /** Triggers a rebalance; a failure is logged with its cause and ignored. */
+    private void bestEffortRebalance() {
+        try {
+            admin.rebalance();
+        } catch (Exception e) {
+            // ignore, but keep the cause in the log
+            log.warn(String.format("rebalancing cluster failed (error='%s')", e.getMessage()), e);
+        }
+    }
+
+    /**
+     * Destroys the container and removes its instance. Failures are expected
+     * when the container or instance does not exist, so they are only logged
+     * at debug level, together with their cause.
+     */
+    private void bestEffortRemove(String containerId) {
+        log.debug(String.format("Best effort removal of container '%s'", containerId));
+
+        try {
+            provider.destroy(containerId);
+            log.debug(String.format("Container '%s' destroyed", containerId));
+        } catch (Exception e) {
+            // the reason is unknown here; "does not exist" is only the likely one
+            log.debug(String.format("Could not destroy container '%s' (it may not exist)", containerId), e);
+        }
+
+        try {
+            admin.removeInstance(containerId);
+            log.debug(String.format("Instance '%s' removed", containerId));
+        } catch (Exception e) {
+            log.debug(String.format("Could not remove instance '%s' (it may not exist)", containerId), e);
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderStateModelFactory.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderStateModelFactory.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderStateModelFactory.java
new file mode 100644
index 0000000..36a071a
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/ProviderStateModelFactory.java
@@ -0,0 +1,27 @@
+package org.apache.helix.metamanager.provider;
+
+import org.apache.helix.metamanager.ClusterAdmin;
+import org.apache.helix.metamanager.ContainerProvider;
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+/**
+ * Factory for {@link ProviderStateModel}. Injects {@link ClusterAdmin} for
+ * managed cluster and {@link ContainerProvider}.
+ * 
+ */
+class ProviderStateModelFactory extends StateModelFactory<ProviderStateModel> {
+
+    final ContainerProvider provider;
+    final ClusterAdmin      admin;
+
+    public ProviderStateModelFactory(ContainerProvider provider, ClusterAdmin admin) {
+        super();
+        this.provider = provider;
+        this.admin = admin;
+    }
+
+    @Override
+    public ProviderStateModel createNewStateModel(String partitionName) {
+        return new ProviderStateModel(provider, admin);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/local/LocalContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/local/LocalContainerProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/local/LocalContainerProvider.java
new file mode 100644
index 0000000..b63d760
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/local/LocalContainerProvider.java
@@ -0,0 +1,110 @@
+package org.apache.helix.metamanager.provider.local;
+
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.helix.metamanager.ClusterContainerProvider;
+import org.apache.helix.metamanager.managed.ContainerProcess;
+import org.apache.helix.metamanager.provider.local.LocalContainerSingleton.LocalProcess;
+import org.apache.log4j.Logger;
+
+/**
+ * {@link ClusterContainerProvider} that runs each container as a
+ * {@link ContainerProcess} object created by this provider. Containers are
+ * tracked in the map returned by {@link LocalContainerSingleton#getProcesses()};
+ * every operation synchronizes on that map.
+ */
+public class LocalContainerProvider implements ClusterContainerProvider {
+
+	static final Logger log = Logger.getLogger(LocalContainerProvider.class);
+
+	static final String REQUIRED_TYPE = "container";
+
+	final String zkAddress;
+	final String clusterName;
+	final String providerName;
+
+	public LocalContainerProvider(String zkAddress, String clusterName, String providerName) {
+		this.zkAddress = zkAddress;
+		this.clusterName = clusterName;
+		this.providerName = providerName;
+	}
+
+	/**
+	 * Starts a new container process and registers it under the given id.
+	 *
+	 * @param id   container id; must not be registered yet
+	 * @param type container type; only {@value #REQUIRED_TYPE} is accepted
+	 * @throws IllegalArgumentException if the id is already registered or
+	 *             the type is null or unsupported
+	 * @throws Exception anything thrown while starting the container process
+	 */
+	@Override
+	public void create(String id, String type) throws Exception {
+		Map<String, LocalProcess> processes = LocalContainerSingleton.getProcesses();
+
+		synchronized (processes) {
+			if(processes.containsKey(id))
+				throw new IllegalArgumentException(String.format("Process '%s' already exists", id));
+
+			// constant-first equals: a null type is rejected as unsupported
+			// instead of raising a NullPointerException
+			if(!REQUIRED_TYPE.equals(type))
+				throw new IllegalArgumentException(String.format("Type '%s' not supported", type));
+
+			log.info(String.format("Running container '%s' (zkAddress='%s', clusterName='%s')", id, zkAddress, clusterName));
+
+			ContainerProcess process = new ContainerProcess(clusterName, zkAddress, id);
+			process.start();
+
+			// registered only after start() returned without throwing
+			processes.put(id, new LocalProcess(id, providerName, process));
+		}
+	}
+
+	/**
+	 * Removes the container registered under the given id and stops its
+	 * process. The entry is removed before the process is stopped.
+	 *
+	 * @param id container id
+	 * @throws IllegalArgumentException if the id is not registered
+	 * @throws Exception anything thrown while stopping the container process
+	 */
+	@Override
+	public void destroy(String id) throws Exception {
+		Map<String, LocalProcess> processes = LocalContainerSingleton.getProcesses();
+
+		synchronized (processes) {
+			if(!processes.containsKey(id))
+				throw new IllegalArgumentException(String.format("Process '%s' does not exists", id));
+
+			log.info(String.format("Destroying container '%s'", id));
+
+			LocalProcess local = processes.remove(id);
+			local.process.stop();
+		}
+	}
+
+	/**
+	 * Destroys all registered containers on a best-effort basis: a failure
+	 * for one container is logged and the remaining ones are still destroyed.
+	 */
+	@Override
+	public void destroyAll() {
+		Map<String, LocalProcess> processes = LocalContainerSingleton.getProcesses();
+
+		synchronized (processes) {
+			log.info("Destroying all processes");
+			// NOTE(review): this walks the shared map, i.e. every registered
+			// container regardless of its owner - confirm this is intended
+			// snapshot of the keys because destroy() removes entries
+			for(String id : new HashSet<String>(processes.keySet())) {
+				try {
+					destroy(id);
+				} catch (Exception e) {
+					log.warn(String.format("Could not destroy container '%s'", id), e);
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/local/LocalContainerSingleton.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/local/LocalContainerSingleton.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/local/LocalContainerSingleton.java
new file mode 100644
index 0000000..d25d3ba
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/local/LocalContainerSingleton.java
@@ -0,0 +1,65 @@
+package org.apache.helix.metamanager.provider.local;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.metamanager.managed.ContainerProcess;
+
+/**
+ * Static registry of {@link LocalProcess} entries keyed by container id.
+ * {@link #reset()} synchronizes on the map itself while using it.
+ */
+public class LocalContainerSingleton {
+	final static Map<String, LocalProcess> processes = new HashMap<String, LocalProcess>();
+
+	private LocalContainerSingleton() {
+		// left blank
+	}
+
+	/**
+	 * Returns the shared, mutable registry map itself (not a copy).
+	 */
+	public static Map<String, LocalProcess> getProcesses() {
+		return processes;
+	}
+
+	/**
+	 * Stops every registered process and empties the registry. A failure to
+	 * stop one process does not prevent the others from being stopped or
+	 * the registry from being cleared; the first failure is rethrown at the
+	 * end.
+	 */
+	public static void reset() {
+		synchronized (processes) {
+			RuntimeException firstFailure = null;
+			for(LocalProcess local : processes.values()) {
+				try {
+					local.process.stop();
+				} catch (RuntimeException e) {
+					if(firstFailure == null)
+						firstFailure = e;
+				}
+			}
+			processes.clear();
+			if(firstFailure != null)
+				throw firstFailure;
+		}
+	}
+
+	/**
+	 * Registry entry: container id, owner name and the container process
+	 * object.
+	 */
+	static class LocalProcess {
+		final String id;
+		final String owner;
+		final ContainerProcess process;
+
+		public LocalProcess(String id, String owner, ContainerProcess process) {
+			this.id = id;
+			this.owner = owner;
+			this.process = process;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/local/LocalContainerStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/local/LocalContainerStatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/local/LocalContainerStatusProvider.java
new file mode 100644
index 0000000..383a0d7
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/local/LocalContainerStatusProvider.java
@@ -0,0 +1,54 @@
+package org.apache.helix.metamanager.provider.local;
+
+import java.util.Map;
+
+import org.apache.helix.metamanager.ClusterContainerStatusProvider;
+import org.apache.helix.metamanager.provider.local.LocalContainerSingleton.LocalProcess;
+
+/**
+ * Status lookups against the registry held by {@link LocalContainerSingleton}.
+ */
+public class LocalContainerStatusProvider implements ClusterContainerStatusProvider {
+
+	/**
+	 * @return true if a container is registered under the given id
+	 */
+	@Override
+	public boolean exists(String id) {
+		Map<String, LocalProcess> processes = LocalContainerSingleton.getProcesses();
+
+		synchronized (processes) {
+			return processes.containsKey(id);
+		}
+	}
+
+	/**
+	 * @return true if the id is registered and its entry holds a process;
+	 *         false for an unregistered id
+	 */
+	@Override
+	public boolean isActive(String id) {
+		Map<String, LocalProcess> processes = LocalContainerSingleton.getProcesses();
+
+		synchronized (processes) {
+			// look the entry up once; get() returns null for unregistered ids
+			LocalProcess local = processes.get(id);
+			return local != null && local.process != null;
+		}
+	}
+
+	/**
+	 * @return true if the id is registered but its entry holds no process;
+	 *         false for an unregistered id
+	 */
+	@Override
+	public boolean isFailed(String id) {
+		Map<String, LocalProcess> processes = LocalContainerSingleton.getProcesses();
+
+		synchronized (processes) {
+			LocalProcess local = processes.get(id);
+			return local != null && local.process == null;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/shell/ShellContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/shell/ShellContainerProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/shell/ShellContainerProvider.java
new file mode 100644
index 0000000..eef730a
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/shell/ShellContainerProvider.java
@@ -0,0 +1,126 @@
+package org.apache.helix.metamanager.provider.shell;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.helix.metamanager.ClusterContainerProvider;
+import org.apache.helix.metamanager.provider.shell.ShellContainerSingleton.ShellProcess;
+import org.apache.log4j.Logger;
+
+/**
+ * {@link ClusterContainerProvider} that launches each container as an
+ * operating system process via /bin/sh. Containers are tracked in the map
+ * returned by {@link ShellContainerSingleton#getProcesses()}; every
+ * operation synchronizes on that map.
+ */
+public class ShellContainerProvider implements ClusterContainerProvider {
+
+	static final Logger log = Logger.getLogger(ShellContainerProvider.class);
+
+	static final String REQUIRED_TYPE = "container";
+	static final String RUN_COMMAND = "/bin/sh";
+
+	// global view of processes required
+	// NOTE(review): neither field below is read in this class; all methods use
+	// the map from ShellContainerSingleton - candidates for removal
+	static final Object staticLock = new Object();
+	static final Map<String, ShellProcess> processes = new HashMap<String, ShellProcess>();
+
+	final String zkAddress;
+	final String clusterName;
+	final String command;
+	final String providerName;
+
+	public ShellContainerProvider(String zkAddress, String clusterName, String providerName, String command) {
+		this.zkAddress = zkAddress;
+		this.clusterName = clusterName;
+		this.command = command;
+		this.providerName = providerName;
+	}
+
+	/**
+	 * Launches {@code /bin/sh <command> <zkAddress> <clusterName> <id>} and
+	 * registers the resulting process under the given id.
+	 *
+	 * @param id   container id; must not be registered yet
+	 * @param type container type; only {@value #REQUIRED_TYPE} is accepted
+	 * @throws IllegalArgumentException if the id is already registered or
+	 *             the type is null or unsupported
+	 * @throws Exception if the process cannot be started
+	 */
+	@Override
+	public void create(String id, String type) throws Exception {
+		Map<String, ShellProcess> processes = ShellContainerSingleton.getProcesses();
+
+		synchronized (processes) {
+			if(processes.containsKey(id))
+				throw new IllegalArgumentException(String.format("Process '%s' already exists", id));
+
+			// constant-first equals: a null type is rejected as unsupported
+			// instead of raising a NullPointerException
+			if(!REQUIRED_TYPE.equals(type))
+				throw new IllegalArgumentException(String.format("Type '%s' not supported", type));
+
+			log.info(String.format("Running container '%s' (zkAddress='%s', clusterName='%s', command='%s')", id, zkAddress, clusterName, command));
+
+			ProcessBuilder builder = new ProcessBuilder(RUN_COMMAND, command, zkAddress, clusterName, id);
+			Process process = builder.start();
+
+			processes.put(id, new ShellProcess(id, providerName, process));
+		}
+	}
+
+	/**
+	 * Removes the container registered under the given id, kills its process
+	 * and waits for it to terminate.
+	 *
+	 * @param id container id
+	 * @throws IllegalArgumentException if the id is not registered
+	 * @throws InterruptedException if interrupted while waiting for termination
+	 */
+	@Override
+	public void destroy(String id) throws Exception {
+		Map<String, ShellProcess> processes = ShellContainerSingleton.getProcesses();
+
+		synchronized (processes) {
+			if(!processes.containsKey(id))
+				throw new IllegalArgumentException(String.format("Process '%s' does not exists", id));
+
+			log.info(String.format("Destroying container '%s'", id));
+
+			ShellProcess shell = processes.remove(id);
+			shell.process.destroy();
+			shell.process.waitFor();
+		}
+	}
+
+	/**
+	 * Destroys all registered containers on a best-effort basis: a failure
+	 * for one container is logged and the remaining ones are still destroyed.
+	 * If the thread is interrupted while waiting, the interrupt status is
+	 * restored before returning.
+	 */
+	@Override
+	public void destroyAll() {
+		Map<String, ShellProcess> processes = ShellContainerSingleton.getProcesses();
+
+		synchronized (processes) {
+			log.info("Destroying all processes");
+			boolean interrupted = false;
+			// snapshot of the values because destroy() removes entries
+			for(ShellProcess process : new HashSet<ShellProcess>(processes.values())) {
+				try {
+					destroy(process.id);
+				} catch (InterruptedException e) {
+					interrupted = true;
+					log.warn(String.format("Interrupted while destroying container '%s'", process.id), e);
+				} catch (Exception e) {
+					log.warn(String.format("Could not destroy container '%s'", process.id), e);
+				}
+			}
+			if(interrupted)
+				Thread.currentThread().interrupt();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/shell/ShellContainerSingleton.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/shell/ShellContainerSingleton.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/shell/ShellContainerSingleton.java
new file mode 100644
index 0000000..ae7f3c1
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/shell/ShellContainerSingleton.java
@@ -0,0 +1,62 @@
+package org.apache.helix.metamanager.provider.shell;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Static registry of {@link ShellProcess} entries keyed by container id.
+ * {@link #reset()} synchronizes on the map itself while using it.
+ */
+public class ShellContainerSingleton {
+	static final Map<String, ShellProcess> processes = new HashMap<String, ShellProcess>();
+
+	private ShellContainerSingleton() {
+		// left blank
+	}
+
+	/**
+	 * Returns the shared, mutable registry map itself (not a copy).
+	 */
+	public static Map<String, ShellProcess> getProcesses() {
+		return processes;
+	}
+
+	/**
+	 * Kills every registered process, waits for each to terminate and empties
+	 * the registry. If the thread is interrupted while waiting, the remaining
+	 * processes are still handled and the interrupt status is restored before
+	 * returning.
+	 */
+	public static void reset() {
+		synchronized (processes) {
+			boolean interrupted = false;
+			for(ShellProcess local : processes.values()) {
+				local.process.destroy();
+				try {
+					local.process.waitFor();
+				} catch(InterruptedException e) {
+					// remember the interrupt instead of swallowing it
+					interrupted = true;
+				}
+			}
+			processes.clear();
+			if(interrupted)
+				Thread.currentThread().interrupt();
+		}
+	}
+
+	/**
+	 * Registry entry: container id, owner name and the launched process.
+	 */
+	static class ShellProcess {
+		final String id;
+		final String owner;
+		final Process process;
+
+		public ShellProcess(String id, String owner, Process process) {
+			this.id = id;
+			this.owner = owner;
+			this.process = process;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/shell/ShellContainerStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/shell/ShellContainerStatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/shell/ShellContainerStatusProvider.java
new file mode 100644
index 0000000..0030c2d
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/shell/ShellContainerStatusProvider.java
@@ -0,0 +1,72 @@
+package org.apache.helix.metamanager.provider.shell;
+
+import java.util.Map;
+
+import org.apache.helix.metamanager.ClusterContainerStatusProvider;
+import org.apache.helix.metamanager.provider.shell.ShellContainerSingleton.ShellProcess;
+
+/**
+ * Status lookups against the registry held by {@link ShellContainerSingleton},
+ * based on the exit value of the registered process.
+ */
+public class ShellContainerStatusProvider implements ClusterContainerStatusProvider {
+
+	/**
+	 * @return true if a container is registered under the given id
+	 */
+	@Override
+	public boolean exists(String id) {
+		Map<String, ShellProcess> processes = ShellContainerSingleton.getProcesses();
+
+		synchronized (processes) {
+			return processes.containsKey(id);
+		}
+	}
+
+	/**
+	 * @return true if the id is registered and its process has not exited;
+	 *         false for an unregistered id
+	 */
+	@Override
+	public boolean isActive(String id) {
+		Map<String, ShellProcess> processes = ShellContainerSingleton.getProcesses();
+
+		synchronized (processes) {
+			ShellProcess shell = processes.get(id);
+			if(shell == null)
+				return false;
+
+			try {
+				shell.process.exitValue();
+				return false;
+			} catch (IllegalThreadStateException e) {
+				// still running
+				return true;
+			}
+		}
+	}
+
+	/**
+	 * @return true if the id is registered and its process exited with a
+	 *         non-zero exit value; false while it is still running and for
+	 *         an unregistered id
+	 */
+	@Override
+	public boolean isFailed(String id) {
+		Map<String, ShellProcess> processes = ShellContainerSingleton.getProcesses();
+
+		synchronized (processes) {
+			ShellProcess shell = processes.get(id);
+			if(shell == null)
+				return false;
+
+			try {
+				return (shell.process.exitValue() != 0);
+			} catch (IllegalThreadStateException e) {
+				// still running
+				return false;
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/ApplicationConfig.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/ApplicationConfig.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/ApplicationConfig.java
new file mode 100644
index 0000000..4c8f303
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/ApplicationConfig.java
@@ -0,0 +1,45 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+/**
+ * Immutable holder for the four string settings of a provider application.
+ */
+public class ApplicationConfig {
+	final String clusterAddress;
+	final String clusterName;
+	final String metadataAddress;
+	final String providerName;
+
+	/**
+	 * Stores the given values as-is; no validation is performed.
+	 */
+	public ApplicationConfig(
+			String clusterAddress,
+			String clusterName,
+			String metadataAddress,
+			String providerName) {
+		this.providerName = providerName;
+		this.metadataAddress = metadataAddress;
+		this.clusterName = clusterName;
+		this.clusterAddress = clusterAddress;
+	}
+
+	/** @return the cluster address given at construction */
+	public String getClusterAddress() {
+		return this.clusterAddress;
+	}
+
+	/** @return the cluster name given at construction */
+	public String getClusterName() {
+		return this.clusterName;
+	}
+
+	/** @return the metadata address given at construction */
+	public String getMetadataAddress() {
+		return this.metadataAddress;
+	}
+
+	/** @return the provider name given at construction */
+	public String getProviderName() {
+		return this.providerName;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/ContainerMetadata.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/ContainerMetadata.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/ContainerMetadata.java
new file mode 100644
index 0000000..73d1a1b
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/ContainerMetadata.java
@@ -0,0 +1,54 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+/**
+ * Mutable record describing one container: id, state, numeric yarn id,
+ * command and owner.
+ */
+class ContainerMetadata {
+
+	/** States a container record can be in. */
+	enum ContainerState {
+		ACQUIRE, CONNECTING, ACTIVE, TEARDOWN, FAILED, HALTED, FINALIZE
+	}
+
+	String id;
+	ContainerState state;
+	int yarnId;
+	String command;
+	String owner;
+
+	/** Leaves every field at its default value. */
+	public ContainerMetadata() {
+		// left blank
+	}
+
+	/**
+	 * New record in state {@link ContainerState#ACQUIRE} with yarn id -1.
+	 */
+	public ContainerMetadata(String id, String command, String owner) {
+		this.id = id;
+		this.command = command;
+		this.owner = owner;
+		this.yarnId = -1;
+		this.state = ContainerState.ACQUIRE;
+	}
+
+	/**
+	 * Copy of {@code node} with a different state; the yarn id is kept.
+	 */
+	public ContainerMetadata(ContainerMetadata node, ContainerState state) {
+		this(node, state, node.yarnId);
+	}
+
+	/**
+	 * Copy of {@code node} with a different state and yarn id.
+	 */
+	public ContainerMetadata(ContainerMetadata node, ContainerState state, int yarnId) {
+		this.id = node.id;
+		this.state = state;
+		this.yarnId = yarnId;
+		this.command = node.command;
+		this.owner = node.owner;
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/MetadataService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/MetadataService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/MetadataService.java
new file mode 100644
index 0000000..dc6c060
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/MetadataService.java
@@ -0,0 +1,47 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+import java.util.Collection;
+
+/**
+ * Store for {@link ContainerMetadata} records addressed by id. All
+ * operations except {@link #exists(String)} may fail with a
+ * {@link MetadataServiceException}.
+ */
+public interface MetadataService {
+
+	boolean exists(String id);
+
+	void create(ContainerMetadata meta) throws MetadataServiceException;
+
+	ContainerMetadata read(String id) throws MetadataServiceException;
+
+	Collection<ContainerMetadata> readAll() throws MetadataServiceException;
+
+	void update(ContainerMetadata meta) throws MetadataServiceException;
+
+	void delete(String id) throws MetadataServiceException;
+
+	/**
+	 * Checked exception declared by the operations of {@link MetadataService}.
+	 */
+	class MetadataServiceException extends Exception {
+
+		private static final long serialVersionUID = -2846997013918977056L;
+
+		public MetadataServiceException() {
+			super();
+		}
+
+		public MetadataServiceException(String message) {
+			super(message);
+		}
+
+		public MetadataServiceException(Throwable cause) {
+			super(cause);
+		}
+
+		public MetadataServiceException(String message, Throwable cause) {
+			super(message, cause);
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/Utils.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/Utils.java
new file mode 100644
index 0000000..82871f1
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/Utils.java
@@ -0,0 +1,115 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.metamanager.provider.yarn.ContainerMetadata.ContainerState;
+import org.apache.log4j.Logger;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
+
+/**
+ * Helpers for the yarn provider: JSON (de)serialization of
+ * {@link ContainerMetadata} and a shared placeholder resource map.
+ */
+public class Utils {
+
+	static final Logger log = Logger.getLogger(Utils.class);
+
+	static Gson gson;
+	static {
+		GsonBuilder builder = new GsonBuilder();
+		builder.registerTypeAdapter(ContainerState.class, new ContainerStateAdapter());
+		builder.setPrettyPrinting();
+		gson = builder.create();
+	}
+
+	static Map<String, LocalResource>  dummyResources = createDummyResources();
+
+	/** Serializes the metadata to pretty-printed JSON. */
+	static String toJson(ContainerMetadata meta) {
+		return gson.toJson(meta);
+	}
+
+	/** Parses the given JSON into a {@link ContainerMetadata}. */
+	static ContainerMetadata fromJson(String json) {
+		return gson.fromJson(json, ContainerMetadata.class);
+	}
+
+	/** Returns the resource map built once when this class is initialized. */
+	static Map<String, LocalResource> getDummyResources() {
+		return dummyResources;
+	}
+
+	/**
+	 * Builds a resource map with the single entry "dummy" that points to the
+	 * file /tmp/dummy, creating that file if it does not exist.
+	 *
+	 * @throws IllegalStateException if the file cannot be created
+	 */
+	private static Map<String, LocalResource> createDummyResources() {
+		File dummy = new File("/tmp/dummy");
+
+		if(!dummy.exists()) {
+			try {
+				dummy.createNewFile();
+			} catch(Exception e) {
+				// This runs during class initialization. Fail with an exception
+				// rather than System.exit(), which would terminate the whole JVM
+				// from inside a static initializer.
+				log.error("could not create dummy file", e);
+				throw new IllegalStateException("could not create dummy file " + dummy, e);
+			}
+		}
+
+		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+		Path path = new Path(dummy.toURI());
+		LocalResource localResource = Records.newRecord(LocalResource.class);
+		localResource.setType(LocalResourceType.FILE);
+		localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+		localResource.setResource(ConverterUtils.getYarnUrlFromPath(path));
+		localResource.setTimestamp(dummy.lastModified());
+		localResource.setSize(dummy.length());
+		localResources.put("dummy", localResource);
+		return localResources;
+	}
+
+	/**
+	 * Gson adapter that writes a {@link ContainerState} as its enum name and
+	 * reads it back with {@link ContainerState#valueOf(String)}; JSON null
+	 * maps to Java null in both directions.
+	 */
+	static class ContainerStateAdapter extends TypeAdapter<ContainerState> {
+		@Override
+		public ContainerState read(JsonReader reader) throws IOException {
+			if (reader.peek() == JsonToken.NULL) {
+				reader.nextNull();
+				return null;
+			}
+			return ContainerState.valueOf(reader.nextString());
+		}
+
+		@Override
+		public void write(JsonWriter writer, ContainerState value) throws IOException {
+			if (value == null) {
+				writer.nullValue();
+				return;
+			}
+			writer.value(value.name());
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnApplication.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnApplication.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnApplication.java
new file mode 100644
index 0000000..c4f3668
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnApplication.java
@@ -0,0 +1,151 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Submits the provider's master command as an application to the YARN
+ * ResourceManager and kills that application again on {@link #stop()}.
+ */
+public class YarnApplication {
+
+	static final Logger log = Logger.getLogger(YarnApplication.class);
+
+	static final String ENV_CLUSTER_ADDRESS = "YA_CLUSTER_ADDRESS";
+	static final String ENV_CLUSTER_NAME = "YA_CLUSTER_NAME";
+	static final String ENV_METADATA_ADDRESS = "YA_METADATA_ADDRESS";
+	static final String ENV_PROVIDER_NAME = "YA_PROVIDER_NAME";
+
+	// NOTE(review): absolute, user-specific script path - should come from configuration
+	static final String MASTER_COMMAND = "/bin/sh /home/apucher/incubator-helix/recipes/meta-cluster-manager/target/meta-cluster-manager-pkg/bin/yarn-master-process.sh 1>%s/stdout 2>%s/stderr";
+
+	Configuration conf;
+	YarnRPC rpc;
+	ClientRMProtocol rmClient;
+	ApplicationId appId;
+
+	final ApplicationConfig appConfig;
+
+	public YarnApplication(ApplicationConfig appConfig) {
+		this.appConfig = appConfig;
+		configure(new YarnConfiguration());
+	}
+
+	/**
+	 * Connects to the ResourceManager, acquires an application id and submits
+	 * the master command together with its environment.
+	 *
+	 * @throws Exception if a ResourceManager call fails
+	 */
+	public void start() throws Exception {
+		connect();
+
+		// stdout and stderr of the master command are redirected to files under /tmp/<providerName>
+		// (previously considered alternative: ApplicationConstants.LOG_DIR_EXPANSION_VAR)
+		String logDir = "/tmp/" + appConfig.providerName;
+		String command = String.format(MASTER_COMMAND, logDir, logDir);
+
+		log.info(String.format("Starting application '%s' provider '%s' (masterCommand='%s')", appConfig.metadataAddress, appConfig.providerName, command));
+
+		// app id
+		GetNewApplicationRequest appRequest = Records.newRecord(GetNewApplicationRequest.class);
+		GetNewApplicationResponse appResponse = rmClient.getNewApplication(appRequest);
+
+		this.appId = appResponse.getApplicationId();
+
+		log.info(String.format("Acquired app id '%s' for '%s'", appId.toString(), appConfig.providerName));
+
+		// command
+		ContainerLaunchContext launchContext = Records.newRecord(ContainerLaunchContext.class);
+		launchContext.setCommands(Collections.singletonList(command));
+
+		// resource limit
+		Resource resource = Records.newRecord(Resource.class);
+		resource.setMemory(256); // TODO make dynamic
+		launchContext.setResource(resource);
+
+		// environment
+		Map<String, String> env = new HashMap<String, String>();
+		env.put(ENV_CLUSTER_ADDRESS, appConfig.clusterAddress);
+		env.put(ENV_CLUSTER_NAME, appConfig.clusterName);
+		env.put(ENV_METADATA_ADDRESS, appConfig.metadataAddress);
+		env.put(ENV_PROVIDER_NAME, appConfig.providerName);
+		launchContext.setEnvironment(env);
+
+		// local resources
+		// YARN workaround: create dummy resource
+		Map<String, LocalResource> localResources = Utils.getDummyResources();
+		launchContext.setLocalResources(localResources);
+
+		// app submission
+		ApplicationSubmissionContext subContext = Records.newRecord(ApplicationSubmissionContext.class);
+		subContext.setApplicationId(appId);
+		subContext.setApplicationName(appConfig.providerName);
+		subContext.setAMContainerSpec(launchContext);
+
+		SubmitApplicationRequest subRequest = Records.newRecord(SubmitApplicationRequest.class);
+		subRequest.setApplicationSubmissionContext(subContext);
+
+		log.info(String.format("Starting app id '%s'", appId.toString()));
+
+		rmClient.submitApplication(subRequest);
+	}
+
+	/**
+	 * Kills the application submitted by {@link #start()}. Does nothing, apart
+	 * from logging a warning, if no application id has been acquired yet.
+	 *
+	 * @throws YarnRemoteException if the kill request fails
+	 */
+	public void stop() throws YarnRemoteException {
+		if(appId == null || rmClient == null) {
+			// start() has not run far enough; there is nothing to kill
+			log.warn("No application to stop");
+			return;
+		}
+
+		log.info(String.format("Stopping app id '%s'", appId.toString()));
+		KillApplicationRequest killRequest = Records.newRecord(KillApplicationRequest.class);
+		killRequest.setApplicationId(appId);
+
+		rmClient.forceKillApplication(killRequest);
+	}
+
+	/** Stores the (non-null) configuration and creates the RPC object from it. */
+	void configure(Configuration conf) {
+		this.conf = Preconditions.checkNotNull(conf);
+		this.rpc = YarnRPC.create(conf);
+	}
+
+	/** Creates the ResourceManager client proxy for the configured (or default) RM address. */
+	void connect() {
+		YarnConfiguration yarnConf = new YarnConfiguration(conf);
+		InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+				YarnConfiguration.RM_ADDRESS,
+				YarnConfiguration.DEFAULT_RM_ADDRESS));
+		log.info("Connecting to ResourceManager at: " + rmAddress);
+		this.rmClient = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, conf));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnContainerProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnContainerProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnContainerProcess.java
new file mode 100644
index 0000000..0d997bf
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnContainerProcess.java
@@ -0,0 +1,76 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.log4j.Logger;
+
+/**
+ * Command line entry point that starts a {@link ZookeeperMetadataService}
+ * and a {@link YarnContainerService}, then blocks until a byte (or end of
+ * stream) is read from stdin.
+ */
+public class YarnContainerProcess {
+	static final Logger log = Logger.getLogger(YarnContainerProcess.class);
+
+	private static final String USAGE = "Usage: YarnContainerProcess <clusterAddress> <clusterName> <metadataAddress> <providerName> <containerId>";
+
+	/**
+	 * @param args clusterAddress, clusterName, metadataAddress, providerName
+	 *            and containerId, in that order
+	 * @throws IllegalArgumentException if fewer than five arguments are given
+	 */
+	public static void main(String[] args) throws Exception {
+		log.trace("BEGIN YarnProcess.main()");
+
+		// fail with a usage message instead of an ArrayIndexOutOfBoundsException
+		if(args.length < 5)
+			throw new IllegalArgumentException(USAGE);
+
+		final String clusterAddress = args[0];
+		final String clusterName = args[1];
+		final String metadataAddress = args[2];
+		final String providerName = args[3];
+		final String containerId = args[4];
+
+		final ApplicationConfig appConfig = new ApplicationConfig(clusterAddress, clusterName, metadataAddress, providerName);
+
+		log.debug("Launching metadata service");
+		final ZookeeperMetadataService metaService = new ZookeeperMetadataService(metadataAddress);
+		metaService.startService();
+
+		log.debug("Launching yarn container service");
+		final YarnContainerService yarnProcess = new YarnContainerService(appConfig, metaService, containerId);
+		yarnProcess.startService();
+
+		log.debug("Installing shutdown hooks");
+		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+			@Override
+			public void run() {
+				yarnProcess.stopService();
+				metaService.stopService();
+			}
+		}));
+
+		System.out.println("Press ENTER to stop container process");
+		System.in.read();
+
+		log.trace("END YarnProcess.main()");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnContainerProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnContainerProvider.java
new file mode 100644
index 0000000..9f09d46
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnContainerProvider.java
@@ -0,0 +1,154 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+import java.util.concurrent.TimeoutException;
+
+import org.apache.helix.metamanager.ClusterContainerProvider;
+import org.apache.helix.metamanager.provider.yarn.ContainerMetadata.ContainerState;
+import org.apache.helix.metamanager.provider.yarn.MetadataService.MetadataServiceException;
+import org.apache.log4j.Logger;
+
+/**
+ * {@link ClusterContainerProvider} that creates and destroys containers by
+ * writing {@link ContainerMetadata} records to the metadata service and
+ * polling until each record reaches the expected {@link ContainerState}.
+ * {@link #startService()} must be called before any other operation.
+ */
+public class YarnContainerProvider implements ClusterContainerProvider {
+
+	static final Logger log = Logger.getLogger(YarnContainerProvider.class);
+
+	/** Pause between two metadata reads in {@link #waitForState}, in milliseconds. */
+	static final long POLL_INTERVAL = 1000;
+
+	/** The only container type accepted by {@link #create(String, String)}. */
+	static final String REQUIRED_TYPE = "container";
+
+	/** Upper bound for {@link #waitForState}, in milliseconds. */
+	static final long CONTAINER_TIMEOUT = 10000;
+
+	/*
+	 * CONTAINERS
+	 *   A (A, READY)
+	 *   B (B, RUNNING)
+	 */
+
+	final ApplicationConfig appConfig;
+	final String command;
+
+	final Object notifier = new Object();
+
+	ZookeeperMetadataService metaService;
+
+	public YarnContainerProvider(ApplicationConfig appConfig, String command) {
+		this.appConfig = appConfig;
+		this.command = command;
+	}
+
+	/**
+	 * Publishes metadata for a new container and blocks until it is ACTIVE.
+	 *
+	 * @param id container id
+	 * @param type container type, must equal {@link #REQUIRED_TYPE}
+	 * @throws IllegalArgumentException if the type is not supported
+	 * @throws TimeoutException if the container is not ACTIVE within {@link #CONTAINER_TIMEOUT}
+	 */
+	@Override
+	public void create(final String id, final String type) throws Exception {
+		if(!REQUIRED_TYPE.equals(type)) {
+			throw new IllegalArgumentException(String.format("Type '%s' not supported", type));
+		}
+
+		metaService.create(new ContainerMetadata(id, command, appConfig.providerName));
+		waitForState(id, ContainerState.ACTIVE);
+	}
+
+	/**
+	 * Retires a container: moves ACTIVE to TEARDOWN or FAILED to HALTED, waits
+	 * for FINALIZE and then deletes the metadata record.
+	 *
+	 * @param id container id
+	 * @throws IllegalStateException if the container is not active, failed or finalized
+	 * @throws TimeoutException if FINALIZE is not reached within {@link #CONTAINER_TIMEOUT}
+	 */
+	@Override
+	public void destroy(final String id) throws Exception {
+		ContainerMetadata meta = metaService.read(id);
+
+		if(meta.state == ContainerState.ACTIVE) {
+			log.info("Destroying active container, going to teardown");
+			metaService.update(new ContainerMetadata(meta, ContainerState.TEARDOWN));
+
+		} else if(meta.state == ContainerState.FAILED) {
+			log.info("Destroying failed container, going to halted");
+			metaService.update(new ContainerMetadata(meta, ContainerState.HALTED));
+
+		} else if(meta.state == ContainerState.FINALIZE) {
+			log.info("Destroying finalized container, skipping");
+
+		} else {
+			throw new IllegalStateException(String.format("Container '%s' must be active, failed or finalized", id));
+		}
+
+		waitForState(id, ContainerState.FINALIZE);
+		metaService.delete(id);
+	}
+
+	/**
+	 * Best-effort destruction of every known container. Failures are logged
+	 * and do not stop the remaining containers from being destroyed; an
+	 * interrupt aborts the sweep and is re-asserted on the calling thread.
+	 */
+	@Override
+	public void destroyAll() {
+		try {
+			for(ContainerMetadata meta : metaService.readAll()) {
+				try {
+					destroy(meta.id);
+				} catch (InterruptedException e) {
+					// do not lose the interrupt; every further wait would fail anyway
+					Thread.currentThread().interrupt();
+					log.warn("Interrupted while destroying containers, aborting", e);
+					return;
+				} catch (Exception e) {
+					log.warn(String.format("Failed to destroy container '%s'", meta.id), e);
+				}
+			}
+		} catch (Exception e) {
+			log.warn("Failed to list containers", e);
+		}
+	}
+
+	/** Creates the metadata service for the configured address and starts it. */
+	public void startService() {
+		log.debug("Starting yarn container provider service");
+		metaService = new ZookeeperMetadataService(appConfig.metadataAddress);
+		metaService.startService();
+	}
+
+	/** Stops and discards the metadata service, if one was created. */
+	public void stopService() {
+		log.debug("Stopping yarn container provider service");
+		if(metaService != null) {
+			metaService.stopService();
+			metaService = null;
+		}
+	}
+
+	/**
+	 * Polls the metadata of a container until it reports the given state.
+	 *
+	 * @throws TimeoutException if the state is not reached within {@link #CONTAINER_TIMEOUT}
+	 */
+	void waitForState(String id, ContainerState state) throws MetadataServiceException, InterruptedException, TimeoutException {
+		long limit = System.currentTimeMillis() + CONTAINER_TIMEOUT;
+		ContainerMetadata meta = metaService.read(id);
+		while(meta.state != state) {
+			if(System.currentTimeMillis() >= limit) {
+				throw new TimeoutException(String.format("Container '%s' failed to reach state '%s' (currently is '%s')", id, state, meta.state));
+			}
+			Thread.sleep(POLL_INTERVAL);
+			meta = metaService.read(id);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnContainerService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnContainerService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnContainerService.java
new file mode 100644
index 0000000..8abd8df
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnContainerService.java
@@ -0,0 +1,162 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.metamanager.managed.ManagedFactory;
+import org.apache.helix.metamanager.provider.yarn.ContainerMetadata.ContainerState;
+import org.apache.log4j.Logger;
+
+/**
+ * Periodically reads the metadata of one container and reacts to its state
+ * by connecting or disconnecting the Helix participant and writing the
+ * resulting state back to the metadata service.
+ */
+public class YarnContainerService {
+	static final Logger log = Logger.getLogger(YarnContainerService.class);
+
+	/** Period of the metadata poll, in milliseconds. */
+	static final long CONTAINERSERVICE_INTERVAL = 1000;
+
+	final ApplicationConfig appConfig;
+	final String containerId;
+
+	HelixManager participantManager;
+
+	MetadataService metaService;
+	ScheduledExecutorService executor;
+
+	public YarnContainerService(ApplicationConfig appConfig, MetadataService metaService, String containerId) {
+		this.appConfig = appConfig;
+		this.metaService = metaService;
+		this.containerId = containerId;
+	}
+
+	/** Starts a single-threaded executor that runs {@link ContainerService} at a fixed rate. */
+	public void startService() {
+		log.debug("starting yarn container service");
+
+		executor = Executors.newSingleThreadScheduledExecutor();
+		executor.scheduleAtFixedRate(new ContainerService(), 0, CONTAINERSERVICE_INTERVAL, TimeUnit.MILLISECONDS);
+	}
+
+	/**
+	 * Stops the poll, waits for a running iteration to finish and disconnects
+	 * the participant if it is still connected. An interrupt received while
+	 * waiting does not cut the wait short; it is re-asserted before returning.
+	 */
+	public void stopService() {
+		log.debug("stopping yarn container service");
+
+		if(executor != null) {
+			executor.shutdown();
+			awaitTermination(executor);
+			executor = null;
+		}
+
+		// no poll can race with us anymore, so the participant can be released
+		stopParticipant();
+	}
+
+	/** Blocks until the executor has terminated, preserving the caller's interrupt status. */
+	private static void awaitTermination(ScheduledExecutorService service) {
+		boolean interrupted = false;
+		while(!service.isTerminated()) {
+			try {
+				service.awaitTermination(1, TimeUnit.SECONDS);
+			} catch (InterruptedException e) {
+				interrupted = true;
+			}
+		}
+		if(interrupted) {
+			Thread.currentThread().interrupt();
+		}
+	}
+
+	/**
+	 * Connects this container to the Helix cluster as a participant and
+	 * registers a {@link ManagedFactory} for the "MasterSlave" state model.
+	 *
+	 * @throws Exception if creating or connecting the manager fails
+	 */
+	public void startParticipant() throws Exception {
+		log.info("STARTING " + containerId);
+		participantManager = HelixManagerFactory.getZKHelixManager(appConfig.clusterName,
+				containerId, InstanceType.PARTICIPANT, appConfig.clusterAddress);
+		participantManager.getStateMachineEngine().registerStateModelFactory(
+				"MasterSlave", new ManagedFactory());
+		participantManager.connect();
+		log.info("STARTED " + containerId);
+	}
+
+	/** Disconnects the participant, if any; a second call is a no-op. */
+	public void stopParticipant() {
+		if (participantManager != null) {
+			participantManager.disconnect();
+			participantManager = null;
+		}
+	}
+
+	/** One poll iteration: advances a CONNECTING or TEARDOWN container to its next state. */
+	class ContainerService implements Runnable {
+		@Override
+		public void run() {
+			log.info("updating container status");
+
+			try {
+				ContainerMetadata meta = metaService.read(containerId);
+
+				if(meta.state == ContainerState.CONNECTING) {
+					log.info("container connecting, going to active");
+					try {
+						startParticipant();
+						metaService.update(new ContainerMetadata(meta, ContainerState.ACTIVE));
+					} catch (Exception e) {
+						log.error("Failed to start participant, going to failed", e);
+						stopParticipant();
+						metaService.update(new ContainerMetadata(meta, ContainerState.FAILED));
+					}
+				}
+
+				if(meta.state == ContainerState.ACTIVE) {
+					// do something
+					// and go to failed on error
+				}
+
+				if(meta.state == ContainerState.TEARDOWN) {
+					log.info("container teardown, going to halted");
+					stopParticipant();
+					metaService.update(new ContainerMetadata(meta, ContainerState.HALTED));
+				}
+
+			} catch(Exception e) {
+				// must not escape: an exception would suppress all further scheduled runs
+				log.error(String.format("Error while updating container '%s' status", containerId), e);
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnContainerStatusProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnContainerStatusProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnContainerStatusProvider.java
new file mode 100644
index 0000000..54aa3da
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnContainerStatusProvider.java
@@ -0,0 +1,66 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+import org.apache.helix.metamanager.ClusterContainerStatusProvider;
+import org.apache.helix.metamanager.provider.yarn.ContainerMetadata.ContainerState;
+import org.apache.helix.metamanager.provider.yarn.MetadataService.MetadataServiceException;
+
+/**
+ * {@link ClusterContainerStatusProvider} answering status queries from the
+ * container metadata read through a {@link ZookeeperMetadataService}.
+ */
+public class YarnContainerStatusProvider implements ClusterContainerStatusProvider {
+
+	final String metadataAddress;
+
+	ZookeeperMetadataService metaService;
+
+	public YarnContainerStatusProvider(String metadataAddress) {
+		this.metadataAddress = metadataAddress;
+		this.metaService = new ZookeeperMetadataService(metadataAddress);
+	}
+
+	@Override
+	public boolean exists(String id) {
+		return metaService.exists(id);
+	}
+
+	/** @return true if the container's metadata can be read and its state is ACTIVE */
+	@Override
+	public boolean isActive(String id) {
+		try {
+			return metaService.read(id).state == ContainerState.ACTIVE;
+		} catch (MetadataServiceException e) {
+			return false;
+		}
+	}
+
+	/** @return true if the container's metadata can be read and its state is FAILED */
+	@Override
+	public boolean isFailed(String id) {
+		try {
+			return metaService.read(id).state == ContainerState.FAILED;
+		} catch (Exception e) {
+			return false;
+		}
+	}
+
+	/**
+	 * Starts the metadata service. The instance created by the constructor is
+	 * reused; a new one is only created after {@link #stopService()} has
+	 * discarded the previous one.
+	 */
+	public void startService() {
+		if(metaService == null) {
+			metaService = new ZookeeperMetadataService(metadataAddress);
+		}
+		metaService.startService();
+	}
+
+	/** Stops and discards the metadata service, if present. */
+	public void stopService() {
+		if(metaService != null) {
+			metaService.stopService();
+			metaService = null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnMaster.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnMaster.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnMaster.java
new file mode 100644
index 0000000..f43bb67
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnMaster.java
@@ -0,0 +1,171 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Logger;
+
+/**
+ * YARN application master implemented as a Hadoop {@link Tool}: registers
+ * with the ResourceManager, starts the {@link YarnMasterService} and blocks
+ * until interrupted. A shutdown hook stops the service and reports the
+ * application as finished.
+ */
+public class YarnMaster extends Configured implements Tool {
+
+	static final Logger log = Logger.getLogger(YarnMaster.class);
+
+	AMRMProtocol resourceManager;
+	ApplicationAttemptId appAttemptId;
+
+	YarnMasterService service;
+
+	/**
+	 * Registers the application master, starts the master service and parks
+	 * the calling thread until it is interrupted.
+	 *
+	 * @return always 0
+	 * @throws IllegalArgumentException if a required environment variable is missing
+	 */
+	@Override
+	public int run(String[] args) throws Exception {
+		log.trace("BEGIN YarnMaster.run()");
+
+		Configuration conf = getConf();
+
+		this.appAttemptId = getApplicationAttemptId();
+		log.info(String.format("Got application attempt id '%s'", appAttemptId.toString()));
+
+		log.debug("Getting resource manager");
+		this.resourceManager = getResourceManager(conf);
+
+		// register the AM with the RM
+		log.debug("Registering application master");
+		RegisterApplicationMasterRequest appMasterRequest =
+			Records.newRecord(RegisterApplicationMasterRequest.class);
+		appMasterRequest.setApplicationAttemptId(appAttemptId);
+		appMasterRequest.setHost("");
+		appMasterRequest.setRpcPort(0);
+		appMasterRequest.setTrackingUrl("");
+
+		resourceManager.registerApplicationMaster(appMasterRequest);
+
+		String clusterAddress = getEnv(YarnApplication.ENV_CLUSTER_ADDRESS);
+		String clusterName = getEnv(YarnApplication.ENV_CLUSTER_NAME);
+		String metadataAddress = getEnv(YarnApplication.ENV_METADATA_ADDRESS);
+		String providerName = getEnv(YarnApplication.ENV_PROVIDER_NAME);
+		ApplicationConfig appConfig = new ApplicationConfig(clusterAddress, clusterName, metadataAddress, providerName);
+
+		service = new YarnMasterService(resourceManager, conf, appAttemptId, appConfig);
+		service.startService();
+
+		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					service.stopService();
+				} finally {
+					// report completion even if the service failed to stop cleanly
+					finishApplication();
+				}
+			}
+		}));
+
+		// joining the current thread never returns normally; it only ends on interrupt
+		try {
+			Thread.currentThread().join();
+		} catch(InterruptedException e) {
+			Thread.currentThread().interrupt();
+		}
+
+		log.trace("END YarnMaster.run()");
+
+		return 0;
+	}
+
+	/** Sends the finish request with status SUCCEEDED to the ResourceManager; failures are logged only. */
+	private void finishApplication() {
+		log.debug("Sending finish request");
+		FinishApplicationMasterRequest finishReq =
+			Records.newRecord(FinishApplicationMasterRequest.class);
+
+		// reuse the id obtained at registration instead of re-parsing the environment
+		finishReq.setAppAttemptId(appAttemptId);
+		finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+		try {
+			resourceManager.finishApplicationMaster(finishReq);
+		} catch(Exception e) {
+			log.warn("Failed to send finish request", e);
+		}
+	}
+
+	/** Creates an RPC proxy for the ResourceManager scheduler address configured in {@code conf}. */
+	private AMRMProtocol getResourceManager(Configuration conf) {
+		// Connect to the Scheduler of the ResourceManager.
+		YarnConfiguration yarnConf = new YarnConfiguration(conf);
+		YarnRPC rpc = YarnRPC.create(yarnConf);
+		InetSocketAddress rmAddress =
+			NetUtils.createSocketAddr(yarnConf.get(
+				YarnConfiguration.RM_SCHEDULER_ADDRESS,
+				YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
+		log.info("Connecting to ResourceManager at " + rmAddress);
+		return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf);
+	}
+
+	/** Derives the application attempt id from the AM container id found in the environment. */
+	private ApplicationAttemptId getApplicationAttemptId() {
+		ContainerId containerId = ConverterUtils.toContainerId(getEnv(ApplicationConstants.AM_CONTAINER_ID_ENV));
+		return containerId.getApplicationAttemptId();
+	}
+
+	/**
+	 * Reads a mandatory environment variable.
+	 *
+	 * @throws IllegalArgumentException if the variable is not set
+	 */
+	private String getEnv(String key) {
+		Map<String, String> envs = System.getenv();
+		String value = envs.get(key);
+		if (value == null) {
+			throw new IllegalArgumentException(
+				String.format("%s not set in the environment", key));
+		}
+		return value;
+	}
+
+	/** Process entry point: runs the tool and exits with its return code, or 1 on failure. */
+	public static void main(String[] args) throws Exception {
+		log.trace("BEGIN YarnMaster.main()");
+
+		int rc;
+		try {
+			rc = ToolRunner.run(new Configuration(), new YarnMaster(), args);
+		} catch (Exception e) {
+			// keep the stack trace in the log; stderr alone loses it
+			log.error("Application master failed", e);
+			System.err.println(e);
+			rc = 1;
+		}
+
+		// logged before exiting, System.exit() does not return
+		log.trace("END YarnMaster.main()");
+		System.exit(rc);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnMasterProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnMasterProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnMasterProcess.java
new file mode 100644
index 0000000..bd4fb3d
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnMasterProcess.java
@@ -0,0 +1,144 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Logger;
+
+/**
+ * Stand-alone launcher of the YARN application master: registers with the
+ * ResourceManager, starts the metadata and master services and installs a
+ * shutdown hook that stops them and reports the application as finished.
+ */
+public class YarnMasterProcess {
+
+	static final Logger log = Logger.getLogger(YarnMasterProcess.class);
+
+	/**
+	 * Registers the application master, starts both services, then waits for input on stdin.
+	 *
+	 * @throws IllegalArgumentException if a required environment variable is missing
+	 */
+	public static void main(String[] args) throws Exception {
+		log.trace("BEGIN YarnMaster.main()");
+
+		final ApplicationAttemptId appAttemptId = getApplicationAttemptId();
+		log.info(String.format("Got application attempt id '%s'", appAttemptId.toString()));
+
+		log.debug("Connecting to resource manager");
+		Configuration conf = new YarnConfiguration();
+
+		final AMRMProtocol resourceManager = getResourceManager(conf);
+
+		// register the AM with the RM
+		log.debug("Registering application master");
+		RegisterApplicationMasterRequest appMasterRequest =
+			Records.newRecord(RegisterApplicationMasterRequest.class);
+		appMasterRequest.setApplicationAttemptId(appAttemptId);
+		appMasterRequest.setHost("");
+		appMasterRequest.setRpcPort(0);
+		appMasterRequest.setTrackingUrl("");
+
+		resourceManager.registerApplicationMaster(appMasterRequest);
+
+		String clusterAddress = getEnv(YarnApplication.ENV_CLUSTER_ADDRESS);
+		String clusterName = getEnv(YarnApplication.ENV_CLUSTER_NAME);
+		String metadataAddress = getEnv(YarnApplication.ENV_METADATA_ADDRESS);
+		String providerName = getEnv(YarnApplication.ENV_PROVIDER_NAME);
+		ApplicationConfig appConfig = new ApplicationConfig(clusterAddress, clusterName, metadataAddress, providerName);
+
+		log.debug("Launching metadata service");
+		final ZookeeperMetadataService metaService = new ZookeeperMetadataService(metadataAddress);
+		metaService.startService();
+
+		log.debug("Launching yarn master service");
+		final YarnMasterService service = new YarnMasterService(resourceManager, conf, appAttemptId, appConfig, metaService);
+		service.startService();
+
+		log.debug("Installing shutdown hooks");
+		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+			@Override
+			public void run() {
+				// each step runs even if the previous one throws
+				try {
+					service.stopService();
+				} finally {
+					try {
+						metaService.stopService();
+					} finally {
+						finishApplication(resourceManager, appAttemptId);
+					}
+				}
+			}
+		}));
+
+		System.out.println("Press ENTER to stop master service");
+		System.in.read();
+
+		log.trace("END YarnMaster.main()");
+	}
+
+	/** Sends the finish request with status SUCCEEDED to the ResourceManager; failures are logged only. */
+	private static void finishApplication(AMRMProtocol resourceManager, ApplicationAttemptId appAttemptId) {
+		log.debug("Sending finish request");
+		FinishApplicationMasterRequest finishReq =
+			Records.newRecord(FinishApplicationMasterRequest.class);
+
+		finishReq.setAppAttemptId(appAttemptId);
+		finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+		try {
+			resourceManager.finishApplicationMaster(finishReq);
+		} catch(Exception e) {
+			log.warn("Failed to send finish request", e);
+		}
+	}
+
+	/** Creates an RPC proxy for the ResourceManager scheduler address configured in {@code conf}. */
+	static AMRMProtocol getResourceManager(Configuration conf) {
+		// Connect to the Scheduler of the ResourceManager.
+		YarnConfiguration yarnConf = new YarnConfiguration(conf);
+		YarnRPC rpc = YarnRPC.create(yarnConf);
+		InetSocketAddress rmAddress =
+			NetUtils.createSocketAddr(yarnConf.get(
+				YarnConfiguration.RM_SCHEDULER_ADDRESS,
+				YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
+		log.info("Connecting to ResourceManager at " + rmAddress);
+		return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf);
+	}
+
+	/** Derives the application attempt id from the AM container id found in the environment. */
+	static ApplicationAttemptId getApplicationAttemptId() {
+		ContainerId containerId = ConverterUtils.toContainerId(getEnv(ApplicationConstants.AM_CONTAINER_ID_ENV));
+		return containerId.getApplicationAttemptId();
+	}
+
+	/**
+	 * Reads a mandatory environment variable.
+	 *
+	 * @throws IllegalArgumentException if the variable is not set
+	 */
+	static String getEnv(String key) {
+		Map<String, String> envs = System.getenv();
+		String value = envs.get(key);
+		if (value == null) {
+			throw new IllegalArgumentException(
+				String.format("%s not set in the environment", key));
+		}
+		return value;
+	}
+
+}