You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/20 20:30:12 UTC

[03/15] Adding Helix-task-framework and Yarn integration modules

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnMasterService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnMasterService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnMasterService.java
new file mode 100644
index 0000000..b63be1f
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnMasterService.java
@@ -0,0 +1,361 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.metamanager.provider.yarn.ContainerMetadata.ContainerState;
+import org.apache.helix.metamanager.provider.yarn.MetadataService.MetadataServiceException;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Periodically reconciles the container metadata owned by this provider with
+ * the containers granted by the YARN resource manager.
+ * <p>
+ * Each update cycle reads the metadata entries whose owner equals
+ * {@code appConfig.providerName}, asks YARN for additional containers or
+ * releases surplus ones, launches a process in every container that could be
+ * matched to a metadata entry, and records container completions through the
+ * metadata service.
+ * <p>
+ * The container maps are plain {@link HashMap}s. Within this class they are
+ * only modified by the update cycle, which runs on a single-threaded executor.
+ */
+public class YarnMasterService {
+
+	static final Logger log = Logger.getLogger(YarnMasterService.class);
+
+	static final String REQUIRED_TYPE = "container";
+
+	static final long ZOOKEEPER_TIMEOUT = 5000;
+
+	/** Period between two update cycles, in milliseconds. */
+	static final long MASTERSERVICE_INTERVAL = 1000;
+
+	static final String CONTAINERS = "CONTAINERS";
+
+	/**
+	 * Launch command template. The placeholders are filled, in order, with the
+	 * metadata command, cluster address, cluster name, metadata address,
+	 * provider name, metadata id and (twice) the directory receiving
+	 * stdout/stderr.
+	 */
+	static final String CONTAINER_COMMAND = "/bin/sh %s %s %s %s %s %s 1>%s/stdout 2>%s/stderr";
+
+	/*
+	 * CONTAINERS
+	 *   A (A, READY)
+	 *   B (B, RUNNING)
+	 */
+
+	final ApplicationConfig appConfig;
+	final AMRMProtocol yarnClient;
+	final ApplicationAttemptId appAtemptId;
+
+	final Configuration yarnConfig;
+
+	// not referenced anywhere else in this class
+	final File dummy = new File("/tmp/dummy");
+
+	/** Containers granted by YARN that have not been matched to a metadata entry yet. */
+	final Map<ContainerId, Container> unassignedContainers = new HashMap<ContainerId, Container>();
+	/** Containers in which a process has been started. */
+	final Map<ContainerId, Container> activeContainers = new HashMap<ContainerId, Container>();
+	/** Final status of every container YARN reported as completed. */
+	final Map<ContainerId, ContainerStatus> completedContainers = new HashMap<ContainerId, ContainerStatus>();
+	/** Maps a started YARN container to the id of its metadata entry. */
+	final Map<ContainerId, String> yarn2meta = new HashMap<ContainerId, String>();
+
+	final MetadataService metaService;
+
+	ScheduledExecutorService executor;
+
+	/**
+	 * @param yarnClient   protocol handle used to send allocate requests
+	 * @param conf         configuration used to create container manager proxies
+	 * @param appAttemptId attempt id sent with every allocate request
+	 * @param appConfig    cluster/provider settings passed on to launched containers
+	 * @param metaService  store holding the container metadata
+	 */
+	public YarnMasterService(AMRMProtocol yarnClient, Configuration conf, ApplicationAttemptId appAttemptId, ApplicationConfig appConfig, MetadataService metaService) {
+		this.appConfig = appConfig;
+		this.yarnClient = yarnClient;
+		this.appAtemptId = appAttemptId;
+		this.yarnConfig = conf;
+		this.metaService = metaService;
+	}
+
+	/**
+	 * Starts the update cycle on a new single-threaded scheduler; the first
+	 * cycle runs immediately, then every {@link #MASTERSERVICE_INTERVAL} ms.
+	 */
+	public void startService() {
+		log.debug("starting yarn master service");
+
+		executor = Executors.newSingleThreadScheduledExecutor();
+		executor.scheduleAtFixedRate(new YarnService(), 0, MASTERSERVICE_INTERVAL, TimeUnit.MILLISECONDS);
+	}
+
+	/**
+	 * Shuts the scheduler down and blocks until it has terminated. An
+	 * interrupt received while waiting does not abort the wait; the thread's
+	 * interrupt status is restored before returning so callers can observe it.
+	 */
+	public void stopService() {
+		log.debug("stopping yarn master service");
+
+		if(executor != null) {
+			executor.shutdown();
+
+			boolean interrupted = false;
+			while(!executor.isTerminated()) {
+				try {
+					executor.awaitTermination(100, TimeUnit.MILLISECONDS);
+				} catch (InterruptedException e) {
+					// keep waiting for a clean shutdown, but remember the interrupt
+					interrupted = true;
+				}
+			}
+			if(interrupted) {
+				Thread.currentThread().interrupt();
+			}
+			executor = null;
+		}
+	}
+
+	/**
+	 * Reads all metadata entries and keeps those owned by this provider.
+	 *
+	 * @return entries whose owner equals {@code appConfig.providerName}
+	 * @throws MetadataServiceException if the metadata service fails to read
+	 */
+	Collection<ContainerMetadata> readOwnedMetadata() throws MetadataServiceException {
+		log.debug("reading container data");
+
+		Collection<ContainerMetadata> containers = new ArrayList<ContainerMetadata>();
+		for(ContainerMetadata meta : metaService.readAll()) {
+			if(meta.owner.equals(appConfig.providerName)) {
+				containers.add(meta);
+				log.debug(String.format("found container node '%s' (state=%s, yarnId=%s, command=%s, owner=%s)", 
+						meta.id, meta.state, meta.yarnId, meta.command, meta.owner));
+			}
+		}
+		return containers;
+	}
+
+	/** One update cycle; scheduled repeatedly by {@link #startService()}. */
+	class YarnService implements Runnable {
+		/** Response id sent with the next allocate request; incremented per request. */
+		int responseId = 0;
+
+		@Override
+		public void run() {
+			try {
+				log.debug("running yarn service update cycle");
+
+				Collection<ContainerMetadata> metadata = readOwnedMetadata();
+
+				// active meta containers
+				int numMetaActive = countActiveMeta(metadata);
+
+				// newly acquired meta containers
+				int numMetaAcquire = countAcquireMeta(metadata);
+
+				// destroyed meta containers
+				List<ContainerId> destroyedReleasedIds = createDestroyedReleaseList(metadata);
+				int numMetaCompleted = destroyedReleasedIds.size();
+
+				int numMeta = numMetaAcquire + numMetaActive + numMetaCompleted;
+
+				// yarn containers (counted before any overstock is removed below)
+				int numYarnUnassigned = unassignedContainers.size();
+				int numYarnActive = activeContainers.size();
+				int numYarnCompleted = completedContainers.size();
+				int numYarn = numYarnUnassigned + numYarnActive + numYarnCompleted;
+
+				// positive: more containers needed; negative: overstock
+				int numYarnRequired = numMetaAcquire - numYarnUnassigned;
+
+				// additionally required containers
+				int numRequestAdditional = Math.max(0, numYarnRequired);
+
+				// overstock containers
+				List<ContainerId> unneededReleasedIds = createOverstockReleaseList(numYarnRequired);
+
+				int numReleased = destroyedReleasedIds.size() + unneededReleasedIds.size();
+
+				log.debug(String.format("meta containers (total=%d, acquire=%d, active=%d, completed=%d)", numMeta, numMetaAcquire, numMetaActive, numMetaCompleted));
+				log.debug(String.format("yarn containers (total=%d, unassigned=%d, active=%d, completed=%d)", numYarn, numYarnUnassigned, numYarnActive, numYarnCompleted));
+				log.debug(String.format("requesting %d new containers, releasing %d", numRequestAdditional, numReleased));
+
+				AllocateRequest request = createAllocateRequest(numRequestAdditional, destroyedReleasedIds, unneededReleasedIds);
+
+				AllocateResponse allocateResponse = null;
+				try {
+					allocateResponse = yarnClient.allocate(request);
+				} catch (YarnRemoteException e) {
+					// nothing was received, so there is nothing to process this cycle
+					log.error("Error allocating containers", e);
+					return;
+				}
+
+				AMResponse response = allocateResponse.getAMResponse();
+
+				// newly added containers
+				for(Container container : response.getAllocatedContainers()) {
+					unassignedContainers.put(container.getId(), container);
+				}
+
+				log.info(String.format("%d new containers available, %d required", unassignedContainers.size(), numMetaAcquire));
+
+				assignContainers(metadata);
+
+				// Always handled, even if a launch failed above: the statuses are
+				// part of this response and would otherwise be dropped.
+				processCompletedContainers(response);
+
+				log.debug("yarn service update cycle complete");
+
+			} catch (Exception e) {
+				log.error("Error while executing yarn update cycle", e);
+			}
+		}
+
+		/**
+		 * Builds the allocate request for this cycle and advances the response id.
+		 *
+		 * @param numRequestAdditional number of containers to ask for
+		 * @param destroyedReleasedIds containers released because their metadata is HALTED
+		 * @param unneededReleasedIds  surplus unassigned containers to release
+		 */
+		private AllocateRequest createAllocateRequest(int numRequestAdditional,
+				List<ContainerId> destroyedReleasedIds, List<ContainerId> unneededReleasedIds) {
+			Priority priority = Records.newRecord(Priority.class);
+			priority.setPriority(0);
+
+			Resource resource = Records.newRecord(Resource.class);
+			resource.setMemory(256); // TODO make dynamic
+
+			ResourceRequest resourceRequest = Records.newRecord(ResourceRequest.class);
+			resourceRequest.setHostName("*");
+			resourceRequest.setNumContainers(numRequestAdditional);
+			resourceRequest.setPriority(priority);
+			resourceRequest.setCapability(resource);
+
+			AllocateRequest request = Records.newRecord(AllocateRequest.class);
+			request.setResponseId(responseId);
+			request.setApplicationAttemptId(appAtemptId);
+			request.addAsk(resourceRequest);
+			request.addAllReleases(destroyedReleasedIds);
+			request.addAllReleases(unneededReleasedIds);
+
+			responseId++;
+
+			return request;
+		}
+
+		/**
+		 * Pairs unassigned YARN containers with metadata entries that have no
+		 * YARN id yet (yarnId &lt; 0) and starts the entry's command in the
+		 * container. Stops at the first launch failure; the affected container
+		 * stays in the unassigned map.
+		 *
+		 * @throws MetadataServiceException if updating the metadata entry fails
+		 */
+		private void assignContainers(Collection<ContainerMetadata> metadata) throws MetadataServiceException {
+			Iterator<Container> itYarn = unassignedContainers.values().iterator();
+			Iterator<ContainerMetadata> itMeta = metadata.iterator();
+			while(itYarn.hasNext() && itMeta.hasNext()) {
+				ContainerMetadata meta = itMeta.next();
+
+				// already bound to a yarn container
+				if(meta.yarnId >= 0) {
+					continue;
+				}
+
+				Container containerYarn = itYarn.next();
+
+				log.debug(String.format("assigning yarn container '%s' to container node '%s'", containerYarn.getId(), meta.id));
+
+				String command = String.format(CONTAINER_COMMAND, meta.command,
+						appConfig.clusterAddress, appConfig.clusterName, appConfig.metadataAddress, appConfig.providerName,
+						meta.id, "/tmp/" + meta.id, "/tmp/" + meta.id);  
+						//ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+
+				ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
+				context.setContainerId(containerYarn.getId());
+				context.setResource(containerYarn.getResource());
+				context.setEnvironment(Maps.<String, String>newHashMap());
+				context.setCommands(Collections.singletonList(command));
+				context.setLocalResources(Utils.getDummyResources());
+				try {
+					context.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
+				} catch (IOException e) {
+					log.error(String.format("failed setting up container '%s' user information", meta.id), e);
+					return;
+				}
+
+				log.debug(String.format("container '%s' executing command '%s'", meta.id, command));
+
+				StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
+				startReq.setContainerLaunchContext(context);
+
+				try {
+					getContainerManager(containerYarn).startContainer(startReq);
+				} catch (YarnRemoteException e) {
+					log.error(String.format("Error starting container '%s'", meta.id), e);
+					return;
+				}
+
+				log.debug(String.format("container '%s' started, updating container node", meta.id));
+
+				metaService.update(new ContainerMetadata(meta, ContainerState.CONNECTING, containerYarn.getId().getId()));
+				yarn2meta.put(containerYarn.getId(), meta.id);
+
+				log.debug(String.format("removing '%s' from unassigned yarn containers and adding to active list", containerYarn.getId()));
+
+				itYarn.remove();
+				activeContainers.put(containerYarn.getId(), containerYarn);
+			}
+		}
+
+		/**
+		 * Moves every container reported as completed out of the unassigned/active
+		 * maps into the completed map; for active containers the metadata entry
+		 * is set to FINALIZE.
+		 *
+		 * @throws MetadataServiceException if reading or updating metadata fails
+		 */
+		private void processCompletedContainers(AMResponse response) throws MetadataServiceException {
+			for(ContainerStatus status : response.getCompletedContainersStatuses()) {
+				ContainerId id = status.getContainerId();
+
+				log.info(String.format("Container '%s' completed", id));
+
+				if(unassignedContainers.containsKey(id)) {
+					log.info(String.format("Unassigned container '%s' terminated, removing", id));
+					unassignedContainers.remove(id);
+					// TODO handle
+				}
+
+				if(activeContainers.containsKey(id)) {
+					log.info(String.format("Active container '%s' terminated, removing", id));
+					activeContainers.remove(id);
+
+					String metaId = yarn2meta.get(id);
+					ContainerMetadata meta = metaService.read(metaId);
+
+					log.debug(String.format("container '%s' finalized, updating container node", meta.id));
+
+					metaService.update(new ContainerMetadata(meta, ContainerState.FINALIZE));
+				}
+
+				completedContainers.put(id, status);
+			}
+		}
+
+		/**
+		 * Removes up to {@code -numYarnRequired} containers from the unassigned
+		 * map when that value is negative and returns their ids for release.
+		 */
+		private List<ContainerId> createOverstockReleaseList(int numYarnRequired) {
+			List<ContainerId> unneededReleasedIds = new ArrayList<ContainerId>();
+			Iterator<Container> itUnassigned = unassignedContainers.values().iterator();
+			if(numYarnRequired < 0) {
+				for(int i=0; i<-numYarnRequired && itUnassigned.hasNext(); i++) {
+					Container container = itUnassigned.next();
+					unneededReleasedIds.add(container.getId());
+					log.debug(String.format("Container '%s' no longer required, removing", container.getId()));
+					itUnassigned.remove();
+				}
+			}
+			return unneededReleasedIds;
+		}
+
+		/**
+		 * Builds a YARN container id (this attempt id plus the stored yarnId)
+		 * for every metadata entry in state HALTED.
+		 */
+		private List<ContainerId> createDestroyedReleaseList(
+				Collection<ContainerMetadata> metadata) {
+			List<ContainerId> releasedIds = new ArrayList<ContainerId>();
+			for(ContainerMetadata meta : metadata) {
+				if(meta.state == ContainerState.HALTED) {
+					ContainerId containerId = Records.newRecord(ContainerId.class);
+					containerId.setApplicationAttemptId(appAtemptId);
+					containerId.setId(meta.yarnId);
+					releasedIds.add(containerId);
+					log.debug(String.format("releasing container '%s'", containerId));
+				}
+			}
+			return releasedIds;
+		}
+
+		/** Counts the entries in state ACQUIRE. */
+		private int countAcquireMeta(Collection<ContainerMetadata> metadata) {
+			int numMetaAcquire = 0;
+			for(ContainerMetadata meta : metadata) {
+				if(meta.state == ContainerState.ACQUIRE) {
+					numMetaAcquire++;
+				}
+			}
+			return numMetaAcquire;
+		}
+
+		/** Counts the entries that are neither ACQUIRE, HALTED nor FINALIZE. */
+		private int countActiveMeta(Collection<ContainerMetadata> metadata) {
+			int numMetaActive = 0;
+			for(ContainerMetadata meta : metadata) {
+				if(meta.state != ContainerState.ACQUIRE &&
+				   meta.state != ContainerState.HALTED &&
+				   meta.state != ContainerState.FINALIZE) {
+					numMetaActive++;
+				}
+			}
+			return numMetaActive;
+		}
+	}
+
+	/**
+	 * Creates an RPC proxy to the container manager on the node hosting the
+	 * given container. A new proxy is created on every call.
+	 */
+	private ContainerManager getContainerManager(Container container) {
+		YarnConfiguration yarnConf = new YarnConfiguration(yarnConfig);
+		YarnRPC rpc = YarnRPC.create(yarnConf);
+		NodeId nodeId = container.getNodeId();
+		String containerIpPort = String.format("%s:%d", nodeId.getHost(),
+				nodeId.getPort());
+		log.info("Connecting to ContainerManager at: " + containerIpPort);
+		InetSocketAddress addr = NetUtils.createSocketAddr(containerIpPort);
+		ContainerManager cm = (ContainerManager) rpc.getProxy(
+				ContainerManager.class, addr, yarnConfig);
+		return cm;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnProcess.java
new file mode 100644
index 0000000..b1a22d5
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnProcess.java
@@ -0,0 +1,171 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.metamanager.managed.ManagedFactory;
+import org.apache.helix.metamanager.provider.yarn.ContainerMetadata.ContainerState;
+import org.apache.log4j.Logger;
+
+/**
+ * Process started inside a container. It polls the metadata entry of its
+ * container id and starts or stops a Helix participant according to the
+ * state found there.
+ */
+public class YarnProcess {
+	static final Logger log = Logger.getLogger(YarnProcess.class);
+
+	/** Period between two status checks, in milliseconds. */
+	static final long CONTAINERSERVICE_INTERVAL = 1000;
+
+	final ApplicationConfig appConfig;
+	final String containerId;
+
+	HelixManager participantManager;
+
+	MetadataService metaService;
+	ScheduledExecutorService executor;
+
+
+	/**
+	 * @param appConfig   cluster and metadata connection settings
+	 * @param containerId id of the metadata entry this process follows; also
+	 *                    used as the Helix instance name
+	 */
+	public YarnProcess(ApplicationConfig appConfig, String containerId) {
+		this.appConfig = appConfig;
+		this.containerId = containerId;
+	}
+
+	/**
+	 * Starts the metadata service and schedules the status check; the first
+	 * check runs immediately, then every {@link #CONTAINERSERVICE_INTERVAL} ms.
+	 */
+	public void startService() {
+		log.info(String.format("start metadata service for '%s'", containerId));
+		metaService = new MetadataService(appConfig.metadataAddress);
+		metaService.start();
+
+		executor = Executors.newSingleThreadScheduledExecutor();
+		executor.scheduleAtFixedRate(new ContainerService(), 0, CONTAINERSERVICE_INTERVAL, TimeUnit.MILLISECONDS);
+	}
+
+	/**
+	 * Stops scheduling further status checks and closes the metadata service.
+	 * Does not wait for termination, because it is also called from within a
+	 * status check running on the executor itself.
+	 */
+	public void stopService() {
+		log.info(String.format("stop metadata service for '%s'", containerId));
+
+		// Stop the scheduler first so no new check is started against a
+		// metadata service that has already been closed.
+		if(executor != null) {
+			executor.shutdown();
+		}
+
+		if (metaService != null) {
+			metaService.stop();
+			metaService = null;
+		}
+	}
+
+	/**
+	 * @return {@code true} while the executor exists and has not terminated
+	 */
+	public boolean isRunning() {
+		if(executor == null)
+			return false;
+		return !executor.isTerminated();
+	}
+
+	/**
+	 * Creates a Helix participant named after the container id, registers a
+	 * {@link ManagedFactory} for the "MasterSlave" state model and connects.
+	 *
+	 * @throws Exception if creating or connecting the participant fails
+	 */
+	public void startParticipant() throws Exception {
+		log.info("STARTING " + containerId);
+		participantManager = HelixManagerFactory.getZKHelixManager(appConfig.clusterName,
+				containerId, InstanceType.PARTICIPANT, appConfig.clusterAddress);
+		participantManager.getStateMachineEngine().registerStateModelFactory(
+				"MasterSlave", new ManagedFactory());
+		participantManager.connect();
+		log.info("STARTED " + containerId);
+	}
+
+	/** Disconnects the participant, if one was created. */
+	public void stopParticipant() {
+		if (participantManager != null) {
+			participantManager.disconnect();
+			participantManager = null;
+		}
+	}
+
+	/**
+	 * Reads this container's metadata entry and acts on its state:
+	 * CONNECTING starts the participant and writes ACTIVE (or FAILED if the
+	 * start throws); TEARDOWN stops the participant, writes HALTED and stops
+	 * the service. Any exception from the metadata service stops the service.
+	 */
+	public void updateContainerStatus() {
+		log.info("updating container status");
+		try {
+			ContainerMetadata meta = metaService.read(containerId);
+
+			if(meta.state == ContainerState.CONNECTING) {
+				log.info("container connecting, going to active");
+				try {
+					startParticipant();
+					metaService.update(new ContainerMetadata(meta, ContainerState.ACTIVE));
+				} catch (Exception e) {
+					log.error("Failed to start participant, going to failed", e);
+					stopParticipant();
+					metaService.update(new ContainerMetadata(meta, ContainerState.FAILED));
+				}
+			}
+
+			if(meta.state == ContainerState.ACTIVE) {
+				// do something
+				// and go to failed on error
+			}
+
+			if(meta.state == ContainerState.TEARDOWN) {
+				log.info("container teardown, going to halted");
+				stopParticipant();
+				metaService.update(new ContainerMetadata(meta, ContainerState.HALTED));
+				stopService();
+			}
+
+		} catch(Exception e) {
+			// Any failure ends up here, not only a missing entry; log the cause
+			// so the two can be told apart.
+			log.warn(String.format("Container '%s' does not exist, stopping service", containerId), e);
+			stopService();
+		}
+	}
+
+	/** Scheduled task that runs one status check. */
+	class ContainerService implements Runnable {
+		@Override
+		public void run() {
+			updateContainerStatus();
+		}
+	}
+
+	/**
+	 * Entry point. Expects five arguments: cluster address, cluster name,
+	 * metadata address, provider name and container id. Starts the service and
+	 * blocks until it is no longer running.
+	 *
+	 * @throws IllegalArgumentException if fewer than five arguments are given
+	 */
+	public static void main(String[] args) throws Exception
+	{
+		log.trace("BEGIN YarnProcess.main()");
+
+		if(args.length < 5) {
+			throw new IllegalArgumentException(
+					"usage: YarnProcess <clusterAddress> <clusterName> <metadataAddress> <providerName> <containerId>");
+		}
+
+		final String clusterAddress = args[0];
+		final String clusterName = args[1];
+		final String metadataAddress = args[2];
+		final String providerName = args[3];
+		final String containerId = args[4];
+
+		final ApplicationConfig appConfig = new ApplicationConfig(clusterAddress, clusterName, metadataAddress, providerName);
+
+		final YarnProcess yarnProcess = new YarnProcess(appConfig, containerId);
+
+		yarnProcess.startService();
+
+		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+			@Override
+			public void run() {
+				yarnProcess.stopService();
+			}
+		}));
+
+		while(yarnProcess.isRunning()) {
+			try {
+				Thread.sleep(100);
+			} catch (InterruptedException ignored) {
+				// keep polling until the service has actually stopped
+			}
+		}
+
+		log.trace("END YarnProcess.main()");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/ZookeeperMetadataService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/ZookeeperMetadataService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/ZookeeperMetadataService.java
new file mode 100644
index 0000000..00bf17f
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/ZookeeperMetadataService.java
@@ -0,0 +1,102 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.apache.log4j.Logger;
+
+/**
+ * {@link MetadataService} that keeps one ZooKeeper node per container below
+ * {@link #BASE_PATH}; the node data is the JSON form of the metadata as
+ * produced by {@code Utils.toJson}.
+ */
+public class ZookeeperMetadataService implements MetadataService {
+
+	static final Logger log = Logger.getLogger(ZookeeperMetadataService.class);
+
+	static final String CONTAINER_NAMESPACE = "containers";
+
+	/** Parent node of all container entries. */
+	static final String BASE_PATH = "/" + CONTAINER_NAMESPACE;
+
+	static final long POLL_INTERVAL = 100;
+
+	final String metadataAddress;
+
+	/** Set by {@link #startService()}, cleared by {@link #stopService()}. */
+	ZkClient client;
+
+	/**
+	 * @param metadataAddress address handed to the {@link ZkClient} on start
+	 */
+	public ZookeeperMetadataService(String metadataAddress) {
+		this.metadataAddress = metadataAddress;
+	}
+
+	/** Opens the client and creates {@link #BASE_PATH} including parents. */
+	public void startService() {
+		log.debug(String.format("starting metadata service for '%s'", metadataAddress));
+
+		ZkClient zk = new ZkClient(metadataAddress);
+		client = zk;
+		zk.createPersistent(BASE_PATH, true);
+	}
+
+	/** Closes the client if one is open. */
+	public void stopService() {
+		log.debug(String.format("stopping metadata service for '%s'", metadataAddress));
+		if(client == null) {
+			return;
+		}
+		client.close();
+		client = null;
+	}
+
+	/** @return whether a node exists for the given container id */
+	@Override
+	public boolean exists(String id) {
+		String path = makePath(id);
+		return client.exists(path);
+	}
+
+	/**
+	 * Creates a persistent node for the given metadata.
+	 *
+	 * @throws MetadataServiceException wrapping any {@link ZkException}
+	 */
+	@Override
+	public void create(ContainerMetadata meta) throws MetadataServiceException {
+		try {
+			String path = makePath(meta.id);
+			client.createPersistent(path, Utils.toJson(meta));
+		} catch (ZkException zkError) {
+			throw new MetadataServiceException(zkError);
+		}
+	}
+
+	/**
+	 * Reads and parses the node of the given container id.
+	 *
+	 * @throws MetadataServiceException wrapping any {@link ZkException}
+	 */
+	@Override
+	public ContainerMetadata read(String id) throws MetadataServiceException {
+		try {
+			String path = makePath(id);
+			String json = client.<String>readData(path);
+			return Utils.fromJson(json);
+		} catch (ZkException zkError) {
+			throw new MetadataServiceException(zkError);
+		}
+	}
+
+	/**
+	 * Reads and parses every child node of {@link #BASE_PATH}.
+	 *
+	 * @throws MetadataServiceException wrapping any {@link ZkException}
+	 */
+	@Override
+	public Collection<ContainerMetadata> readAll() throws MetadataServiceException {
+		try {
+			Collection<ContainerMetadata> result = new ArrayList<ContainerMetadata>();
+			Collection<String> childIds = client.getChildren(BASE_PATH);
+			for(String childId : childIds) {
+				String json = client.<String>readData(makePath(childId));
+				result.add(Utils.fromJson(json));
+			}
+			return result;
+		} catch (ZkException zkError) {
+			throw new MetadataServiceException(zkError);
+		}
+	}
+
+	/**
+	 * Overwrites the node data of the given metadata's id.
+	 *
+	 * @throws MetadataServiceException wrapping any {@link ZkException}
+	 */
+	@Override
+	public void update(ContainerMetadata meta) throws MetadataServiceException {
+		try {
+			String path = makePath(meta.id);
+			client.writeData(path, Utils.toJson(meta));
+		} catch (ZkException zkError) {
+			throw new MetadataServiceException(zkError);
+		}
+	}
+
+	/**
+	 * Deletes the node of the given container id.
+	 *
+	 * @throws MetadataServiceException wrapping any {@link ZkException}
+	 */
+	@Override
+	public void delete(String id) throws MetadataServiceException {
+		try {
+			String path = makePath(id);
+			client.delete(path);
+		} catch (ZkException zkError) {
+			throw new MetadataServiceException(zkError);
+		}
+	}
+
+	/** @return {@link #BASE_PATH}, a slash and the container id */
+	String makePath(String containerId) {
+		return new StringBuilder(BASE_PATH).append('/').append(containerId).toString();
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ApplicationConfig.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ApplicationConfig.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ApplicationConfig.java
new file mode 100644
index 0000000..5950d42
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ApplicationConfig.java
@@ -0,0 +1,32 @@
+package org.apache.helix.metamanager.yarn;
+
+/**
+ * Immutable holder for the four connection settings of an application:
+ * cluster address and name, provider address and name.
+ */
+public class ApplicationConfig {
+	final String clusterAddress;
+	final String clusterName;
+	final String providerAddress;
+	final String providerName;
+
+	/**
+	 * Stores the given values unchanged; no validation is performed.
+	 *
+	 * @param clusterAddress  value returned by {@link #getClusterAddress()}
+	 * @param clusterName     value returned by {@link #getClusterName()}
+	 * @param providerAddress value returned by {@link #getProviderAddress()}
+	 * @param providerName    value returned by {@link #getProviderName()}
+	 */
+	public ApplicationConfig(String clusterAddress, String clusterName,
+			String providerAddress, String providerName) {
+		this.providerName = providerName;
+		this.providerAddress = providerAddress;
+		this.clusterName = clusterName;
+		this.clusterAddress = clusterAddress;
+	}
+
+	/** @return the cluster address given at construction */
+	public String getClusterAddress() {
+		return this.clusterAddress;
+	}
+
+	/** @return the cluster name given at construction */
+	public String getClusterName() {
+		return this.clusterName;
+	}
+
+	/** @return the provider address given at construction */
+	public String getProviderAddress() {
+		return this.providerAddress;
+	}
+
+	/** @return the provider name given at construction */
+	public String getProviderName() {
+		return this.providerName;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ContainerMetadata.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ContainerMetadata.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ContainerMetadata.java
new file mode 100644
index 0000000..1245080
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ContainerMetadata.java
@@ -0,0 +1,50 @@
+package org.apache.helix.metamanager.yarn;
+
+
+/**
+ * Mutable record describing one container: its id, lifecycle state, numeric
+ * YARN id, launch command and owner.
+ */
+class ContainerMetadata {
+
+	/** Lifecycle states a container entry can be in. */
+	static enum ContainerState {
+		ACQUIRE,
+		CONNECTING,
+		ACTIVE,
+		TEARDOWN,
+		FAILED,
+		HALTED,
+		FINALIZE
+	}
+
+	String id;
+	ContainerState state;
+	int yarnId;
+	String command;
+	String owner;
+
+	/** Leaves every field at its Java default value. */
+	public ContainerMetadata() {
+		// left blank
+	}
+
+	/**
+	 * Creates a fresh entry in state ACQUIRE with yarnId -1.
+	 */
+	public ContainerMetadata(String id, String command, String owner) {
+		this.id = id;
+		this.command = command;
+		this.owner = owner;
+		this.yarnId = -1;
+		this.state = ContainerState.ACQUIRE;
+	}
+
+	/**
+	 * Copies {@code node}, replacing only the state.
+	 */
+	public ContainerMetadata(ContainerMetadata node, ContainerState state) {
+		this(node, state, node.yarnId);
+	}
+
+	/**
+	 * Copies {@code node}, replacing the state and the YARN id.
+	 */
+	public ContainerMetadata(ContainerMetadata node, ContainerState state, int yarnId) {
+		this.id = node.id;
+		this.command = node.command;
+		this.owner = node.owner;
+		this.state = state;
+		this.yarnId = yarnId;
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ContainerNode.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ContainerNode.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ContainerNode.java
new file mode 100644
index 0000000..59b9325
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ContainerNode.java
@@ -0,0 +1,61 @@
+package org.apache.helix.metamanager.yarn;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+
+/**
+ * Immutable, serializable description of a container: id, lifecycle state,
+ * YARN container id, ZooKeeper address, cluster name and command.
+ */
+class ContainerNode implements Serializable {
+	/**
+	 * 
+	 */
+	private static final long serialVersionUID = 2578978959080378923L;
+
+	/** Lifecycle states a container node can be in. */
+	static enum ContainerState {
+		ACQUIRE,
+		CONNECT,
+		READY,
+		STARTING,
+		RUNNING,
+		STOPPING,
+		TEARDOWN,
+		FINALIZE
+	}
+
+	final String id;
+	final ContainerState state;
+	final ContainerId yarnId;
+
+	final String zkAddress;
+	final String clusterName;
+	final String command;
+
+	/**
+	 * Creates a new node in state ACQUIRE without a YARN id.
+	 */
+	public ContainerNode(String id, String zkAddress, String clusterName, String command) {
+		this.id = id;
+		this.zkAddress = zkAddress;
+		this.clusterName = clusterName;
+		this.command = command;
+		this.yarnId = null;
+		this.state = ContainerState.ACQUIRE;
+	}
+
+	/**
+	 * Copies {@code node}, replacing only the state.
+	 */
+	public ContainerNode(ContainerNode node, ContainerState state) {
+		this(node, state, node.yarnId);
+	}
+
+	/**
+	 * Copies {@code node}, replacing the state and the YARN id.
+	 */
+	public ContainerNode(ContainerNode node, ContainerState state, ContainerId yarnId) {
+		this.id = node.id;
+		this.zkAddress = node.zkAddress;
+		this.clusterName = node.clusterName;
+		this.command = node.command;
+		this.state = state;
+		this.yarnId = yarnId;
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/MessageNode.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/MessageNode.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/MessageNode.java
new file mode 100644
index 0000000..ba5be81
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/MessageNode.java
@@ -0,0 +1,20 @@
+package org.apache.helix.metamanager.yarn;
+
+
+/**
+ * Immutable pair of an id and the type of message addressed to it.
+ */
+class MessageNode {
+	/** Kinds of message a node can carry. */
+	static enum MessageType {
+		CREATE,
+		START,
+		STOP,
+		DESTROY
+	}
+
+	final String id;
+	final MessageType type;
+
+	/**
+	 * @param id   stored as {@link #id}
+	 * @param type stored as {@link #type}
+	 */
+	public MessageNode(final String id, final MessageType type) {
+		this.type = type;
+		this.id = id;
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/MetadataService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/MetadataService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/MetadataService.java
new file mode 100644
index 0000000..be88826
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/MetadataService.java
@@ -0,0 +1,146 @@
+package org.apache.helix.metamanager.yarn;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeoutException;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.apache.helix.metamanager.yarn.ContainerMetadata.ContainerState;
+import org.apache.log4j.Logger;
+
+/**
+ * Stores {@link ContainerMetadata} as JSON in ZooKeeper, one persistent node
+ * per container id below the {@code /containers} root node.
+ *
+ * <p>{@link #start()} creates the ZooKeeper client used by every other
+ * operation, so it has to be called first; {@link #stop()} closes it again.
+ */
+public class MetadataService {
+	
+	static final Logger log = Logger.getLogger(MetadataService.class);
+	
+	/** Name of the root node below which all container nodes are created. */
+	static final String CONTAINER_NAMESPACE = "containers";
+	
+//	static final String LOCK_PATH = "/" + CONTAINER_NAMESPACE + "/lock";
+	/** Pause between two reads in {@link #waitForState}, in milliseconds. */
+	static final long POLL_INTERVAL = 100;
+
+	final ApplicationConfig appConfig;
+	
+	/** ZooKeeper client; created by {@link #start()}, reset to {@code null} by {@link #stop()}. */
+	ZkClient client;
+	/** Absolute path of the root node; assigned by {@link #start()}. */
+	String basePath;
+	
+	/**
+	 * @param appConfig configuration whose {@code providerAddress} is used as
+	 *                  the ZooKeeper connect string
+	 */
+	public MetadataService(ApplicationConfig appConfig) {
+		this.appConfig = appConfig;
+	}
+
+	/**
+	 * Connects to ZooKeeper at {@code appConfig.providerAddress} and creates the
+	 * root node (including parents).
+	 */
+	public void start() {
+		basePath = "/" + CONTAINER_NAMESPACE;
+		log.debug(String.format("starting metadata service for '%s/%s'", appConfig.providerAddress, appConfig.providerName));
+		
+		client = new ZkClient(appConfig.providerAddress);
+		
+		client.createPersistent(basePath, true);
+	}
+	
+	/**
+	 * Closes the ZooKeeper client, if there is one. Safe to call repeatedly or
+	 * without a preceding {@link #start()}.
+	 */
+	public void stop() {
+		log.debug(String.format("stopping metadata service for '%s/%s'", appConfig.providerAddress, appConfig.providerName));
+		if(client != null) {
+			client.close();
+			client = null;
+		}
+	}
+	
+//	public void lock(long timeout) throws Exception {
+//		long limit = System.currentTimeMillis() + timeout;
+//		while (limit > System.currentTimeMillis()) {
+//			try {
+//				client.createEphemeral(LOCK_PATH);
+//				return;
+//			} catch (Exception ignore) {}
+//			Thread.sleep(POLL_INTERVAL);
+//		}
+//		throw new IllegalStateException("Could not acquire lock");
+//	}
+//	
+//	public void unlock() {
+//		client.delete(LOCK_PATH);
+//	}
+	
+	/**
+	 * Creates the persistent node for {@code meta.id} holding the JSON form of
+	 * {@code meta}.
+	 *
+	 * @throws IllegalMetadataStateException if ZooKeeper rejects the operation
+	 */
+	public void create(ContainerMetadata meta) throws IllegalMetadataStateException {
+		try {
+			client.createPersistent(makePath(meta.id), Utils.toJson(meta));
+		} catch (ZkException e) {
+			throw new IllegalMetadataStateException(e);
+		}
+	}
+	
+	/**
+	 * Reads and parses the metadata stored for container {@code id}.
+	 *
+	 * @throws IllegalMetadataStateException if ZooKeeper rejects the operation
+	 */
+	public ContainerMetadata read(String id) throws IllegalMetadataStateException {
+		try {
+			return Utils.fromJson(client.<String>readData(makePath(id)));
+		} catch (ZkException e) {
+			throw new IllegalMetadataStateException(e);
+		}
+	}
+	
+	/**
+	 * Reads the metadata of all containers currently stored below the root node.
+	 * Nodes that vanish while the listing is in progress, or that carry no
+	 * data, are skipped rather than failing the whole call.
+	 *
+	 * @return the parsed metadata, never containing {@code null} elements
+	 * @throws IllegalMetadataStateException if ZooKeeper rejects the operation
+	 */
+	public Collection<ContainerMetadata> readAll() throws IllegalMetadataStateException {
+		try {
+			Collection<ContainerMetadata> metadata = new ArrayList<ContainerMetadata>();
+			for(String id : client.getChildren(basePath)) {
+				// The child list is only a snapshot: a node may be deleted (e.g. by
+				// delete()) before it is read here. Ask for null instead of an
+				// exception in that case and skip the node.
+				String json = client.<String>readData(makePath(id), true);
+				ContainerMetadata meta = (json == null) ? null : Utils.fromJson(json);
+				if(meta == null) {
+					log.debug(String.format("container node '%s' has no data (deleted concurrently?), skipping", id));
+					continue;
+				}
+				metadata.add(meta);
+			}
+			return metadata;
+		} catch (ZkException e) {
+			throw new IllegalMetadataStateException(e);
+		}
+	}
+	
+	/**
+	 * Overwrites the node of {@code meta.id} with the JSON form of {@code meta}.
+	 *
+	 * @throws IllegalMetadataStateException if ZooKeeper rejects the operation
+	 */
+	public void update(ContainerMetadata meta) throws IllegalMetadataStateException {
+		try {
+			client.writeData(makePath(meta.id), Utils.toJson(meta));
+		} catch (ZkException e) {
+			throw new IllegalMetadataStateException(e);
+		}
+	}
+	
+	/**
+	 * Deletes the node of container {@code id}.
+	 *
+	 * @throws IllegalMetadataStateException if ZooKeeper rejects the operation
+	 */
+	public void delete(String id) throws IllegalMetadataStateException {
+		try {
+			client.delete(makePath(id));
+		} catch (ZkException e) {
+			throw new IllegalMetadataStateException(e);
+		}
+	}
+	
+	/**
+	 * Polls the metadata of container {@code id} every {@link #POLL_INTERVAL}
+	 * milliseconds until it reports {@code state}.
+	 *
+	 * @param timeout maximum time to wait, in milliseconds
+	 * @throws IllegalMetadataStateException if reading the metadata fails
+	 * @throws InterruptedException          if the thread is interrupted while sleeping
+	 * @throws TimeoutException              if the state is not reached within {@code timeout}
+	 */
+	public void waitForState(String id, ContainerState state, long timeout) throws IllegalMetadataStateException, InterruptedException, TimeoutException {
+		long limit = System.currentTimeMillis() + timeout;
+		ContainerMetadata meta = read(id);
+		while(meta.state != state) {
+			if(System.currentTimeMillis() >= limit) {
+				throw new TimeoutException(String.format("Container '%s' failed to reach state '%s' (currently is '%s')", id, state, meta.state));
+			}
+			Thread.sleep(POLL_INTERVAL);
+			meta = read(id);
+		}
+	}
+	
+	/** Returns the ZooKeeper path of the node for {@code containerId}. */
+	String makePath(String containerId) {
+		return basePath + "/" + containerId;
+	}
+	
+	/**
+	 * Checked exception thrown by {@link MetadataService} operations; wraps the
+	 * underlying {@link ZkException}.
+	 */
+	public static class IllegalMetadataStateException extends Exception {
+
+		private static final long serialVersionUID = -2846997013918977056L;
+
+		public IllegalMetadataStateException() {
+			super();
+		}
+
+		public IllegalMetadataStateException(String message, Throwable cause) {
+			super(message, cause);
+		}
+
+		public IllegalMetadataStateException(String message) {
+			super(message);
+		}
+
+		public IllegalMetadataStateException(Throwable cause) {
+			super(cause);
+		}	
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/Utils.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/Utils.java
new file mode 100644
index 0000000..49f70d3
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/Utils.java
@@ -0,0 +1,93 @@
+package org.apache.helix.metamanager.yarn;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.metamanager.yarn.ContainerMetadata.ContainerState;
+import org.apache.log4j.Logger;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
+
+public class Utils {
+	
+	static final Logger log = Logger.getLogger(Utils.class);
+	
+	static Gson gson;
+	static {
+		GsonBuilder builder = new GsonBuilder();
+		builder.registerTypeAdapter(ContainerState.class, new ContainerStateAdapter());
+		builder.setPrettyPrinting();
+		gson = builder.create();
+	}
+	static Map<String, LocalResource>  dummyResources = createDummyResources();
+	
+	static String toJson(ContainerMetadata meta) {
+		return gson.toJson(meta);
+	}
+	
+	static ContainerMetadata fromJson(String json) {
+		return gson.fromJson(json, ContainerMetadata.class);
+	}
+	
+	static Map<String, LocalResource> getDummyResources() {
+		return dummyResources;
+	}
+
+	private static Map<String, LocalResource> createDummyResources() {
+		File dummy = new File("/tmp/dummy");
+		
+		if(!dummy.exists()) {
+	    	try {
+	    		dummy.createNewFile();
+	    	} catch(Exception e) {
+	    		log.error("could not create dummy file", e);
+	    		System.exit(1);
+	    	}
+		}
+	    
+	    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+	    Path path = new Path(dummy.toURI());
+	    LocalResource localResource = Records.newRecord(LocalResource.class);
+	    localResource.setType(LocalResourceType.FILE);
+	    localResource.setVisibility(LocalResourceVisibility.APPLICATION);          
+	    localResource.setResource(ConverterUtils.getYarnUrlFromPath(path)); 
+	    localResource.setTimestamp(dummy.lastModified());
+	    localResource.setSize(dummy.length());
+	    localResources.put("dummy", localResource);
+		return localResources;
+	}
+	
+	static class ContainerStateAdapter extends TypeAdapter<ContainerState> {
+		@Override
+		public ContainerState read(JsonReader reader) throws IOException {
+			if (reader.peek() == JsonToken.NULL) {
+				reader.nextNull();
+				return null;
+			}
+			return ContainerState.valueOf(reader.nextString());
+		}
+
+		@Override
+		public void write(JsonWriter writer, ContainerState value) throws IOException {
+			if (value == null) {
+				writer.nullValue();
+				return;
+			}
+			writer.value(value.name());
+		}
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnApplication.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnApplication.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnApplication.java
new file mode 100644
index 0000000..7d2099a
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnApplication.java
@@ -0,0 +1,126 @@
+package org.apache.helix.metamanager.yarn;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Client-side handle for one YARN application: {@link #start()} submits the
+ * application master command to the ResourceManager, {@link #stop()} asks the
+ * ResourceManager to kill the application again.
+ */
+public class YarnApplication {
+
+	static final Logger log = Logger.getLogger(YarnApplication.class);
+	
+	// names of the environment variables passed to the application master
+	static final String ENV_CLUSTER_ADDRESS = "CLUSTER_ADDRESS";
+	static final String ENV_CLUSTER_NAME = "CLUSTER_NAME";
+	static final String ENV_PROVIDER_ADDRESS = "PROVIDER_ADDRESS";
+	static final String ENV_PROVIDER_NAME = "PROVIDER_NAME";
+
+	/** Configuration key of the master command; the value is a format string with two {@code %s} log directories. */
+	static final String MASTER_COMMAND = "metamanager.master.command";
+	// NOTE(review): the default points into a developer's home directory and
+	// presumably has to be overridden through MASTER_COMMAND on other machines.
+	static final String DEFAULT_MASTER_COMMAND = "/bin/sh /home/apucher/incubator-helix/recipes/meta-cluster-manager/target/meta-cluster-manager-pkg/bin/yarn-master-process.sh 1>%s/stdout 2>%s/stderr";
+
+	Configuration conf;
+	YarnRPC rpc;
+	/** ResourceManager proxy; created by {@link #connect()}. */
+	ClientRMProtocol rmClient;
+	/** Id of the submitted application; {@code null} until {@link #start()} acquired one. */
+	ApplicationId appId;
+	
+	final ApplicationConfig appConfig;
+
+	/**
+	 * @param appConfig cluster and provider coordinates handed to the application master
+	 */
+	public YarnApplication(ApplicationConfig appConfig) {
+		this.appConfig = appConfig;
+		configure(new YarnConfiguration());
+	}
+
+	/**
+	 * Connects to the ResourceManager, acquires a new application id and
+	 * submits the application master with the configured command.
+	 *
+	 * @throws Exception if any of the ResourceManager calls fails
+	 */
+	public void start() throws Exception {
+		connect();
+		
+		// stdout/stderr of the master are redirected to /tmp/<providerName>
+		String logDir = "/tmp/" + appConfig.providerName;
+		String command = String.format(conf.get(MASTER_COMMAND, DEFAULT_MASTER_COMMAND), logDir, logDir); 
+				//ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+
+		log.info(String.format("Starting application '%s/%s' (masterCommand='%s')", appConfig.providerAddress, appConfig.providerName, command));
+
+		// app id
+		GetNewApplicationRequest appRequest = Records.newRecord(GetNewApplicationRequest.class);
+		GetNewApplicationResponse appResponse = rmClient.getNewApplication(appRequest);
+
+		this.appId = appResponse.getApplicationId();
+
+		log.info(String.format("Acquired app id '%s' for '%s/%s'", appId.toString(), appConfig.providerAddress, appConfig.providerName));
+		
+		// command
+		ContainerLaunchContext launchContext = Records.newRecord(ContainerLaunchContext.class);
+		launchContext.setCommands(Collections.singletonList(command));
+
+		// resource limit
+		Resource resource = Records.newRecord(Resource.class);
+		resource.setMemory(256); // TODO make dynamic
+		launchContext.setResource(resource);
+		
+		// environment
+		Map<String, String> env = new HashMap<String, String>();
+		env.put(ENV_CLUSTER_ADDRESS, appConfig.clusterAddress);
+		env.put(ENV_CLUSTER_NAME, appConfig.clusterName);
+		env.put(ENV_PROVIDER_ADDRESS, appConfig.providerAddress);
+		env.put(ENV_PROVIDER_NAME, appConfig.providerName);
+		launchContext.setEnvironment(env);
+		
+		// local resources
+		// YARN workaround: create dummy resource 
+		Map<String, LocalResource> localResources = Utils.getDummyResources();
+		launchContext.setLocalResources(localResources);
+		
+		// app submission
+		ApplicationSubmissionContext subContext = Records.newRecord(ApplicationSubmissionContext.class);
+		subContext.setApplicationId(appId);
+		subContext.setApplicationName(appConfig.providerName);
+		subContext.setAMContainerSpec(launchContext);
+
+		SubmitApplicationRequest subRequest = Records.newRecord(SubmitApplicationRequest.class);
+		subRequest.setApplicationSubmissionContext(subContext);
+		
+		log.info(String.format("Starting app id '%s'", appId.toString()));
+
+		rmClient.submitApplication(subRequest);
+		
+	}
+
+	/**
+	 * Asks the ResourceManager to kill the application. Does nothing except
+	 * logging a warning when no application id was acquired, i.e. when
+	 * {@link #start()} was never called or failed before obtaining one.
+	 *
+	 * @throws YarnRemoteException if the kill request fails
+	 */
+	public void stop() throws YarnRemoteException {
+		if(appId == null) {
+			// without this guard a stop() before a successful start() dies with a NullPointerException
+			log.warn(String.format("No app id acquired for '%s/%s', nothing to stop", appConfig.providerAddress, appConfig.providerName));
+			return;
+		}
+		
+		log.info(String.format("Stopping app id '%s'", appId.toString()));
+		KillApplicationRequest killRequest = Records.newRecord(KillApplicationRequest.class);
+		killRequest.setApplicationId(appId);
+
+		rmClient.forceKillApplication(killRequest);
+	}
+
+	/** Stores {@code conf} (must not be null) and creates the RPC factory from it. */
+	void configure(Configuration conf) {
+		this.conf = Preconditions.checkNotNull(conf);
+		this.rpc = YarnRPC.create(conf);
+	}
+
+	/** Creates the ResourceManager proxy for the address configured under {@code YarnConfiguration.RM_ADDRESS}. */
+	void connect() {
+		YarnConfiguration yarnConf = new YarnConfiguration(conf);
+		InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+				YarnConfiguration.RM_ADDRESS,
+				YarnConfiguration.DEFAULT_RM_ADDRESS));
+		log.info("Connecting to ResourceManager at: " + rmAddress);
+		this.rmClient = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, conf));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnClient.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnClient.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnClient.java
new file mode 100644
index 0000000..3447661
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnClient.java
@@ -0,0 +1,5 @@
+package org.apache.helix.metamanager.yarn;
+
+/**
+ * Empty placeholder class; it declares no members.
+ */
+public class YarnClient {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainer.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainer.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainer.java
new file mode 100644
index 0000000..d36eee9
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainer.java
@@ -0,0 +1,14 @@
+package org.apache.helix.metamanager.yarn;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+
+/**
+ * Hadoop {@link Tool} stub; {@link #run(String[])} is not implemented yet.
+ */
+public class YarnContainer extends Configured implements Tool {
+
+	/**
+	 * Does nothing and ignores {@code args}.
+	 *
+	 * @return always {@code 0}
+	 */
+	@Override
+	public int run(String[] args) throws Exception {
+		// TODO Auto-generated method stub
+		return 0;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainerProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainerProvider.java
new file mode 100644
index 0000000..34a6b61
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainerProvider.java
@@ -0,0 +1,90 @@
+package org.apache.helix.metamanager.yarn;
+
+import org.apache.helix.metamanager.ClusterContainerProvider;
+import org.apache.helix.metamanager.yarn.ContainerMetadata.ContainerState;
+import org.apache.log4j.Logger;
+
+public class YarnContainerProvider implements ClusterContainerProvider {
+	
+	static final Logger log = Logger.getLogger(YarnContainerProvider.class);
+
+	static final String REQUIRED_TYPE = "container";
+	
+	static final long LOCK_TIMEOUT = 1000;
+	static final long CONTAINER_TIMEOUT = 10000;
+	
+	/*
+	 * CONTAINERS
+	 *   A (A, READY)
+	 *   B (B, RUNNING)
+	 */
+	
+	final ApplicationConfig appConfig;
+	final String command;
+	
+	final Object notifier = new Object();
+	
+	MetadataService metaService;
+	
+	public YarnContainerProvider(ApplicationConfig appConfig, String command) {
+		this.appConfig = appConfig;
+		this.command = command;
+	}
+
+	@Override
+	public void create(final String id, final String type) throws Exception {
+		if(!REQUIRED_TYPE.equals(type)) {
+			throw new IllegalArgumentException(String.format("Type '%s' not supported", type));
+		}
+		
+		metaService.create(new ContainerMetadata(id, command, appConfig.providerName));
+		metaService.waitForState(id, ContainerState.ACTIVE, CONTAINER_TIMEOUT);
+	}
+
+	@Override
+	public void destroy(final String id) throws Exception {
+		ContainerMetadata meta = metaService.read(id);
+
+		if(meta.state == ContainerState.ACTIVE) {
+			log.info(String.format("Destroying active container, going to teardown"));
+			metaService.update(new ContainerMetadata(meta, ContainerState.TEARDOWN));
+			
+		} else if(meta.state == ContainerState.FAILED) {
+			log.info(String.format("Destroying failed container, going to halted"));
+			metaService.update(new ContainerMetadata(meta, ContainerState.HALTED));
+			
+		} else if(meta.state == ContainerState.FINALIZE) {
+			log.info(String.format("Destroying finalized container, skipping"));
+			
+		} else {
+			throw new IllegalStateException(String.format("Container '%s' must be active, failed or finalized", id));
+		}
+		
+		metaService.waitForState(id, ContainerState.FINALIZE, CONTAINER_TIMEOUT);
+		metaService.delete(id);
+	}
+
+	@Override
+	public void destroyAll() {
+		try {
+			for(ContainerMetadata meta : metaService.readAll()) {
+				try { destroy(meta.id); } catch (Exception ignore) {}
+			}
+		} catch (Exception ignore) {
+			// ignore
+		}
+	}
+
+	public void startService() {
+		metaService = new MetadataService(appConfig);
+		metaService.start();
+	}
+	
+	public void stopService() {
+		if(metaService != null) {
+			metaService.stop();
+			metaService = null;
+		}
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainerService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainerService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainerService.java
new file mode 100644
index 0000000..855dddd
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainerService.java
@@ -0,0 +1,370 @@
+package org.apache.helix.metamanager.yarn;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.metamanager.yarn.ContainerMetadata.ContainerState;
+import org.apache.helix.metamanager.yarn.MetadataService.IllegalMetadataStateException;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Reconciles the container metadata owned by this provider with YARN. A
+ * single-threaded scheduler runs {@link YarnService} every
+ * {@link #YARNSERVICE_INTERVAL} milliseconds; each cycle requests, starts and
+ * releases YARN containers and writes the resulting state changes back through
+ * the {@link MetadataService}.
+ */
+public class YarnContainerService {
+
+	static final Logger log = Logger.getLogger(YarnContainerService.class);
+
+	static final String REQUIRED_TYPE = "container";
+	
+	static final long ZOOKEEPER_TIMEOUT = 5000;
+	
+	/** Period of the update cycle, in milliseconds. */
+	static final long YARNSERVICE_INTERVAL = 1000;
+	
+	static final String CONTAINERS = "CONTAINERS";
+	
+	/**
+	 * Format of the container start command. Arguments in order: container
+	 * command, cluster address, cluster name, provider address, provider name,
+	 * container id, stdout directory, stderr directory.
+	 */
+	static final String CONTAINER_COMMAND = "/bin/sh %s %s %s %s %s %s 1>%s/stdout 2>%s/stderr";
+
+	/*
+	 * CONTAINERS
+	 *   A (A, READY)
+	 *   B (B, RUNNING)
+	 */
+	
+	final ApplicationConfig appConfig;
+	final AMRMProtocol yarnClient;
+	final ApplicationAttemptId appAtemptId;
+	
+	final Configuration yarnConfig;
+	
+	final File dummy = new File("/tmp/dummy");
+	
+	/** Containers allocated by YARN that are not yet assigned to a metadata entry. */
+	final Map<ContainerId, Container> unassignedContainers = new HashMap<ContainerId, Container>();
+	/** Containers that were started for a metadata entry and have not completed. */
+	final Map<ContainerId, Container> activeContainers = new HashMap<ContainerId, Container>();
+	/** Completion status of every container YARN reported as completed. */
+	final Map<ContainerId, ContainerStatus> completedContainers = new HashMap<ContainerId, ContainerStatus>();
+	/** Maps a started YARN container to the id of its metadata entry. */
+	final Map<ContainerId, String> yarn2meta = new HashMap<ContainerId, String>();
+	
+	/** Number of containers requested in the most recent successful allocate call. */
+	int numRequestedLast = 0;
+
+	MetadataService metaService;
+	
+	ScheduledExecutorService executor;
+
+	public YarnContainerService(AMRMProtocol yarnClient, Configuration conf, ApplicationAttemptId appAttemptId, ApplicationConfig appConfig) {
+		this.appConfig = appConfig;
+		this.yarnClient = yarnClient;
+		this.appAtemptId = appAttemptId;
+		this.yarnConfig = conf;
+	}
+
+	/** Starts the metadata service and schedules the periodic update cycle. */
+	public void startService() {
+		log.debug("starting container service");
+		
+		metaService = new MetadataService(appConfig);
+		metaService.start();
+		
+		executor = Executors.newSingleThreadScheduledExecutor();
+		executor.scheduleAtFixedRate(new YarnService(), 0, YARNSERVICE_INTERVAL, TimeUnit.MILLISECONDS);
+	}
+	
+	/**
+	 * Shuts the scheduler down, waits until a running update cycle has
+	 * finished and then stops the metadata service. An interrupt received
+	 * while waiting does not abort the shutdown; the interrupt status is
+	 * restored before returning.
+	 */
+	public void stopService() {
+		log.debug("stopping container service");
+		
+		boolean interrupted = false;
+		
+		if(executor != null) {
+			executor.shutdown();
+			while(true) {
+				try {
+					if(executor.awaitTermination(100, TimeUnit.MILLISECONDS)) {
+						break;
+					}
+				} catch (InterruptedException e) {
+					// the metadata service must not be closed under a running
+					// cycle, so keep waiting and re-assert the interrupt later
+					interrupted = true;
+				}
+			}
+			executor = null;
+		}
+		
+		if(metaService != null) {
+			metaService.stop();
+			metaService = null;
+		}
+		
+		if(interrupted) {
+			Thread.currentThread().interrupt();
+		}
+	}
+	
+	/**
+	 * Reads all container metadata and keeps the entries whose owner equals
+	 * this provider's name.
+	 */
+	Collection<ContainerMetadata> readOwnedMetadata() throws IllegalMetadataStateException {
+		log.debug("reading container data");
+		
+		Collection<ContainerMetadata> containers = new ArrayList<ContainerMetadata>();
+		for(ContainerMetadata meta : metaService.readAll()) {
+			if(meta.owner.equals(appConfig.providerName)) {
+				containers.add(meta);
+				log.debug(String.format("found container node '%s' (state=%s, yarnId=%s, command=%s, owner=%s)", 
+						meta.id, meta.state, meta.yarnId, meta.command, meta.owner));
+			}
+		}
+		return containers;
+	}
+	
+	/**
+	 * One update cycle. Exceptions are logged and never propagate, so the
+	 * scheduler keeps running after a failed cycle.
+	 */
+	class YarnService implements Runnable {
+		/** Response id sent with the allocate request; incremented once per request. */
+		int responseId = 0;
+		
+		@Override
+		public void run() {
+			try {
+				log.debug("running yarn service update cycle");
+				
+				Collection<ContainerMetadata> metadata = readOwnedMetadata();
+				
+				// active meta containers
+				int numMetaActive = countActiveMeta(metadata);
+				
+				// newly acquired meta containers
+				int numMetaAcquire = countAcquireMeta(metadata);
+				
+				// destroyed meta containers
+				List<ContainerId> destroyedReleasedIds = createDestroyedReleaseList(metadata);
+				int numMetaCompleted = destroyedReleasedIds.size();
+				
+				int numMeta = numMetaAcquire + numMetaActive + numMetaCompleted;
+				
+				// yarn containers
+				int numYarnUnassigned = unassignedContainers.size();
+				int numYarnActive = activeContainers.size();
+				int numYarnCompleted = completedContainers.size();
+				int numYarn = numYarnUnassigned + numYarnActive + numYarnCompleted;
+				
+				// positive: more containers needed; negative: unassigned overstock
+				int numYarnRequired = numMetaAcquire - numYarnUnassigned;
+				
+				// additionally required containers
+				int numRequestAdditional = Math.max(0, numYarnRequired);
+				
+				// overstock containers
+				List<ContainerId> unneededReleasedIds = createOverstockReleaseList(numYarnRequired);
+				
+				log.debug(String.format("meta containers (total=%d, acquire=%d, active=%d, completed=%d)", numMeta, numMetaAcquire, numMetaActive, numMetaCompleted));
+				log.debug(String.format("yarn containers (total=%d, unassigned=%d, active=%d, completed=%d)", numYarn, numYarnUnassigned, numYarnActive, numYarnCompleted));
+				// both release lists are sent below, so report both
+				log.debug(String.format("requesting %d new containers (%d requested last), releasing %d", numRequestAdditional, numRequestedLast, destroyedReleasedIds.size() + unneededReleasedIds.size()));
+				
+				Priority priority = Records.newRecord(Priority.class);
+				priority.setPriority(0);
+				
+				Resource resource = Records.newRecord(Resource.class);
+				resource.setMemory(256); // TODO make dynamic
+				
+				ResourceRequest resourceRequest = Records.newRecord(ResourceRequest.class);
+				resourceRequest.setHostName("*");
+				resourceRequest.setNumContainers(numRequestAdditional);
+				resourceRequest.setPriority(priority);
+				resourceRequest.setCapability(resource);
+				
+				AllocateRequest request = Records.newRecord(AllocateRequest.class);
+				request.setResponseId(responseId);
+				request.setApplicationAttemptId(appAtemptId);
+				request.addAsk(resourceRequest);
+				request.addAllReleases(destroyedReleasedIds);
+				request.addAllReleases(unneededReleasedIds);
+				
+				responseId++;
+				
+				AllocateResponse allocateResponse = null;
+				try {
+					allocateResponse = yarnClient.allocate(request);
+				} catch (YarnRemoteException e) {
+					// give up on this cycle; the next scheduled run retries
+					log.error("Error allocating containers", e);
+					return;
+				}
+				
+				numRequestedLast = numRequestAdditional;
+				
+				AMResponse response = allocateResponse.getAMResponse();
+				
+				// newly added containers
+				for(Container container : response.getAllocatedContainers()) {
+					unassignedContainers.put(container.getId(), container);
+				}
+				
+				log.info(String.format("%d new containers available, %d required", unassignedContainers.size(), numMetaAcquire));
+				
+				// pair each unassigned yarn container with a metadata entry that
+				// has no yarn id yet, and start it
+				Iterator<Container> itYarn = unassignedContainers.values().iterator();
+				Iterator<ContainerMetadata> itMeta = metadata.iterator();
+				while(itYarn.hasNext() && itMeta.hasNext()) {
+					ContainerMetadata meta = itMeta.next();
+					
+					// a non-negative yarn id means the entry already has a container
+					if(meta.yarnId >= 0)
+						continue;
+					
+					Container containerYarn = itYarn.next();
+					
+					log.debug(String.format("assigning yarn container '%s' to container node '%s'", containerYarn.getId(), meta.id));
+					
+					String command = String.format(CONTAINER_COMMAND, meta.command,
+							appConfig.clusterAddress, appConfig.clusterName, appConfig.providerAddress, appConfig.providerName,
+							meta.id, "/tmp/" + meta.id, "/tmp/" + meta.id);  
+							//ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+					
+					ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
+					context.setContainerId(containerYarn.getId());
+					context.setResource(containerYarn.getResource());
+					context.setEnvironment(Maps.<String, String>newHashMap());
+					context.setCommands(Collections.singletonList(command));
+					context.setLocalResources(Utils.getDummyResources());
+					try {
+						context.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
+					} catch (IOException e) {
+						// pass the cause along so the log shows why the lookup failed
+						log.error(String.format("failed setting up container '%s' user information", meta.id), e);
+						return;
+					}
+					
+					log.debug(String.format("container '%s' executing command '%s'", meta.id, command));
+
+					StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
+					startReq.setContainerLaunchContext(context);
+					
+					try {
+						getContainerManager(containerYarn).startContainer(startReq);
+
+					} catch (YarnRemoteException e) {
+						log.error(String.format("Error starting container '%s'", meta.id), e);
+						return;
+					}
+						
+					log.debug(String.format("container '%s' started, updating container node", meta.id));
+
+					metaService.update(new ContainerMetadata(meta, ContainerState.CONNECTING, containerYarn.getId().getId()));
+					yarn2meta.put(containerYarn.getId(), meta.id);
+					
+					log.debug(String.format("removing '%s' from unassigned yarn containers and adding to active list", containerYarn.getId()));
+
+					itYarn.remove();
+					activeContainers.put(containerYarn.getId(), containerYarn);
+					
+				}
+				
+				// containers yarn reported as completed since the last cycle
+				for(ContainerStatus status : response.getCompletedContainersStatuses()) {
+					ContainerId id = status.getContainerId();
+					
+					log.info(String.format("Container '%s' completed", id));
+					
+					if(unassignedContainers.containsKey(id)) {
+						log.info(String.format("Unassigned container '%s' terminated, removing", id));
+						unassignedContainers.remove(id);
+						// TODO handle
+					}
+					
+					if(activeContainers.containsKey(id)) {
+						log.info(String.format("Active container '%s' terminated, removing", id));
+						activeContainers.remove(id);
+						
+						String metaId = yarn2meta.get(id);
+						ContainerMetadata meta = metaService.read(metaId);
+						
+						log.debug(String.format("container '%s' finalized, updating container node", meta.id));
+						
+						metaService.update(new ContainerMetadata(meta, ContainerState.FINALIZE));
+					}
+					
+					completedContainers.put(id, status);
+				}
+
+				log.debug("yarn service update cycle complete");
+				
+			} catch (Exception e) {
+				log.error("Error while executing yarn update cycle", e);
+			}
+		}
+
+		/**
+		 * Removes up to {@code -numYarnRequired} containers from the unassigned
+		 * pool when {@code numYarnRequired} is negative and returns their ids.
+		 */
+		private List<ContainerId> createOverstockReleaseList(int numYarnRequired) {
+			List<ContainerId> unneededReleasedIds = new ArrayList<ContainerId>();
+			Iterator<Container> itUnassigned = unassignedContainers.values().iterator();
+			if(numYarnRequired < 0) {
+				for(int i=0; i<-numYarnRequired && itUnassigned.hasNext(); i++) {
+					Container container = itUnassigned.next();
+					unneededReleasedIds.add(container.getId());
+					log.debug(String.format("Container '%s' no longer required, removing", container.getId()));
+					itUnassigned.remove();
+				}
+			}
+			return unneededReleasedIds;
+		}
+
+		/**
+		 * Builds a YARN container id, from this application attempt and the
+		 * stored yarn id, for every metadata entry in state {@code HALTED}.
+		 */
+		private List<ContainerId> createDestroyedReleaseList(
+				Collection<ContainerMetadata> metadata) {
+			List<ContainerId> releasedIds = new ArrayList<ContainerId>();
+			for(ContainerMetadata meta : metadata) {
+				if(meta.state == ContainerState.HALTED) {
+					ContainerId containerId = Records.newRecord(ContainerId.class);
+					containerId.setApplicationAttemptId(appAtemptId);
+					containerId.setId(meta.yarnId);
+					releasedIds.add(containerId);
+					log.debug(String.format("releasing container '%s'", containerId));
+				}
+			}
+			return releasedIds;
+		}
+
+		/** Counts the metadata entries in state {@code ACQUIRE}. */
+		private int countAcquireMeta(Collection<ContainerMetadata> metadata) {
+			int numMetaAcquire = 0;
+			for(ContainerMetadata meta : metadata) {
+				if(meta.state == ContainerState.ACQUIRE) {
+					numMetaAcquire++;
+				}
+			}
+			return numMetaAcquire;
+		}
+
+		/** Counts the metadata entries that are neither {@code ACQUIRE}, {@code HALTED} nor {@code FINALIZE}. */
+		private int countActiveMeta(Collection<ContainerMetadata> metadata) {
+			int numMetaActive = 0;
+			for(ContainerMetadata meta : metadata) {
+				if(meta.state != ContainerState.ACQUIRE &&
+				   meta.state != ContainerState.HALTED &&
+				   meta.state != ContainerState.FINALIZE) {
+					numMetaActive++;
+				}
+			}
+			return numMetaActive;
+		}
+	}
+	
+	/** Creates a ContainerManager proxy for the node that hosts {@code container}. */
+	private ContainerManager getContainerManager(Container container) {
+		YarnConfiguration yarnConf = new YarnConfiguration(yarnConfig);
+		YarnRPC rpc = YarnRPC.create(yarnConf);
+		NodeId nodeId = container.getNodeId();
+		String containerIpPort = String.format("%s:%d", nodeId.getHost(),
+				nodeId.getPort());
+		log.info("Connecting to ContainerManager at: " + containerIpPort);
+		InetSocketAddress addr = NetUtils.createSocketAddr(containerIpPort);
+		ContainerManager cm = (ContainerManager) rpc.getProxy(
+				ContainerManager.class, addr, yarnConfig);
+		return cm;
+	}
+		  
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnHelper.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnHelper.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnHelper.java
new file mode 100644
index 0000000..4314bdc
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnHelper.java
@@ -0,0 +1,5 @@
+package org.apache.helix.metamanager.yarn;
+
+/**
+ * Empty class; it currently declares no fields or methods.
+ */
+public class YarnHelper {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnMaster.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnMaster.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnMaster.java
new file mode 100644
index 0000000..a2aef0e
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnMaster.java
@@ -0,0 +1,134 @@
+package org.apache.helix.metamanager.yarn;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Logger;
+
+public class YarnMaster extends Configured implements Tool {
+
+	static final Logger log = Logger.getLogger(YarnMaster.class);
+	
+	AMRMProtocol resourceManager;
+	ApplicationAttemptId appAttemptId;
+	
+	YarnContainerService service;
+	
+	@Override
+	public int run(String[] args) throws Exception {
+		log.trace("BEGIN YarnMaster.run()");
+			
+		Configuration conf = getConf();
+		
+		this.appAttemptId = getApplicationAttemptId();
+		log.info(String.format("Got application attempt id '%s'", appAttemptId.toString()));
+		
+		log.debug("Getting resource manager");
+		this.resourceManager = getResourceManager(conf);
+
+	    // register the AM with the RM
+		log.debug("Registering application master");
+	    RegisterApplicationMasterRequest appMasterRequest = 
+	        Records.newRecord(RegisterApplicationMasterRequest.class);
+	    appMasterRequest.setApplicationAttemptId(appAttemptId);     
+	    appMasterRequest.setHost("");
+	    appMasterRequest.setRpcPort(0);
+	    appMasterRequest.setTrackingUrl("");
+
+	    resourceManager.registerApplicationMaster(appMasterRequest);
+
+	    String clusterAddress = getEnv(YarnApplication.ENV_CLUSTER_ADDRESS);
+	    String clusterName = getEnv(YarnApplication.ENV_CLUSTER_NAME);
+	    String providerAddress = getEnv(YarnApplication.ENV_PROVIDER_ADDRESS);
+	    String providerName = getEnv(YarnApplication.ENV_PROVIDER_NAME);
+	    ApplicationConfig appConfig = new ApplicationConfig(clusterAddress, clusterName, providerAddress, providerName);
+	    
+	    service = new YarnContainerService(resourceManager, conf, appAttemptId, appConfig);
+	    service.startService();
+	    
+	    Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+	    	@Override
+	    	public void run() {
+
+	    		service.stopService();
+	    		
+	    		// finish application
+	    	    log.debug("Sending finish request");
+	    	    FinishApplicationMasterRequest finishReq = 
+	    	    	Records.newRecord(FinishApplicationMasterRequest.class);
+	    	    
+	    	    finishReq.setAppAttemptId(getApplicationAttemptId());
+	    	    finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+	    	    
+	    	    try { resourceManager.finishApplicationMaster(finishReq); } catch(Exception ignore) {}
+	    	}
+	    }));
+	    
+	    try { Thread.currentThread().join(); } catch(Exception ignore) {}
+	    
+		log.trace("END YarnMaster.run()");
+		
+		return 0;
+	}
+
+	private AMRMProtocol getResourceManager(Configuration conf) {
+		// Connect to the Scheduler of the ResourceManager.
+	    YarnConfiguration yarnConf = new YarnConfiguration(conf);
+	    YarnRPC rpc = YarnRPC.create(yarnConf);
+	    InetSocketAddress rmAddress = 
+	        NetUtils.createSocketAddr(yarnConf.get(
+	            YarnConfiguration.RM_SCHEDULER_ADDRESS,
+	            YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));           
+	    log.info("Connecting to ResourceManager at " + rmAddress);
+	    AMRMProtocol resourceManager = 
+	        (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf);
+		return resourceManager;
+	}
+
+	private ApplicationAttemptId getApplicationAttemptId() {
+	    ContainerId containerId = ConverterUtils.toContainerId(getEnv(ApplicationConstants.AM_CONTAINER_ID_ENV));
+	    ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
+		return appAttemptID;
+	}
+	
+	private String getEnv(String key) {
+		Map<String, String> envs = System.getenv();
+	    String clusterName = envs.get(key);
+	    if (clusterName == null) {
+	      // container id should always be set in the env by the framework 
+	      throw new IllegalArgumentException(
+	          String.format("%s not set in the environment", key));
+	    }
+	    return clusterName;
+	}
+
+	public static void main(String[] args) throws Exception {
+		log.trace("BEGIN YarnMaster.main()");
+
+		try {
+			int rc = ToolRunner.run(new Configuration(), new YarnMaster(), args);
+			System.exit(rc);
+		} catch (Exception e) {
+			System.err.println(e);
+			System.exit(1);
+		}
+
+		log.trace("END YarnMaster.main()");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnProcess.java
new file mode 100644
index 0000000..7108d39
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnProcess.java
@@ -0,0 +1,171 @@
+package org.apache.helix.metamanager.yarn;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.metamanager.managed.ManagedFactory;
+import org.apache.helix.metamanager.yarn.ContainerMetadata.ContainerState;
+import org.apache.log4j.Logger;
+
+/**
+ * Container-side process that follows the state stored for its container in
+ * the metadata service and starts or stops a Helix participant accordingly.
+ *
+ * <p>After {@link #startService()} a single-threaded scheduler invokes
+ * {@link #updateContainerStatus()} every {@link #CONTAINERSERVICE_INTERVAL}
+ * milliseconds.
+ */
+public class YarnProcess {
+	static final Logger log = Logger.getLogger(YarnProcess.class);
+
+	/** Period, in milliseconds, between two container status checks. */
+	static final long CONTAINERSERVICE_INTERVAL = 1000;
+
+	final ApplicationConfig appConfig;
+	final String containerId;
+
+	HelixManager participantManager;
+
+	MetadataService metaService;
+	ScheduledExecutorService executor;
+
+	/**
+	 * @param appConfig cluster and provider settings used for the metadata
+	 *        service and the Helix participant
+	 * @param containerId id of this container; also used as the Helix instance name
+	 */
+	public YarnProcess(ApplicationConfig appConfig, String containerId) {
+		this.appConfig = appConfig;
+		this.containerId = containerId;
+	}
+
+	/**
+	 * Starts the metadata service and schedules the periodic status check.
+	 */
+	public void startService() {
+		log.info(String.format("start metadata service for '%s'", containerId));
+		metaService = new MetadataService(appConfig);
+		metaService.start();
+
+		executor = Executors.newSingleThreadScheduledExecutor();
+		executor.scheduleAtFixedRate(new ContainerService(), 0, CONTAINERSERVICE_INTERVAL, TimeUnit.MILLISECONDS);
+	}
+
+	/**
+	 * Stops the metadata service (if present) and shuts the scheduler down.
+	 * Calling it more than once is harmless.
+	 */
+	public void stopService() {
+		log.info(String.format("stop metadata service for '%s'", containerId));
+		if (metaService != null) {
+			metaService.stop();
+			metaService = null;
+		}
+
+		if(executor != null) {
+			executor.shutdown();
+		}
+	}
+
+	/**
+	 * @return {@code true} while the scheduler exists and has not terminated
+	 */
+	public boolean isRunning() {
+		if(executor == null)
+			return false;
+		return !executor.isTerminated();
+	}
+
+	/**
+	 * Connects this container to the managed cluster as a Helix participant
+	 * with a "MasterSlave" state model factory registered.
+	 *
+	 * @throws Exception if the participant cannot be created or connected
+	 */
+	public void startParticipant() throws Exception {
+		log.info("STARTING " + containerId);
+		participantManager = HelixManagerFactory.getZKHelixManager(appConfig.clusterName,
+				containerId, InstanceType.PARTICIPANT, appConfig.clusterAddress);
+		participantManager.getStateMachineEngine().registerStateModelFactory(
+				"MasterSlave", new ManagedFactory());
+		participantManager.connect();
+		log.info("STARTED " + containerId);
+	}
+
+	/**
+	 * Disconnects the Helix participant if one is present.
+	 */
+	public void stopParticipant() {
+		if (participantManager != null) {
+			participantManager.disconnect();
+			participantManager = null;
+		}
+	}
+
+	/**
+	 * Reads this container's metadata and reacts to its state:
+	 * CONNECTING starts the participant and moves to ACTIVE (or FAILED on
+	 * error); TEARDOWN stops the participant, moves to HALTED and stops the
+	 * service. Any other failure, including a metadata read error, stops the
+	 * service.
+	 */
+	public void updateContainerStatus() {
+		log.info("updating container status");
+		try {
+			ContainerMetadata meta = metaService.read(containerId);
+
+			if(meta.state == ContainerState.CONNECTING) {
+				log.info("container connecting, going to active");
+				try {
+					startParticipant();
+					metaService.update(new ContainerMetadata(meta, ContainerState.ACTIVE));
+				} catch (Exception e) {
+					log.error("Failed to start participant, going to failed", e);
+					stopParticipant();
+					metaService.update(new ContainerMetadata(meta, ContainerState.FAILED));
+				}
+			}
+
+			if(meta.state == ContainerState.ACTIVE) {
+				// do something
+				// and go to failed on error
+			}
+
+			if(meta.state == ContainerState.TEARDOWN) {
+				log.info("container teardown, going to halted");
+				stopParticipant();
+				metaService.update(new ContainerMetadata(meta, ContainerState.HALTED));
+				stopService();
+			}
+
+		} catch(Exception e) {
+			// The cause is not necessarily a missing container, so keep the
+			// exception in the log instead of discarding it.
+			log.warn(String.format("Container '%s' does not exist, stopping service", containerId), e);
+			stopService();
+		}
+	}
+
+	/** Scheduled task that delegates to {@link #updateContainerStatus()}. */
+	class ContainerService implements Runnable {
+		@Override
+		public void run() {
+			updateContainerStatus();
+		}
+	}
+
+	/**
+	 * Starts a {@code YarnProcess} and waits until its service has stopped.
+	 *
+	 * @param args {@code clusterAddress clusterName providerAddress providerName containerId}
+	 * @throws IllegalArgumentException if fewer than five arguments are given
+	 * @throws Exception declared for compatibility with existing launchers
+	 */
+	public static void main(String[] args) throws Exception
+	{
+		log.trace("BEGIN YarnProcess.main()");
+
+		if(args.length < 5) {
+			throw new IllegalArgumentException(
+					"usage: YarnProcess <clusterAddress> <clusterName> <providerAddress> <providerName> <containerId>");
+		}
+
+		final String clusterAddress = args[0];
+		final String clusterName = args[1];
+		final String providerAddress = args[2];
+		final String providerName = args[3];
+		final String containerId = args[4];
+
+		final ApplicationConfig appConfig = new ApplicationConfig(clusterAddress, clusterName, providerAddress, providerName);
+
+		final YarnProcess yarnProcess = new YarnProcess(appConfig, containerId);
+
+		yarnProcess.startService();
+
+		Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+			@Override
+			public void run() {
+				yarnProcess.stopService();
+			}
+		}));
+
+		while(yarnProcess.isRunning()) {
+			try {
+				Thread.sleep(100);
+			} catch (InterruptedException ignored) {
+				// Deliberately ignored: the loop condition alone decides when to
+				// leave. Restoring the interrupt flag here would make every
+				// following sleep() fail immediately and turn this into a busy loop.
+			}
+		}
+
+		log.trace("END YarnProcess.main()");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/resources/2by2local.properties
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/resources/2by2local.properties b/recipes/meta-cluster-manager/src/main/resources/2by2local.properties
new file mode 100644
index 0000000..ac7968a
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/resources/2by2local.properties
@@ -0,0 +1,52 @@
+#
+# Zookeeper (optional)
+#
+zookeeper.datadir=/tmp/meta/zk/data
+zookeeper.logdir=/tmp/meta/zk/log
+zookeeper.port=2199
+
+#
+# Meta Cluster
+#
+meta.cluster=meta
+meta.managed=managed
+meta.address=localhost:2199
+
+meta.target.type=static
+meta.target.database=3
+meta.target.webserver=5
+
+meta.status.type=local
+
+meta.provider.type=local
+meta.provider.name=provider_0
+meta.provider.address=localhost:2199
+meta.provider.cluster=managed
+
+meta.provider.containers=database,webserver
+
+meta.provider.container.database.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+meta.provider.container.database.prop1=foo
+meta.provider.container.database.prop2=bar
+
+meta.provider.container.webserver.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+meta.provider.container.webserver.prop1=foo
+meta.provider.container.webserver.prop2=bar
+
+#
+# Managed Cluster
+#
+managed.cluster=managed
+managed.address=localhost:2199
+
+managed.resources=dbprod,wsprod
+
+managed.resource.dbprod.container=database
+managed.resource.dbprod.model=MasterSlave
+managed.resource.dbprod.partitions=8
+managed.resource.dbprod.replica=3
+
+managed.resource.wsprod.container=webserver
+managed.resource.wsprod.model=MasterSlave
+managed.resource.wsprod.partitions=15
+managed.resource.wsprod.replica=1

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/resources/2by2localMixedModels.properties
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/resources/2by2localMixedModels.properties b/recipes/meta-cluster-manager/src/main/resources/2by2localMixedModels.properties
new file mode 100644
index 0000000..3971375
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/resources/2by2localMixedModels.properties
@@ -0,0 +1,52 @@
+#
+# Zookeeper (optional)
+#
+zookeeper.datadir=/tmp/meta/zk/data
+zookeeper.logdir=/tmp/meta/zk/log
+zookeeper.port=2199
+
+#
+# Meta Cluster
+#
+meta.cluster=meta
+meta.managed=managed
+meta.address=localhost:2199
+
+meta.target.type=static
+meta.target.database=3
+meta.target.webserver=5
+
+meta.status.type=local
+
+meta.provider.type=local
+meta.provider.name=provider_0
+meta.provider.address=localhost:2199
+meta.provider.cluster=managed
+
+meta.provider.containers=database,webserver
+
+meta.provider.container.database.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+meta.provider.container.database.prop1=foo
+meta.provider.container.database.prop2=bar
+
+meta.provider.container.webserver.class=org.apache.helix.metamanager.impl.container.DummyOnlineOfflineProcess
+meta.provider.container.webserver.prop1=foo
+meta.provider.container.webserver.prop2=bar
+
+#
+# Managed Cluster
+#
+managed.cluster=managed
+managed.address=localhost:2199
+
+managed.resources=dbprod,wsprod
+
+managed.resource.dbprod.container=database
+managed.resource.dbprod.model=MasterSlave
+managed.resource.dbprod.partitions=8
+managed.resource.dbprod.replica=3
+
+managed.resource.wsprod.container=webserver
+managed.resource.wsprod.model=OnlineOffline
+managed.resource.wsprod.partitions=15
+managed.resource.wsprod.replica=1

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/resources/2by2shell.properties
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/resources/2by2shell.properties b/recipes/meta-cluster-manager/src/main/resources/2by2shell.properties
new file mode 100644
index 0000000..a26f250
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/resources/2by2shell.properties
@@ -0,0 +1,52 @@
+#
+# Zookeeper (optional)
+#
+zookeeper.datadir=/tmp/meta/zk/data
+zookeeper.logdir=/tmp/meta/zk/log
+zookeeper.port=2199
+
+#
+# Meta Cluster
+#
+meta.cluster=meta
+meta.managed=managed
+meta.address=localhost:2199
+
+meta.target.type=static
+meta.target.database=3
+meta.target.webserver=5
+
+meta.status.type=shell
+
+meta.provider.type=shell
+meta.provider.name=provider_0
+meta.provider.address=localhost:2199
+meta.provider.cluster=managed
+
+meta.provider.containers=database,webserver
+
+meta.provider.container.database.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+meta.provider.container.database.prop1=foo
+meta.provider.container.database.prop2=bar
+
+meta.provider.container.webserver.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+meta.provider.container.webserver.prop1=foo
+meta.provider.container.webserver.prop2=bar
+
+#
+# Managed Cluster
+#
+managed.cluster=managed
+managed.address=localhost:2199
+
+managed.resources=dbprod,wsprod
+
+managed.resource.dbprod.container=database
+managed.resource.dbprod.model=MasterSlave
+managed.resource.dbprod.partitions=8
+managed.resource.dbprod.replica=3
+
+managed.resource.wsprod.container=webserver
+managed.resource.wsprod.model=MasterSlave
+managed.resource.wsprod.partitions=15
+managed.resource.wsprod.replica=1

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/resources/2by2yarn.properties
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/resources/2by2yarn.properties b/recipes/meta-cluster-manager/src/main/resources/2by2yarn.properties
new file mode 100644
index 0000000..6afd2c6
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/resources/2by2yarn.properties
@@ -0,0 +1,58 @@
+#
+# Zookeeper (optional)
+#
+zookeeper.datadir=/tmp/meta/zk/data
+zookeeper.logdir=/tmp/meta/zk/log
+zookeeper.port=2199
+
+#
+# Meta Cluster
+#
+meta.cluster=meta
+meta.managed=managed
+meta.address=rm:2199
+
+meta.target.type=static
+meta.target.database=3
+meta.target.webserver=5
+
+meta.status.type=yarn
+meta.status.metadata=rm:2199
+
+meta.provider.type=yarn
+meta.provider.name=provider_0
+meta.provider.address=rm:2199
+meta.provider.cluster=managed
+meta.provider.metadata=rm:2199
+meta.provider.resourcemananger=rm:8032
+meta.provider.scheduler=rm:8030
+meta.provider.user=yarn
+meta.provider.hdfs=hdfs://rm:9000/
+
+meta.provider.containers=database,webserver
+
+meta.provider.container.database.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+meta.provider.container.database.prop1=foo
+meta.provider.container.database.prop2=bar
+
+meta.provider.container.webserver.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+meta.provider.container.webserver.prop1=foo
+meta.provider.container.webserver.prop2=bar
+
+#
+# Managed Cluster
+#
+managed.cluster=managed
+managed.address=rm:2199
+
+managed.resources=dbprod,wsprod
+
+managed.resource.dbprod.container=database
+managed.resource.dbprod.model=MasterSlave
+managed.resource.dbprod.partitions=8
+managed.resource.dbprod.replica=3
+
+managed.resource.wsprod.container=webserver
+managed.resource.wsprod.model=MasterSlave
+managed.resource.wsprod.partitions=15
+managed.resource.wsprod.replica=1

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/resources/2by2yarnZookeeper.properties
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/resources/2by2yarnZookeeper.properties b/recipes/meta-cluster-manager/src/main/resources/2by2yarnZookeeper.properties
new file mode 100644
index 0000000..66f3637
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/resources/2by2yarnZookeeper.properties
@@ -0,0 +1,58 @@
+#
+# Zookeeper (optional)
+#
+zookeeper.datadir=/tmp/meta/zk/data
+zookeeper.logdir=/tmp/meta/zk/log
+zookeeper.port=2199
+
+#
+# Meta Cluster
+#
+meta.cluster=meta
+meta.managed=managed
+meta.address=rm:2199
+
+meta.target.type=static
+meta.target.database=3
+meta.target.webserver=5
+
+meta.status.type=yarn
+meta.status.metadata=rm:2199
+
+meta.provider.type=yarn
+meta.provider.name=provider_0
+meta.provider.address=rm:2199
+meta.provider.cluster=managed
+meta.provider.metadata=rm:2199
+meta.provider.resourcemananger=rm:8032
+meta.provider.scheduler=rm:8030
+meta.provider.user=yarn
+meta.provider.hdfs=hdfs://rm:9000/
+
+meta.provider.containers=database,webserver
+
+meta.provider.container.database.class=org.apache.helix.metamanager.impl.container.ZookeeperMasterSlaveProcess
+meta.provider.container.database.address=rm:2199
+meta.provider.container.database.root=mydatabase
+
+meta.provider.container.webserver.class=org.apache.helix.metamanager.impl.container.ZookeeperMasterSlaveProcess
+meta.provider.container.webserver.address=rm:2199
+meta.provider.container.webserver.root=mywebserver
+
+#
+# Managed Cluster
+#
+managed.cluster=managed
+managed.address=rm:2199
+
+managed.resources=dbprod,wsprod
+
+managed.resource.dbprod.container=database
+managed.resource.dbprod.model=MasterSlave
+managed.resource.dbprod.partitions=8
+managed.resource.dbprod.replica=3
+
+managed.resource.wsprod.container=webserver
+managed.resource.wsprod.model=MasterSlave
+managed.resource.wsprod.partitions=15
+managed.resource.wsprod.replica=1

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/resources/2meta2managed.properties
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/resources/2meta2managed.properties b/recipes/meta-cluster-manager/src/main/resources/2meta2managed.properties
new file mode 100644
index 0000000..b719620
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/resources/2meta2managed.properties
@@ -0,0 +1,52 @@
+#
+# Zookeeper (optional)
+#
+zookeeper.datadir=/tmp/meta/zk/data
+zookeeper.logdir=/tmp/meta/zk/log
+zookeeper.port=2199
+
+#
+# Meta Cluster
+#
+meta.cluster=meta
+meta.managed=managed
+meta.address=localhost:2199
+
+meta.target.type=static
+meta.target.database=3
+meta.target.webserver=5
+
+meta.status.type=local
+
+meta.provider.type=local
+meta.provider.name=provider_0
+meta.provider.address=localhost:2199
+meta.provider.cluster=managed
+
+meta.provider.containers=database,webserver
+
+meta.provider.container.database.class=org.apache.helix.metamanager.container.impl.DummyProcess
+meta.provider.container.database.prop1=foo
+meta.provider.container.database.prop2=bar
+
+meta.provider.container.webserver.class=org.apache.helix.metamanager.container.impl.DummyProcess
+meta.provider.container.webserver.prop1=foo
+meta.provider.container.webserver.prop2=bar
+
+#
+# Managed Cluster
+#
+managed.cluster=managed
+managed.address=localhost:2199
+
+managed.resources=dbprod,wsprod
+
+managed.resource.dbprod.container=database
+managed.resource.dbprod.model=MasterSlave
+managed.resource.dbprod.partitions=8
+managed.resource.dbprod.replica=3
+
+managed.resource.wsprod.container=webserver
+managed.resource.wsprod.model=MasterSlave
+managed.resource.wsprod.partitions=15
+managed.resource.wsprod.replica=1

http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/resources/Boot2By2Local.properties
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/resources/Boot2By2Local.properties b/recipes/meta-cluster-manager/src/main/resources/Boot2By2Local.properties
new file mode 100644
index 0000000..4eb07bd
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/resources/Boot2By2Local.properties
@@ -0,0 +1,87 @@
+#
+# Zookeeper (optional)
+#
+zookeeper.datadir=/tmp/meta/zk/data
+zookeeper.logdir=/tmp/meta/zk/log
+zookeeper.port=2199
+
+#
+# Cluster
+#
+cluster.name=cluster
+cluster.address=localhost:2199
+
+#
+# Resource
+#
+resource.0.name=wsprod
+resource.0.cluster=cluster
+resource.0.address=localhost:2199
+resource.0.container=webserver
+resource.0.model=MasterSlave
+resource.0.partitions=15
+resource.0.replica=1
+
+resource.1.name=dbprod
+resource.1.cluster=cluster
+resource.1.address=localhost:2199
+resource.1.container=database
+resource.1.model=MasterSlave
+resource.1.partitions=8
+resource.1.replica=3
+
+#
+# Controller
+#
+controller.name=controller
+controller.cluster=cluster
+controller.address=localhost:2199
+controller.autorefresh=5000
+
+#
+# Metacluster
+#
+metacluster.name=meta
+metacluster.address=localhost:2199
+metacluster.managedcluster=cluster
+metacluster.managedaddress=localhost:2199
+
+#
+# Metaresource
+#
+metaresource.0.name=webserver
+metaresource.0.metacluster=meta
+metaresource.0.metaaddress=localhost:2199
+metaresource.0.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+
+metaresource.1.name=database
+metaresource.1.metacluster=meta
+metaresource.1.metaaddress=localhost:2199
+metaresource.1.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+
+#
+# Metaprovider
+#
+metaprovider.0.name=provider0
+metaprovider.0.metacluster=meta
+metaprovider.0.metaaddress=localhost:2199
+metaprovider.0.class=org.apache.helix.metamanager.impl.local.LocalContainerProviderProcess
+
+metaprovider.1.name=provider1
+metaprovider.1.metacluster=meta
+metaprovider.1.metaaddress=localhost:2199
+metaprovider.1.class=org.apache.helix.metamanager.impl.local.LocalContainerProviderProcess
+
+#
+# Metacontroller
+#
+metacontroller.name=metacontroller
+metacontroller.metacluster=meta
+metacontroller.metaaddress=localhost:2199
+metacontroller.autorefresh=5000
+
+metacontroller.status.class=org.apache.helix.metamanager.impl.local.LocalStatusProvider
+
+metacontroller.target.class=org.apache.helix.metamanager.impl.StaticTargetProvider
+metacontroller.target.webserver=5
+metacontroller.target.database=3