You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2013/09/20 20:30:12 UTC
[03/15] Adding Helix-task-framework and Yarn integration modules
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnMasterService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnMasterService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnMasterService.java
new file mode 100644
index 0000000..b63be1f
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnMasterService.java
@@ -0,0 +1,361 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.metamanager.provider.yarn.ContainerMetadata.ContainerState;
+import org.apache.helix.metamanager.provider.yarn.MetadataService.MetadataServiceException;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Application-master side service that periodically reconciles the container
+ * metadata owned by this provider with the containers granted by YARN.
+ * <p>
+ * Each update cycle reads the owned metadata, asks the resource manager for
+ * missing containers, releases halted and surplus ones, launches a process in
+ * newly allocated containers and records the containers reported as completed.
+ */
+public class YarnMasterService {
+
+    static final Logger log = Logger.getLogger(YarnMasterService.class);
+
+    static final String REQUIRED_TYPE = "container";
+
+    static final long ZOOKEEPER_TIMEOUT = 5000;
+
+    /** Period of the update cycle scheduled by {@link #startService()}, in milliseconds. */
+    static final long MASTERSERVICE_INTERVAL = 1000;
+
+    static final String CONTAINERS = "CONTAINERS";
+
+    /**
+     * Launch command template. The placeholders are filled with the container
+     * command, cluster address, cluster name, metadata address, provider name,
+     * container id and, twice, the directory receiving stdout/stderr.
+     */
+    static final String CONTAINER_COMMAND = "/bin/sh %s %s %s %s %s %s 1>%s/stdout 2>%s/stderr";
+
+    /*
+     * CONTAINERS
+     * A (A, READY)
+     * B (B, RUNNING)
+     */
+
+    final ApplicationConfig appConfig;
+    final AMRMProtocol yarnClient;
+    final ApplicationAttemptId appAtemptId;
+
+    final Configuration yarnConfig;
+
+    final File dummy = new File("/tmp/dummy");
+
+    /** Containers allocated by YARN that have not been assigned to a metadata node yet. */
+    final Map<ContainerId, Container> unassignedContainers = new HashMap<ContainerId, Container>();
+    /** Containers in which a process has been started. */
+    final Map<ContainerId, Container> activeContainers = new HashMap<ContainerId, Container>();
+    /** Statuses of the containers YARN reported as completed. */
+    final Map<ContainerId, ContainerStatus> completedContainers = new HashMap<ContainerId, ContainerStatus>();
+    /** Maps a started yarn container to the id of its metadata node. */
+    final Map<ContainerId, String> yarn2meta = new HashMap<ContainerId, String>();
+
+    final MetadataService metaService;
+
+    ScheduledExecutorService executor;
+
+    /**
+     * @param yarnClient   protocol used to talk to the resource manager
+     * @param conf         configuration used to reach the container managers
+     * @param appAttemptId application attempt this master runs as
+     * @param appConfig    cluster, metadata and provider settings
+     * @param metaService  store holding the container metadata nodes
+     */
+    public YarnMasterService(AMRMProtocol yarnClient, Configuration conf, ApplicationAttemptId appAttemptId, ApplicationConfig appConfig, MetadataService metaService) {
+        this.appConfig = appConfig;
+        this.yarnClient = yarnClient;
+        this.appAtemptId = appAttemptId;
+        this.yarnConfig = conf;
+        this.metaService = metaService;
+    }
+
+    /** Starts the single-threaded scheduler that runs the update cycle at a fixed rate. */
+    public void startService() {
+        log.debug("starting yarn master service");
+
+        executor = Executors.newSingleThreadScheduledExecutor();
+        executor.scheduleAtFixedRate(new YarnService(), 0, MASTERSERVICE_INTERVAL, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Shuts the scheduler down and blocks until it has terminated. Does nothing
+     * if the service was never started.
+     * <p>
+     * An interrupt received while waiting does not cut the wait short, but the
+     * interrupt status of the calling thread is restored before returning.
+     */
+    public void stopService() {
+        log.debug("stopping yarn master service");
+
+        if (executor == null) {
+            return;
+        }
+
+        executor.shutdown();
+
+        boolean interrupted = false;
+        while (!executor.isTerminated()) {
+            try {
+                executor.awaitTermination(100, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                // keep waiting for a clean shutdown, but remember the request
+                interrupted = true;
+            }
+        }
+        executor = null;
+
+        if (interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Reads all metadata nodes and keeps those whose owner equals the
+     * configured provider name. Nodes without an owner are skipped.
+     *
+     * @return the metadata nodes owned by this provider
+     * @throws MetadataServiceException if the metadata store cannot be read
+     */
+    Collection<ContainerMetadata> readOwnedMetadata() throws MetadataServiceException {
+        log.debug("reading container data");
+
+        Collection<ContainerMetadata> containers = new ArrayList<ContainerMetadata>();
+        for (ContainerMetadata meta : metaService.readAll()) {
+            // a node without owner must not fail the whole update cycle
+            if (meta.owner != null && meta.owner.equals(appConfig.providerName)) {
+                containers.add(meta);
+                log.debug(String.format("found container node '%s' (state=%s, yarnId=%s, command=%s, owner=%s)",
+                        meta.id, meta.state, meta.yarnId, meta.command, meta.owner));
+            }
+        }
+        return containers;
+    }
+
+    /** One update cycle of the master service; scheduled by {@link #startService()}. */
+    class YarnService implements Runnable {
+        /** Response id sent with the next allocate request; incremented per request. */
+        int responseId = 0;
+
+        /**
+         * Runs a single update cycle. Exceptions are logged and never
+         * propagated. Completed-container statuses of an allocate response are
+         * always processed, even if assigning or starting a container fails.
+         */
+        @Override
+        public void run() {
+            try {
+                log.debug("running yarn service update cycle");
+
+                Collection<ContainerMetadata> metadata = readOwnedMetadata();
+
+                // active meta containers
+                int numMetaActive = countActiveMeta(metadata);
+
+                // newly acquired meta containers
+                int numMetaAcquire = countAcquireMeta(metadata);
+
+                // destroyed meta containers
+                List<ContainerId> destroyedReleasedIds = createDestroyedReleaseList(metadata);
+                int numMetaCompleted = destroyedReleasedIds.size();
+
+                int numMeta = numMetaAcquire + numMetaActive + numMetaCompleted;
+
+                // yarn containers
+                int numYarnUnassigned = unassignedContainers.size();
+                int numYarnActive = activeContainers.size();
+                int numYarnCompleted = completedContainers.size();
+                int numYarn = numYarnUnassigned + numYarnActive + numYarnCompleted;
+
+                // positive: containers still missing, negative: overstock
+                int numYarnRequired = numMetaAcquire - numYarnUnassigned;
+
+                // additionally required containers
+                int numRequestAdditional = Math.max(0, numYarnRequired);
+
+                // overstock containers
+                List<ContainerId> unneededReleasedIds = createOverstockReleaseList(numYarnRequired);
+
+                int numReleased = destroyedReleasedIds.size() + unneededReleasedIds.size();
+
+                log.debug(String.format("meta containers (total=%d, acquire=%d, active=%d, completed=%d)", numMeta, numMetaAcquire, numMetaActive, numMetaCompleted));
+                log.debug(String.format("yarn containers (total=%d, unassigned=%d, active=%d, completed=%d)", numYarn, numYarnUnassigned, numYarnActive, numYarnCompleted));
+                log.debug(String.format("requesting %d new containers, releasing %d", numRequestAdditional, numReleased));
+
+                Priority priority = Records.newRecord(Priority.class);
+                priority.setPriority(0);
+
+                Resource resource = Records.newRecord(Resource.class);
+                resource.setMemory(256); // TODO make dynamic
+
+                ResourceRequest resourceRequest = Records.newRecord(ResourceRequest.class);
+                resourceRequest.setHostName("*");
+                resourceRequest.setNumContainers(numRequestAdditional);
+                resourceRequest.setPriority(priority);
+                resourceRequest.setCapability(resource);
+
+                AllocateRequest request = Records.newRecord(AllocateRequest.class);
+                request.setResponseId(responseId);
+                request.setApplicationAttemptId(appAtemptId);
+                request.addAsk(resourceRequest);
+                request.addAllReleases(destroyedReleasedIds);
+                request.addAllReleases(unneededReleasedIds);
+
+                responseId++;
+
+                AllocateResponse allocateResponse = null;
+                try {
+                    allocateResponse = yarnClient.allocate(request);
+                } catch (YarnRemoteException e) {
+                    // no response to process, retry in the next cycle
+                    log.error("Error allocating containers", e);
+                    return;
+                }
+
+                AMResponse response = allocateResponse.getAMResponse();
+
+                // newly added containers
+                for (Container container : response.getAllocatedContainers()) {
+                    unassignedContainers.put(container.getId(), container);
+                }
+
+                log.info(String.format("%d new containers available, %d required", unassignedContainers.size(), numMetaAcquire));
+
+                // Assignment phase. A failure here ends the assignments of this
+                // cycle only; the completion statuses below arrive with this
+                // response and would be lost if the cycle was aborted.
+                try {
+                    Iterator<Container> itYarn = unassignedContainers.values().iterator();
+                    Iterator<ContainerMetadata> itMeta = metadata.iterator();
+                    while (itYarn.hasNext() && itMeta.hasNext()) {
+                        ContainerMetadata meta = itMeta.next();
+
+                        // already bound to a yarn container
+                        if (meta.yarnId >= 0) {
+                            continue;
+                        }
+
+                        Container containerYarn = itYarn.next();
+
+                        log.debug(String.format("assigning yarn container '%s' to container node '%s'", containerYarn.getId(), meta.id));
+
+                        String command = String.format(CONTAINER_COMMAND, meta.command,
+                                appConfig.clusterAddress, appConfig.clusterName, appConfig.metadataAddress, appConfig.providerName,
+                                meta.id, "/tmp/" + meta.id, "/tmp/" + meta.id);
+                        //ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+
+                        ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
+                        context.setContainerId(containerYarn.getId());
+                        context.setResource(containerYarn.getResource());
+                        context.setEnvironment(Maps.<String, String>newHashMap());
+                        context.setCommands(Collections.singletonList(command));
+                        context.setLocalResources(Utils.getDummyResources());
+                        try {
+                            context.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
+                        } catch (IOException e) {
+                            log.error(String.format("failed setting up container '%s' user information", meta.id), e);
+                            break;
+                        }
+
+                        log.debug(String.format("container '%s' executing command '%s'", meta.id, command));
+
+                        StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
+                        startReq.setContainerLaunchContext(context);
+
+                        try {
+                            getContainerManager(containerYarn).startContainer(startReq);
+                        } catch (YarnRemoteException e) {
+                            // container stays unassigned and is retried next cycle
+                            log.error(String.format("Error starting container '%s'", meta.id), e);
+                            break;
+                        }
+
+                        log.debug(String.format("container '%s' started, updating container node", meta.id));
+
+                        metaService.update(new ContainerMetadata(meta, ContainerState.CONNECTING, containerYarn.getId().getId()));
+                        yarn2meta.put(containerYarn.getId(), meta.id);
+
+                        log.debug(String.format("removing '%s' from unassigned yarn containers and adding to active list", containerYarn.getId()));
+
+                        itYarn.remove();
+                        activeContainers.put(containerYarn.getId(), containerYarn);
+                    }
+                } catch (Exception e) {
+                    log.error("Error while assigning yarn containers", e);
+                }
+
+                // Completion phase. Every reported status is recorded, even if
+                // updating the matching metadata node fails.
+                for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+                    ContainerId id = status.getContainerId();
+
+                    log.info(String.format("Container '%s' completed", id));
+
+                    if (unassignedContainers.containsKey(id)) {
+                        log.info(String.format("Unassigned container '%s' terminated, removing", id));
+                        unassignedContainers.remove(id);
+                        // TODO handle
+                    }
+
+                    if (activeContainers.containsKey(id)) {
+                        log.info(String.format("Active container '%s' terminated, removing", id));
+                        activeContainers.remove(id);
+
+                        try {
+                            String metaId = yarn2meta.get(id);
+                            ContainerMetadata meta = metaService.read(metaId);
+
+                            log.debug(String.format("container '%s' finalized, updating container node", meta.id));
+
+                            metaService.update(new ContainerMetadata(meta, ContainerState.FINALIZE));
+                        } catch (Exception e) {
+                            log.error(String.format("Error finalizing container node of yarn container '%s'", id), e);
+                        }
+                    }
+
+                    completedContainers.put(id, status);
+                }
+
+                log.debug("yarn service update cycle complete");
+
+            } catch (Exception e) {
+                log.error("Error while executing yarn update cycle", e);
+            }
+        }
+
+        /**
+         * Removes surplus containers from the unassigned pool.
+         *
+         * @param numYarnRequired containers still missing; a negative value is
+         *                        the number of surplus unassigned containers
+         * @return ids of the containers removed from the pool, to be released
+         */
+        private List<ContainerId> createOverstockReleaseList(int numYarnRequired) {
+            List<ContainerId> unneededReleasedIds = new ArrayList<ContainerId>();
+            if (numYarnRequired >= 0) {
+                return unneededReleasedIds;
+            }
+
+            int surplus = -numYarnRequired;
+            Iterator<Container> itUnassigned = unassignedContainers.values().iterator();
+            for (int i = 0; i < surplus && itUnassigned.hasNext(); i++) {
+                Container container = itUnassigned.next();
+                unneededReleasedIds.add(container.getId());
+                log.debug(String.format("Container '%s' no longer required, removing", container.getId()));
+                itUnassigned.remove();
+            }
+            return unneededReleasedIds;
+        }
+
+        /**
+         * Builds the yarn container ids of all metadata nodes in state HALTED.
+         *
+         * @param metadata metadata nodes owned by this provider
+         * @return container ids to release, one per halted node
+         */
+        private List<ContainerId> createDestroyedReleaseList(
+                Collection<ContainerMetadata> metadata) {
+            List<ContainerId> releasedIds = new ArrayList<ContainerId>();
+            for (ContainerMetadata meta : metadata) {
+                if (meta.state != ContainerState.HALTED) {
+                    continue;
+                }
+                ContainerId containerId = Records.newRecord(ContainerId.class);
+                containerId.setApplicationAttemptId(appAtemptId);
+                containerId.setId(meta.yarnId);
+                releasedIds.add(containerId);
+                log.debug(String.format("releasing container '%s'", containerId));
+            }
+            return releasedIds;
+        }
+
+        /** Counts the metadata nodes in state ACQUIRE. */
+        private int countAcquireMeta(Collection<ContainerMetadata> metadata) {
+            int numMetaAcquire = 0;
+            for (ContainerMetadata meta : metadata) {
+                if (meta.state == ContainerState.ACQUIRE) {
+                    numMetaAcquire++;
+                }
+            }
+            return numMetaAcquire;
+        }
+
+        /** Counts the metadata nodes that are neither ACQUIRE, HALTED nor FINALIZE. */
+        private int countActiveMeta(Collection<ContainerMetadata> metadata) {
+            int numMetaActive = 0;
+            for (ContainerMetadata meta : metadata) {
+                if (meta.state != ContainerState.ACQUIRE &&
+                        meta.state != ContainerState.HALTED &&
+                        meta.state != ContainerState.FINALIZE) {
+                    numMetaActive++;
+                }
+            }
+            return numMetaActive;
+        }
+    }
+
+    /**
+     * Creates an RPC proxy to the container manager of the node hosting the
+     * given container.
+     *
+     * @param container allocated container whose node is contacted
+     * @return proxy to that node's container manager
+     */
+    private ContainerManager getContainerManager(Container container) {
+        YarnConfiguration yarnConf = new YarnConfiguration(yarnConfig);
+        YarnRPC rpc = YarnRPC.create(yarnConf);
+        NodeId nodeId = container.getNodeId();
+        String containerIpPort = String.format("%s:%d", nodeId.getHost(),
+                nodeId.getPort());
+        log.info("Connecting to ContainerManager at: " + containerIpPort);
+        InetSocketAddress addr = NetUtils.createSocketAddr(containerIpPort);
+        ContainerManager cm = (ContainerManager) rpc.getProxy(
+                ContainerManager.class, addr, yarnConfig);
+        return cm;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnProcess.java
new file mode 100644
index 0000000..b1a22d5
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/YarnProcess.java
@@ -0,0 +1,171 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.metamanager.managed.ManagedFactory;
+import org.apache.helix.metamanager.provider.yarn.ContainerMetadata.ContainerState;
+import org.apache.log4j.Logger;
+
+public class YarnProcess {
+ static final Logger log = Logger.getLogger(YarnProcess.class);
+
+ static final long CONTAINERSERVICE_INTERVAL = 1000;
+
+ final ApplicationConfig appConfig;
+ final String containerId;
+
+ HelixManager participantManager;
+
+ MetadataService metaService;
+ ScheduledExecutorService executor;
+
+
+ public YarnProcess(ApplicationConfig appConfig, String containerId) {
+ this.appConfig = appConfig;
+ this.containerId = containerId;
+ }
+
+ public void startService() {
+ log.info(String.format("start metadata service for '%s'", containerId));
+ metaService = new MetadataService(appConfig.metadataAddress);
+ metaService.start();
+
+ executor = Executors.newSingleThreadScheduledExecutor();
+ executor.scheduleAtFixedRate(new ContainerService(), 0, CONTAINERSERVICE_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+
+ public void stopService() {
+ log.info(String.format("stop metadata service for '%s'", containerId));
+ if (metaService != null) {
+ metaService.stop();
+ metaService = null;
+ }
+
+ if(executor != null) {
+ executor.shutdown();
+ }
+ }
+
+ public boolean isRunning() {
+ if(executor == null)
+ return false;
+ return !executor.isTerminated();
+ }
+
+ public void startParticipant() throws Exception {
+ log.info("STARTING " + containerId);
+ participantManager = HelixManagerFactory.getZKHelixManager(appConfig.clusterName,
+ containerId, InstanceType.PARTICIPANT, appConfig.clusterAddress);
+ participantManager.getStateMachineEngine().registerStateModelFactory(
+ "MasterSlave", new ManagedFactory());
+ participantManager.connect();
+ log.info("STARTED " + containerId);
+ }
+
+ public void stopParticipant() {
+ if (participantManager != null) {
+ participantManager.disconnect();
+ participantManager = null;
+ }
+ }
+
+ public void updateContainerStatus() {
+ log.info("updating container status");
+ try {
+ ContainerMetadata meta = metaService.read(containerId);
+
+ if(meta.state == ContainerState.CONNECTING) {
+ log.info("container connecting, going to active");
+ try {
+ startParticipant();
+ metaService.update(new ContainerMetadata(meta, ContainerState.ACTIVE));
+ } catch (Exception e) {
+ log.error("Failed to start participant, going to failed", e);
+ stopParticipant();
+ metaService.update(new ContainerMetadata(meta, ContainerState.FAILED));
+ }
+ }
+
+ if(meta.state == ContainerState.ACTIVE) {
+ // do something
+ // and go to failed on error
+ }
+
+ if(meta.state == ContainerState.TEARDOWN) {
+ log.info("container teardown, going to halted");
+ stopParticipant();
+ metaService.update(new ContainerMetadata(meta, ContainerState.HALTED));
+ stopService();
+ }
+
+ } catch(Exception e) {
+ log.warn(String.format("Container '%s' does not exist, stopping service", containerId));
+ stopService();
+ }
+ }
+
+ class ContainerService implements Runnable {
+ @Override
+ public void run() {
+ updateContainerStatus();
+ }
+ }
+
+ public static void main(String[] args) throws Exception
+ {
+ log.trace("BEGIN YarnProcess.main()");
+
+ final String clusterAddress = args[0];
+ final String clusterName = args[1];
+ final String metadataAddress = args[2];
+ final String providerName = args[3];
+ final String containerId = args[4];
+
+ final ApplicationConfig appConfig = new ApplicationConfig(clusterAddress, clusterName, metadataAddress, providerName);
+
+ final YarnProcess yarnProcess = new YarnProcess(appConfig, containerId);
+
+ yarnProcess.startService();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+ yarnProcess.stopService();
+ }
+ }));
+
+ while(yarnProcess.isRunning()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+
+ log.trace("END YarnProcess.main()");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/ZookeeperMetadataService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/ZookeeperMetadataService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/ZookeeperMetadataService.java
new file mode 100644
index 0000000..00bf17f
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/provider/yarn/ZookeeperMetadataService.java
@@ -0,0 +1,102 @@
+package org.apache.helix.metamanager.provider.yarn;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.apache.log4j.Logger;
+
+public class ZookeeperMetadataService implements MetadataService {
+
+ static final Logger log = Logger.getLogger(ZookeeperMetadataService.class);
+
+ static final String CONTAINER_NAMESPACE = "containers";
+
+ static final String BASE_PATH = "/" + CONTAINER_NAMESPACE;
+
+ static final long POLL_INTERVAL = 100;
+
+ final String metadataAddress;
+
+ ZkClient client;
+
+ public ZookeeperMetadataService(String metadataAddress) {
+ this.metadataAddress = metadataAddress;
+ }
+
+ public void startService() {
+ log.debug(String.format("starting metadata service for '%s'", metadataAddress));
+
+ client = new ZkClient(metadataAddress);
+
+ client.createPersistent(BASE_PATH, true);
+ }
+
+ public void stopService() {
+ log.debug(String.format("stopping metadata service for '%s'", metadataAddress));
+ if(client != null) {
+ client.close();
+ client = null;
+ }
+ }
+
+ @Override
+ public boolean exists(String id) {
+ return client.exists(makePath(id));
+ }
+
+ @Override
+ public void create(ContainerMetadata meta) throws MetadataServiceException {
+ try {
+ client.createPersistent(makePath(meta.id), Utils.toJson(meta));
+ } catch (ZkException e) {
+ throw new MetadataServiceException(e);
+ }
+ }
+
+ @Override
+ public ContainerMetadata read(String id) throws MetadataServiceException {
+ try {
+ return Utils.fromJson(client.<String>readData(makePath(id)));
+ } catch (ZkException e) {
+ throw new MetadataServiceException(e);
+ }
+ }
+
+ @Override
+ public Collection<ContainerMetadata> readAll() throws MetadataServiceException {
+ try {
+ Collection<ContainerMetadata> metadata = new ArrayList<ContainerMetadata>();
+ for(String id : client.getChildren(BASE_PATH)) {
+ metadata.add(Utils.fromJson(client.<String>readData(makePath(id))));
+ }
+ return metadata;
+ } catch (ZkException e) {
+ throw new MetadataServiceException(e);
+ }
+ }
+
+ @Override
+ public void update(ContainerMetadata meta) throws MetadataServiceException {
+ try {
+ client.writeData(makePath(meta.id), Utils.toJson(meta));
+ } catch (ZkException e) {
+ throw new MetadataServiceException(e);
+ }
+ }
+
+ @Override
+ public void delete(String id) throws MetadataServiceException {
+ try {
+ client.delete(makePath(id));
+ } catch (ZkException e) {
+ throw new MetadataServiceException(e);
+ }
+ }
+
+ String makePath(String containerId) {
+ return BASE_PATH + "/" + containerId;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ApplicationConfig.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ApplicationConfig.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ApplicationConfig.java
new file mode 100644
index 0000000..5950d42
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ApplicationConfig.java
@@ -0,0 +1,32 @@
+package org.apache.helix.metamanager.yarn;
+
+/**
+ * Immutable holder for the addresses and names of the managed cluster and of
+ * the provider.
+ */
+public class ApplicationConfig {
+    final String clusterAddress;
+    final String clusterName;
+    final String providerAddress;
+    final String providerName;
+
+    /**
+     * @param clusterAddress  address of the managed cluster
+     * @param clusterName     name of the managed cluster
+     * @param providerAddress address of the provider
+     * @param providerName    name of the provider
+     */
+    public ApplicationConfig(String clusterAddress, String clusterName,
+            String providerAddress, String providerName) {
+        this.providerName = providerName;
+        this.providerAddress = providerAddress;
+        this.clusterName = clusterName;
+        this.clusterAddress = clusterAddress;
+    }
+
+    /** @return the cluster address passed at construction */
+    public String getClusterAddress() {
+        return this.clusterAddress;
+    }
+
+    /** @return the cluster name passed at construction */
+    public String getClusterName() {
+        return this.clusterName;
+    }
+
+    /** @return the provider address passed at construction */
+    public String getProviderAddress() {
+        return this.providerAddress;
+    }
+
+    /** @return the provider name passed at construction */
+    public String getProviderName() {
+        return this.providerName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ContainerMetadata.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ContainerMetadata.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ContainerMetadata.java
new file mode 100644
index 0000000..1245080
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ContainerMetadata.java
@@ -0,0 +1,50 @@
+package org.apache.helix.metamanager.yarn;
+
+
+/**
+ * Metadata record of one container: its id, lifecycle state, yarn container
+ * id, launch command and owner.
+ */
+class ContainerMetadata {
+
+    /** Lifecycle states of a container. */
+    static enum ContainerState {
+        ACQUIRE,
+        CONNECTING,
+        ACTIVE,
+        TEARDOWN,
+        FAILED,
+        HALTED,
+        FINALIZE
+    }
+
+    String id;
+    ContainerState state;
+    int yarnId;
+    String command;
+    String owner;
+
+    /** Creates a record with all fields left at their default values. */
+    public ContainerMetadata() {
+        // left blank
+    }
+
+    /**
+     * Creates a record in state ACQUIRE with yarn id -1.
+     *
+     * @param id      container id
+     * @param command launch command
+     * @param owner   owner of the container
+     */
+    public ContainerMetadata(String id, String command, String owner) {
+        this.id = id;
+        this.command = command;
+        this.owner = owner;
+        this.state = ContainerState.ACQUIRE;
+        this.yarnId = -1;
+    }
+
+    /**
+     * Copies {@code node}, replacing only its state.
+     *
+     * @param node  record to copy
+     * @param state state of the copy
+     */
+    public ContainerMetadata(ContainerMetadata node, ContainerState state) {
+        this(node, state, node.yarnId);
+    }
+
+    /**
+     * Copies {@code node}, replacing its state and yarn id.
+     *
+     * @param node   record to copy
+     * @param state  state of the copy
+     * @param yarnId yarn id of the copy
+     */
+    public ContainerMetadata(ContainerMetadata node, ContainerState state, int yarnId) {
+        this.id = node.id;
+        this.command = node.command;
+        this.owner = node.owner;
+        this.state = state;
+        this.yarnId = yarnId;
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ContainerNode.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ContainerNode.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ContainerNode.java
new file mode 100644
index 0000000..59b9325
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/ContainerNode.java
@@ -0,0 +1,61 @@
+package org.apache.helix.metamanager.yarn;
+
+import java.io.Serializable;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+
+/**
+ * Immutable, serializable description of one container: its id, lifecycle
+ * state, yarn container id, zookeeper address, cluster name and command.
+ */
+class ContainerNode implements Serializable {
+    /**
+     *
+     */
+    private static final long serialVersionUID = 2578978959080378923L;
+
+    /** Lifecycle states of a container node. */
+    static enum ContainerState {
+        ACQUIRE,
+        CONNECT,
+        READY,
+        STARTING,
+        RUNNING,
+        STOPPING,
+        TEARDOWN,
+        FINALIZE
+    }
+
+    final String id;
+    final ContainerState state;
+    final ContainerId yarnId;
+
+    final String zkAddress;
+    final String clusterName;
+    final String command;
+
+    /**
+     * Creates a node in state ACQUIRE without a yarn container id.
+     *
+     * @param id          container id
+     * @param zkAddress   zookeeper address
+     * @param clusterName cluster name
+     * @param command     command of the container
+     */
+    public ContainerNode(String id, String zkAddress, String clusterName, String command) {
+        this.id = id;
+        this.zkAddress = zkAddress;
+        this.clusterName = clusterName;
+        this.command = command;
+        this.state = ContainerState.ACQUIRE;
+        this.yarnId = null;
+    }
+
+    /**
+     * Copies {@code node}, replacing only its state.
+     *
+     * @param node  node to copy
+     * @param state state of the copy
+     */
+    public ContainerNode(ContainerNode node, ContainerState state) {
+        this(node, state, node.yarnId);
+    }
+
+    /**
+     * Copies {@code node}, replacing its state and yarn container id.
+     *
+     * @param node   node to copy
+     * @param state  state of the copy
+     * @param yarnId yarn container id of the copy
+     */
+    public ContainerNode(ContainerNode node, ContainerState state, ContainerId yarnId) {
+        this.id = node.id;
+        this.zkAddress = node.zkAddress;
+        this.clusterName = node.clusterName;
+        this.command = node.command;
+        this.state = state;
+        this.yarnId = yarnId;
+    }
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/MessageNode.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/MessageNode.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/MessageNode.java
new file mode 100644
index 0000000..ba5be81
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/MessageNode.java
@@ -0,0 +1,20 @@
+package org.apache.helix.metamanager.yarn;
+
+
+/** Immutable message consisting of an id and a message type. */
+class MessageNode {
+    /** Kinds of messages. */
+    static enum MessageType {
+        CREATE,
+        START,
+        STOP,
+        DESTROY
+    }
+
+    final String id;
+    final MessageType type;
+
+    /**
+     * @param id   id carried by the message
+     * @param type type of the message
+     */
+    public MessageNode(String id, MessageType type) {
+        this.type = type;
+        this.id = id;
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/MetadataService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/MetadataService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/MetadataService.java
new file mode 100644
index 0000000..be88826
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/MetadataService.java
@@ -0,0 +1,146 @@
+package org.apache.helix.metamanager.yarn;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.TimeoutException;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkException;
+import org.apache.helix.metamanager.yarn.ContainerMetadata.ContainerState;
+import org.apache.log4j.Logger;
+
+/**
+ * Zookeeper-backed store for container metadata. Each record is kept as JSON
+ * in a persistent node below the container namespace.
+ * <p>
+ * {@link #start()} must be called before any other operation; the client and
+ * the base path are unset before that.
+ */
+public class MetadataService {
+
+    static final Logger log = Logger.getLogger(MetadataService.class);
+
+    static final String CONTAINER_NAMESPACE = "containers";
+
+// static final String LOCK_PATH = "/" + CONTAINER_NAMESPACE + "/lock";
+    /** Pause between two reads in {@link #waitForState}, in milliseconds. */
+    static final long POLL_INTERVAL = 100;
+
+    final ApplicationConfig appConfig;
+
+    ZkClient client;
+    String basePath;
+
+    /**
+     * @param appConfig configuration providing the provider address and name
+     */
+    public MetadataService(ApplicationConfig appConfig) {
+        this.appConfig = appConfig;
+    }
+
+    /** Connects to zookeeper and creates the base path, including missing parents. */
+    public void start() {
+        basePath = "/" + CONTAINER_NAMESPACE;
+        log.debug(String.format("starting metadata service for '%s/%s'", appConfig.providerAddress, appConfig.providerName));
+
+        client = new ZkClient(appConfig.providerAddress);
+
+        client.createPersistent(basePath, true);
+    }
+
+    /** Closes the zookeeper connection, if one is open. */
+    public void stop() {
+        log.debug(String.format("stopping metadata service for '%s/%s'", appConfig.providerAddress, appConfig.providerName));
+        if (client != null) {
+            client.close();
+            client = null;
+        }
+    }
+
+// public void lock(long timeout) throws Exception {
+// long limit = System.currentTimeMillis() + timeout;
+// while (limit > System.currentTimeMillis()) {
+// try {
+// client.createEphemeral(LOCK_PATH);
+// return;
+// } catch (Exception ignore) {}
+// Thread.sleep(POLL_INTERVAL);
+// }
+// throw new IllegalStateException("Could not acquire lock");
+// }
+//
+// public void unlock() {
+// client.delete(LOCK_PATH);
+// }
+
+    /**
+     * Creates the node for the given metadata record.
+     *
+     * @throws IllegalMetadataStateException if zookeeper rejects the operation
+     */
+    public void create(ContainerMetadata meta) throws IllegalMetadataStateException {
+        try {
+            client.createPersistent(makePath(meta.id), Utils.toJson(meta));
+        } catch (ZkException e) {
+            throw new IllegalMetadataStateException(e);
+        }
+    }
+
+    /**
+     * Reads the metadata record of the given container.
+     *
+     * @throws IllegalMetadataStateException if the node cannot be read
+     */
+    public ContainerMetadata read(String id) throws IllegalMetadataStateException {
+        try {
+            return Utils.fromJson(client.<String>readData(makePath(id)));
+        } catch (ZkException e) {
+            throw new IllegalMetadataStateException(e);
+        }
+    }
+
+    /**
+     * Reads the metadata of all container nodes below the base path.
+     * <p>
+     * Listing the children and reading them are separate zookeeper calls, so a
+     * node may be deleted in between. Such nodes, and nodes without data, are
+     * skipped instead of failing the whole read.
+     *
+     * @return metadata of all readable container nodes, never containing {@code null}
+     * @throws IllegalMetadataStateException if zookeeper cannot be accessed
+     */
+    public Collection<ContainerMetadata> readAll() throws IllegalMetadataStateException {
+        try {
+            Collection<ContainerMetadata> metadata = new ArrayList<ContainerMetadata>();
+            for (String id : client.getChildren(basePath)) {
+                // 'true': return null rather than throw if the node is gone
+                String json = client.<String>readData(makePath(id), true);
+                if (json == null) {
+                    log.debug(String.format("container node '%s' has no data or was removed, skipping", id));
+                    continue;
+                }
+                metadata.add(Utils.fromJson(json));
+            }
+            return metadata;
+        } catch (ZkException e) {
+            throw new IllegalMetadataStateException(e);
+        }
+    }
+
+    /**
+     * Overwrites the node of the given metadata record.
+     *
+     * @throws IllegalMetadataStateException if zookeeper rejects the operation
+     */
+    public void update(ContainerMetadata meta) throws IllegalMetadataStateException {
+        try {
+            client.writeData(makePath(meta.id), Utils.toJson(meta));
+        } catch (ZkException e) {
+            throw new IllegalMetadataStateException(e);
+        }
+    }
+
+    /**
+     * Deletes the node of the given container.
+     *
+     * @throws IllegalMetadataStateException if zookeeper rejects the operation
+     */
+    public void delete(String id) throws IllegalMetadataStateException {
+        try {
+            client.delete(makePath(id));
+        } catch (ZkException e) {
+            throw new IllegalMetadataStateException(e);
+        }
+    }
+
+    /**
+     * Polls the metadata of a container until it reaches the given state.
+     *
+     * @param id      container id
+     * @param state   state to wait for
+     * @param timeout maximum time to wait, in milliseconds
+     * @throws IllegalMetadataStateException if the metadata cannot be read
+     * @throws InterruptedException if interrupted while sleeping between polls
+     * @throws TimeoutException if the state is not reached within the timeout
+     */
+    public void waitForState(String id, ContainerState state, long timeout) throws IllegalMetadataStateException, InterruptedException, TimeoutException {
+        long limit = System.currentTimeMillis() + timeout;
+        ContainerMetadata meta = read(id);
+        while (meta.state != state) {
+            if (System.currentTimeMillis() >= limit) {
+                throw new TimeoutException(String.format("Container '%s' failed to reach state '%s' (currently is '%s')", id, state, meta.state));
+            }
+            Thread.sleep(POLL_INTERVAL);
+            meta = read(id);
+        }
+    }
+
+    /** Returns the zookeeper path of the node holding the given container's metadata. */
+    String makePath(String containerId) {
+        return basePath + "/" + containerId;
+    }
+
+    /** Signals that a metadata operation failed. */
+    public static class IllegalMetadataStateException extends Exception {
+
+        /**
+         *
+         */
+        private static final long serialVersionUID = -2846997013918977056L;
+
+        public IllegalMetadataStateException() {
+            super();
+        }
+
+        public IllegalMetadataStateException(String message, Throwable cause) {
+            super(message, cause);
+        }
+
+        public IllegalMetadataStateException(String message) {
+            super(message);
+        }
+
+        public IllegalMetadataStateException(Throwable cause) {
+            super(cause);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/Utils.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/Utils.java
new file mode 100644
index 0000000..49f70d3
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/Utils.java
@@ -0,0 +1,93 @@
+package org.apache.helix.metamanager.yarn;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.metamanager.yarn.ContainerMetadata.ContainerState;
+import org.apache.log4j.Logger;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonToken;
+import com.google.gson.stream.JsonWriter;
+
+ /**
+  * Static helpers of the YARN package: JSON conversion of
+  * {@link ContainerMetadata} and the "dummy" local resource map that is
+  * attached to container launch contexts.
+  */
+ public class Utils {
+
+   static final Logger log = Logger.getLogger(Utils.class);
+
+   /** Pretty-printing Gson instance with a {@link ContainerStateAdapter} registered. */
+   static Gson gson;
+   static {
+     gson = new GsonBuilder()
+         .registerTypeAdapter(ContainerState.class, new ContainerStateAdapter())
+         .setPrettyPrinting()
+         .create();
+   }
+
+   /** Resource map created once during class initialization. */
+   static Map<String, LocalResource> dummyResources = createDummyResources();
+
+   /**
+    * Serializes container metadata with the shared Gson instance.
+    *
+    * @param meta metadata to serialize
+    * @return JSON representation
+    */
+   static String toJson(ContainerMetadata meta) {
+     return gson.toJson(meta);
+   }
+
+   /**
+    * Deserializes container metadata with the shared Gson instance.
+    *
+    * @param json JSON representation
+    * @return parsed metadata
+    */
+   static ContainerMetadata fromJson(String json) {
+     return gson.fromJson(json, ContainerMetadata.class);
+   }
+
+   /**
+    * @return the shared resource map built at class initialization (the same
+    *         instance on every call)
+    */
+   static Map<String, LocalResource> getDummyResources() {
+     return dummyResources;
+   }
+
+   /**
+    * Creates a map with a single entry {@code "dummy"} describing the file
+    * {@code /tmp/dummy}. The file is created when it does not exist yet; if
+    * that fails the error is logged and the JVM exits with status 1.
+    */
+   private static Map<String, LocalResource> createDummyResources() {
+     final File placeholder = new File("/tmp/dummy");
+
+     if(!placeholder.exists()) {
+       try {
+         placeholder.createNewFile();
+       } catch(Exception e) {
+         log.error("could not create dummy file", e);
+         System.exit(1);
+       }
+     }
+
+     Map<String, LocalResource> resources = new HashMap<String, LocalResource>();
+
+     LocalResource descriptor = Records.newRecord(LocalResource.class);
+     descriptor.setType(LocalResourceType.FILE);
+     descriptor.setVisibility(LocalResourceVisibility.APPLICATION);
+     descriptor.setResource(ConverterUtils.getYarnUrlFromPath(new Path(placeholder.toURI())));
+     descriptor.setTimestamp(placeholder.lastModified());
+     descriptor.setSize(placeholder.length());
+
+     resources.put("dummy", descriptor);
+     return resources;
+   }
+
+   /**
+    * Gson adapter that writes a {@link ContainerState} as its enum name and
+    * maps JSON null to a {@code null} state in both directions.
+    */
+   static class ContainerStateAdapter extends TypeAdapter<ContainerState> {
+     @Override
+     public ContainerState read(JsonReader reader) throws IOException {
+       if (reader.peek() != JsonToken.NULL) {
+         return ContainerState.valueOf(reader.nextString());
+       }
+       // consume the null token so the reader advances
+       reader.nextNull();
+       return null;
+     }
+
+     @Override
+     public void write(JsonWriter writer, ContainerState value) throws IOException {
+       if (value != null) {
+         writer.value(value.name());
+       } else {
+         writer.nullValue();
+       }
+     }
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnApplication.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnApplication.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnApplication.java
new file mode 100644
index 0000000..7d2099a
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnApplication.java
@@ -0,0 +1,126 @@
+package org.apache.helix.metamanager.yarn;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+
+ /**
+  * Client-side handle for one YARN application: {@link #start()} connects to
+  * the ResourceManager, acquires an application id and submits the master
+  * command; {@link #stop()} asks the ResourceManager to kill the application.
+  */
+ public class YarnApplication {
+
+   static final Logger log = Logger.getLogger(YarnApplication.class);
+
+   // names of the environment variables passed to the master process
+   static final String ENV_CLUSTER_ADDRESS = "CLUSTER_ADDRESS";
+   static final String ENV_CLUSTER_NAME = "CLUSTER_NAME";
+   static final String ENV_PROVIDER_ADDRESS = "PROVIDER_ADDRESS";
+   static final String ENV_PROVIDER_NAME = "PROVIDER_NAME";
+
+   // configuration key of the master command and its fallback value; the two
+   // %s placeholders receive the stdout/stderr directory
+   static final String MASTER_COMMAND = "metamanager.master.command";
+   static final String DEFAULT_MASTER_COMMAND = "/bin/sh /home/apucher/incubator-helix/recipes/meta-cluster-manager/target/meta-cluster-manager-pkg/bin/yarn-master-process.sh 1>%s/stdout 2>%s/stderr";
+
+   Configuration conf;
+   YarnRPC rpc;
+   ClientRMProtocol rmClient;
+   ApplicationId appId;
+
+   final ApplicationConfig appConfig;
+
+   /**
+    * Creates the application handle and configures it with a fresh
+    * {@link YarnConfiguration}.
+    *
+    * @param appConfig cluster and provider settings of the application
+    */
+   public YarnApplication(ApplicationConfig appConfig) {
+     this.appConfig = appConfig;
+     configure(new YarnConfiguration());
+   }
+
+   /**
+    * Connects to the ResourceManager, requests a new application id and
+    * submits the application master.
+    *
+    * @throws Exception if any of the ResourceManager calls fails
+    */
+   public void start() throws Exception {
+     connect();
+
+     // TODO write logs to ApplicationConstants.LOG_DIR_EXPANSION_VAR instead of /tmp
+     String logDir = "/tmp/" + appConfig.providerName;
+     String command = String.format(conf.get(MASTER_COMMAND, DEFAULT_MASTER_COMMAND), logDir, logDir);
+
+     log.info(String.format("Starting application '%s/%s' (masterCommand='%s')", appConfig.providerAddress, appConfig.providerName, command));
+
+     // app id
+     GetNewApplicationRequest appRequest = Records.newRecord(GetNewApplicationRequest.class);
+     GetNewApplicationResponse appResponse = rmClient.getNewApplication(appRequest);
+
+     this.appId = appResponse.getApplicationId();
+
+     log.info(String.format("Acquired app id '%s' for '%s/%s'", appId.toString(), appConfig.providerAddress, appConfig.providerName));
+
+     // app submission
+     ApplicationSubmissionContext subContext = Records.newRecord(ApplicationSubmissionContext.class);
+     subContext.setApplicationId(appId);
+     subContext.setApplicationName(appConfig.providerName);
+     subContext.setAMContainerSpec(createLaunchContext(command));
+
+     SubmitApplicationRequest subRequest = Records.newRecord(SubmitApplicationRequest.class);
+     subRequest.setApplicationSubmissionContext(subContext);
+
+     log.info(String.format("Starting app id '%s'", appId.toString()));
+
+     rmClient.submitApplication(subRequest);
+   }
+
+   /**
+    * Asks the ResourceManager to kill the application. Does nothing (besides
+    * logging a warning) when no application id has been acquired yet, i.e.
+    * when {@link #start()} was not called or failed early.
+    *
+    * @throws YarnRemoteException if the kill request fails
+    */
+   public void stop() throws YarnRemoteException {
+     if(rmClient == null || appId == null) {
+       // previously this dereferenced the null fields and failed with a NullPointerException
+       log.warn("Ignoring stop request, application was not started");
+       return;
+     }
+
+     log.info(String.format("Stopping app id '%s'", appId.toString()));
+     KillApplicationRequest killRequest = Records.newRecord(KillApplicationRequest.class);
+     killRequest.setApplicationId(appId);
+
+     rmClient.forceKillApplication(killRequest);
+   }
+
+   /**
+    * Stores the configuration and creates the RPC factory from it.
+    *
+    * @param conf configuration, must not be null
+    */
+   void configure(Configuration conf) {
+     this.conf = Preconditions.checkNotNull(conf);
+     this.rpc = YarnRPC.create(conf);
+   }
+
+   /** Creates the ResourceManager client proxy for the configured RM address. */
+   void connect() {
+     YarnConfiguration yarnConf = new YarnConfiguration(conf);
+     InetSocketAddress rmAddress = NetUtils.createSocketAddr(yarnConf.get(
+         YarnConfiguration.RM_ADDRESS,
+         YarnConfiguration.DEFAULT_RM_ADDRESS));
+     log.info("Connecting to ResourceManager at: " + rmAddress);
+     this.rmClient = ((ClientRMProtocol) rpc.getProxy(ClientRMProtocol.class, rmAddress, conf));
+   }
+
+   /** Builds the launch context of the master: command, memory limit, environment and resources. */
+   private ContainerLaunchContext createLaunchContext(String command) {
+     ContainerLaunchContext launchContext = Records.newRecord(ContainerLaunchContext.class);
+     launchContext.setCommands(Collections.singletonList(command));
+
+     // resource limit
+     Resource resource = Records.newRecord(Resource.class);
+     resource.setMemory(256); // TODO make dynamic
+     launchContext.setResource(resource);
+
+     // environment read back by the master process
+     Map<String, String> env = new HashMap<String, String>();
+     env.put(ENV_CLUSTER_ADDRESS, appConfig.clusterAddress);
+     env.put(ENV_CLUSTER_NAME, appConfig.clusterName);
+     env.put(ENV_PROVIDER_ADDRESS, appConfig.providerAddress);
+     env.put(ENV_PROVIDER_NAME, appConfig.providerName);
+     launchContext.setEnvironment(env);
+
+     // YARN workaround: create dummy resource
+     Map<String, LocalResource> localResources = Utils.getDummyResources();
+     launchContext.setLocalResources(localResources);
+
+     return launchContext;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnClient.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnClient.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnClient.java
new file mode 100644
index 0000000..3447661
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnClient.java
@@ -0,0 +1,5 @@
+package org.apache.helix.metamanager.yarn;
+
+ public class YarnClient {
+ // Empty placeholder: this class declares no fields or methods.
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainer.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainer.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainer.java
new file mode 100644
index 0000000..d36eee9
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainer.java
@@ -0,0 +1,14 @@
+package org.apache.helix.metamanager.yarn;
+
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+
+ public class YarnContainer extends Configured implements Tool {
+ // Tool skeleton: run() ignores its arguments and always returns 0.
+ @Override
+ public int run(String[] args) throws Exception {
+ // TODO not implemented yet, unconditionally reports exit code 0
+ return 0;
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainerProvider.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainerProvider.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainerProvider.java
new file mode 100644
index 0000000..34a6b61
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainerProvider.java
@@ -0,0 +1,90 @@
+package org.apache.helix.metamanager.yarn;
+
+import org.apache.helix.metamanager.ClusterContainerProvider;
+import org.apache.helix.metamanager.yarn.ContainerMetadata.ContainerState;
+import org.apache.log4j.Logger;
+
+public class YarnContainerProvider implements ClusterContainerProvider {
+
+ static final Logger log = Logger.getLogger(YarnContainerProvider.class);
+
+ static final String REQUIRED_TYPE = "container";
+
+ static final long LOCK_TIMEOUT = 1000;
+ static final long CONTAINER_TIMEOUT = 10000;
+
+ /*
+ * CONTAINERS
+ * A (A, READY)
+ * B (B, RUNNING)
+ */
+
+ final ApplicationConfig appConfig;
+ final String command;
+
+ final Object notifier = new Object();
+
+ MetadataService metaService;
+
+ public YarnContainerProvider(ApplicationConfig appConfig, String command) {
+ this.appConfig = appConfig;
+ this.command = command;
+ }
+
+ @Override
+ public void create(final String id, final String type) throws Exception {
+ if(!REQUIRED_TYPE.equals(type)) {
+ throw new IllegalArgumentException(String.format("Type '%s' not supported", type));
+ }
+
+ metaService.create(new ContainerMetadata(id, command, appConfig.providerName));
+ metaService.waitForState(id, ContainerState.ACTIVE, CONTAINER_TIMEOUT);
+ }
+
+ @Override
+ public void destroy(final String id) throws Exception {
+ ContainerMetadata meta = metaService.read(id);
+
+ if(meta.state == ContainerState.ACTIVE) {
+ log.info(String.format("Destroying active container, going to teardown"));
+ metaService.update(new ContainerMetadata(meta, ContainerState.TEARDOWN));
+
+ } else if(meta.state == ContainerState.FAILED) {
+ log.info(String.format("Destroying failed container, going to halted"));
+ metaService.update(new ContainerMetadata(meta, ContainerState.HALTED));
+
+ } else if(meta.state == ContainerState.FINALIZE) {
+ log.info(String.format("Destroying finalized container, skipping"));
+
+ } else {
+ throw new IllegalStateException(String.format("Container '%s' must be active, failed or finalized", id));
+ }
+
+ metaService.waitForState(id, ContainerState.FINALIZE, CONTAINER_TIMEOUT);
+ metaService.delete(id);
+ }
+
+ @Override
+ public void destroyAll() {
+ try {
+ for(ContainerMetadata meta : metaService.readAll()) {
+ try { destroy(meta.id); } catch (Exception ignore) {}
+ }
+ } catch (Exception ignore) {
+ // ignore
+ }
+ }
+
+ public void startService() {
+ metaService = new MetadataService(appConfig);
+ metaService.start();
+ }
+
+ public void stopService() {
+ if(metaService != null) {
+ metaService.stop();
+ metaService = null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainerService.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainerService.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainerService.java
new file mode 100644
index 0000000..855dddd
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnContainerService.java
@@ -0,0 +1,370 @@
+package org.apache.helix.metamanager.yarn;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.records.AMResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.helix.metamanager.yarn.ContainerMetadata.ContainerState;
+import org.apache.helix.metamanager.yarn.MetadataService.IllegalMetadataStateException;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+
+public class YarnContainerService {
+
+ static final Logger log = Logger.getLogger(YarnContainerService.class);
+
+ static final String REQUIRED_TYPE = "container";
+
+ static final long ZOOKEEPER_TIMEOUT = 5000;
+
+ static final long YARNSERVICE_INTERVAL = 1000;
+
+ static final String CONTAINERS = "CONTAINERS";
+
+ static final String CONTAINER_COMMAND = "/bin/sh %s %s %s %s %s %s 1>%s/stdout 2>%s/stderr";
+
+ /*
+ * CONTAINERS
+ * A (A, READY)
+ * B (B, RUNNING)
+ */
+
+ final ApplicationConfig appConfig;
+ final AMRMProtocol yarnClient;
+ final ApplicationAttemptId appAtemptId;
+
+ final Configuration yarnConfig;
+
+ final File dummy = new File("/tmp/dummy");
+
+ final Map<ContainerId, Container> unassignedContainers = new HashMap<ContainerId, Container>();
+ final Map<ContainerId, Container> activeContainers = new HashMap<ContainerId, Container>();
+ final Map<ContainerId, ContainerStatus> completedContainers = new HashMap<ContainerId, ContainerStatus>();
+ final Map<ContainerId, String> yarn2meta = new HashMap<ContainerId, String>();
+
+ int numRequestedLast = 0;
+
+ MetadataService metaService;
+
+ ScheduledExecutorService executor;
+
+ /**
+  * Stores the collaborators of the service; nothing is started here.
+  *
+  * @param yarnClient protocol proxy used for allocate requests
+  * @param conf configuration used when connecting to container managers
+  * @param appAttemptId attempt id sent with every allocate request
+  * @param appConfig cluster and provider settings
+  */
+ public YarnContainerService(AMRMProtocol yarnClient, Configuration conf, ApplicationAttemptId appAttemptId, ApplicationConfig appConfig) {
+   this.yarnClient = yarnClient;
+   this.yarnConfig = conf;
+   this.appAtemptId = appAttemptId;
+   this.appConfig = appConfig;
+ }
+
+ public void startService() {
+ log.debug("starting container service");
+
+ metaService = new MetadataService(appConfig);
+ metaService.start();
+
+ executor = Executors.newSingleThreadScheduledExecutor();
+ executor.scheduleAtFixedRate(new YarnService(), 0, YARNSERVICE_INTERVAL, TimeUnit.MILLISECONDS);
+ }
+
+ public void stopService() {
+ log.debug("stopping container service");
+
+ if(executor != null) {
+ executor.shutdown();
+ while(!executor.isTerminated()) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ executor = null;
+ }
+
+ if(metaService != null) {
+ metaService.stop();
+ metaService = null;
+ }
+ }
+
+ Collection<ContainerMetadata> readOwnedMetadata() throws IllegalMetadataStateException {
+ log.debug("reading container data");
+
+ Collection<ContainerMetadata> containers = new ArrayList<ContainerMetadata>();
+ for(ContainerMetadata meta : metaService.readAll()) {
+ if(meta.owner.equals(appConfig.providerName)) {
+ containers.add(meta);
+ log.debug(String.format("found container node '%s' (state=%s, yarnId=%s, command=%s, owner=%s)",
+ meta.id, meta.state, meta.yarnId, meta.command, meta.owner));
+ }
+ }
+ return containers;
+ }
+
+ class YarnService implements Runnable {
+ int responseId = 0;
+
+ @Override
+ public void run() {
+ try {
+ log.debug("running yarn service update cycle");
+
+ Collection<ContainerMetadata> metadata = readOwnedMetadata();
+
+ // active meta containers
+ int numMetaActive = countActiveMeta(metadata);
+
+ // newly acquired meta containers
+ int numMetaAcquire = countAcquireMeta(metadata);
+
+ // destroyed meta containers
+ List<ContainerId> destroyedReleasedIds = createDestroyedReleaseList(metadata);
+ int numMetaCompleted = destroyedReleasedIds.size();
+
+ int numMeta = numMetaAcquire + numMetaActive + numMetaCompleted;
+
+ // yarn containers
+ int numYarnUnassigned = unassignedContainers.size();
+ int numYarnActive = activeContainers.size();
+ int numYarnCompleted = completedContainers.size();
+ int numYarn = numYarnUnassigned + numYarnActive + numYarnCompleted;
+
+ int numYarnRequired = numMetaAcquire - numYarnUnassigned;
+
+ // additionally required containers
+ int numRequestAdditional = Math.max(0, numYarnRequired);
+
+ // overstock containers
+ List<ContainerId> unneededReleasedIds = createOverstockReleaseList(numYarnRequired);
+
+ log.debug(String.format("meta containers (total=%d, acquire=%d, active=%d, completed=%d)", numMeta, numMetaAcquire, numMetaActive, numMetaCompleted));
+ log.debug(String.format("yarn containers (total=%d, unassigned=%d, active=%d, completed=%d)", numYarn, numYarnUnassigned, numYarnActive, numYarnCompleted));
+ log.debug(String.format("requesting %d new containers (%d requested last), releasing %d", numRequestAdditional, numRequestedLast, destroyedReleasedIds.size()));
+
+ Priority priority = Records.newRecord(Priority.class);
+ priority.setPriority(0);
+
+ Resource resource = Records.newRecord(Resource.class);
+ resource.setMemory(256); // TODO make dynamic
+
+ ResourceRequest resourceRequest = Records.newRecord(ResourceRequest.class);
+ resourceRequest.setHostName("*");
+ resourceRequest.setNumContainers(numRequestAdditional);
+ resourceRequest.setPriority(priority);
+ resourceRequest.setCapability(resource);
+
+ AllocateRequest request = Records.newRecord(AllocateRequest.class);
+ request.setResponseId(responseId);
+ request.setApplicationAttemptId(appAtemptId);
+ request.addAsk(resourceRequest);
+ request.addAllReleases(destroyedReleasedIds);
+ request.addAllReleases(unneededReleasedIds);
+
+ responseId++;
+
+ AllocateResponse allocateResponse = null;
+ try {
+ allocateResponse = yarnClient.allocate(request);
+ } catch (YarnRemoteException e) {
+ // ignore
+ log.error("Error allocating containers", e);
+ return;
+ }
+
+ numRequestedLast = numRequestAdditional;
+
+ AMResponse response = allocateResponse.getAMResponse();
+
+ // newly added containers
+ for(Container container : response.getAllocatedContainers()) {
+ unassignedContainers.put(container.getId(), container);
+ }
+
+ log.info(String.format("%d new containers available, %d required", unassignedContainers.size(), numMetaAcquire));
+
+ Iterator<Container> itYarn = unassignedContainers.values().iterator();
+ Iterator<ContainerMetadata> itMeta = metadata.iterator();
+ while(itYarn.hasNext() && itMeta.hasNext()) {
+ ContainerMetadata meta = itMeta.next();
+
+ if(meta.yarnId >= 0)
+ continue;
+
+ Container containerYarn = itYarn.next();
+
+ log.debug(String.format("assigning yarn container '%s' to container node '%s'", containerYarn.getId(), meta.id));
+
+ String command = String.format(CONTAINER_COMMAND, meta.command,
+ appConfig.clusterAddress, appConfig.clusterName, appConfig.providerAddress, appConfig.providerName,
+ meta.id, "/tmp/" + meta.id, "/tmp/" + meta.id);
+ //ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+
+ ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
+ context.setContainerId(containerYarn.getId());
+ context.setResource(containerYarn.getResource());
+ context.setEnvironment(Maps.<String, String>newHashMap());
+ context.setCommands(Collections.singletonList(command));
+ context.setLocalResources(Utils.getDummyResources());
+ try {
+ context.setUser(UserGroupInformation.getCurrentUser().getShortUserName());
+ } catch (IOException e) {
+ log.error(String.format("failed setting up container '%s' user information", meta.id));
+ return;
+ }
+
+ log.debug(String.format("container '%s' executing command '%s'", meta.id, command));
+
+ StartContainerRequest startReq = Records.newRecord(StartContainerRequest.class);
+ startReq.setContainerLaunchContext(context);
+
+ try {
+ getContainerManager(containerYarn).startContainer(startReq);
+
+ } catch (YarnRemoteException e) {
+ log.error(String.format("Error starting container '%s'", meta.id), e);
+ return;
+ }
+
+ log.debug(String.format("container '%s' started, updating container node", meta.id));
+
+ metaService.update(new ContainerMetadata(meta, ContainerState.CONNECTING, containerYarn.getId().getId()));
+ yarn2meta.put(containerYarn.getId(), meta.id);
+
+ log.debug(String.format("removing '%s' from unassigned yarn containers and adding to active list", containerYarn.getId()));
+
+ itYarn.remove();
+ activeContainers.put(containerYarn.getId(), containerYarn);
+
+ }
+
+ for(ContainerStatus status : response.getCompletedContainersStatuses()) {
+ ContainerId id = status.getContainerId();
+
+ log.info(String.format("Container '%s' completed", id));
+
+ if(unassignedContainers.containsKey(id)) {
+ log.info(String.format("Unassigned container '%s' terminated, removing", id));
+ unassignedContainers.remove(id);
+ // TODO handle
+ }
+
+ if(activeContainers.containsKey(id)) {
+ log.info(String.format("Active container '%s' terminated, removing", id));
+ activeContainers.remove(id);
+
+ String metaId = yarn2meta.get(id);
+ ContainerMetadata meta = metaService.read(metaId);
+
+ log.debug(String.format("container '%s' finalized, updating container node", meta.id));
+
+ metaService.update(new ContainerMetadata(meta, ContainerState.FINALIZE));
+ }
+
+ completedContainers.put(id, status);
+ }
+
+ log.debug("yarn service update cycle complete");
+
+ } catch (Exception e) {
+ log.error("Error while executing yarn update cycle", e);
+ }
+ }
+
+ /**
+  * Removes surplus containers from the unassigned map and returns their ids.
+  *
+  * @param numYarnRequired number of additionally required containers; a
+  *        negative value means that many unassigned containers are surplus
+  * @return ids of the removed containers, empty if nothing is surplus
+  */
+ private List<ContainerId> createOverstockReleaseList(int numYarnRequired) {
+   List<ContainerId> surplusIds = new ArrayList<ContainerId>();
+   Iterator<Container> candidates = unassignedContainers.values().iterator();
+
+   int remaining = -numYarnRequired;
+   while(remaining > 0 && candidates.hasNext()) {
+     ContainerId surplusId = candidates.next().getId();
+     surplusIds.add(surplusId);
+     log.debug(String.format("Container '%s' no longer required, removing", surplusId));
+     candidates.remove();
+     remaining--;
+   }
+
+   return surplusIds;
+ }
+
+ /**
+  * Creates a container id (current application attempt plus the stored yarn
+  * id) for every metadata entry in state HALTED.
+  *
+  * @param metadata metadata entries to inspect
+  * @return ids of the containers to release, empty if none is halted
+  */
+ private List<ContainerId> createDestroyedReleaseList(
+     Collection<ContainerMetadata> metadata) {
+   List<ContainerId> haltedIds = new ArrayList<ContainerId>();
+   for(ContainerMetadata entry : metadata) {
+     if(entry.state != ContainerState.HALTED) {
+       continue;
+     }
+
+     ContainerId haltedId = Records.newRecord(ContainerId.class);
+     haltedId.setApplicationAttemptId(appAtemptId);
+     haltedId.setId(entry.yarnId);
+     haltedIds.add(haltedId);
+     log.debug(String.format("releasing container '%s'", haltedId));
+   }
+   return haltedIds;
+ }
+
+ /**
+  * @param metadata metadata entries to inspect
+  * @return number of entries in state ACQUIRE
+  */
+ private int countAcquireMeta(Collection<ContainerMetadata> metadata) {
+   int acquiring = 0;
+   for(ContainerMetadata entry : metadata) {
+     acquiring += (entry.state == ContainerState.ACQUIRE) ? 1 : 0;
+   }
+   return acquiring;
+ }
+
+ /**
+  * @param metadata metadata entries to inspect
+  * @return number of entries whose state is none of ACQUIRE, HALTED and FINALIZE
+  */
+ private int countActiveMeta(Collection<ContainerMetadata> metadata) {
+   int active = 0;
+   for(ContainerMetadata entry : metadata) {
+     boolean excluded = entry.state == ContainerState.ACQUIRE
+         || entry.state == ContainerState.HALTED
+         || entry.state == ContainerState.FINALIZE;
+     if(!excluded) {
+       active++;
+     }
+   }
+   return active;
+ }
+ }
+
+ /**
+  * Creates a ContainerManager proxy for the node the given container was
+  * allocated on.
+  *
+  * @param container allocated container, its node id supplies host and port
+  * @return proxy connected to {@code host:port} of the container's node
+  */
+ private ContainerManager getContainerManager(Container container) {
+   NodeId node = container.getNodeId();
+   String hostAndPort = String.format("%s:%d", node.getHost(),
+       node.getPort());
+
+   YarnRPC nodeRpc = YarnRPC.create(new YarnConfiguration(yarnConfig));
+
+   log.info("Connecting to ContainerManager at: " + hostAndPort);
+   InetSocketAddress address = NetUtils.createSocketAddr(hostAndPort);
+   return (ContainerManager) nodeRpc.getProxy(
+       ContainerManager.class, address, yarnConfig);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnHelper.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnHelper.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnHelper.java
new file mode 100644
index 0000000..4314bdc
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnHelper.java
@@ -0,0 +1,5 @@
+package org.apache.helix.metamanager.yarn;
+
+ public class YarnHelper {
+ // Empty placeholder: this class declares no fields or methods.
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnMaster.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnMaster.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnMaster.java
new file mode 100644
index 0000000..a2aef0e
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnMaster.java
@@ -0,0 +1,134 @@
+package org.apache.helix.metamanager.yarn;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.log4j.Logger;
+
+public class YarnMaster extends Configured implements Tool {
+
+ static final Logger log = Logger.getLogger(YarnMaster.class);
+
+ AMRMProtocol resourceManager;
+ ApplicationAttemptId appAttemptId;
+
+ YarnContainerService service;
+
+ @Override
+ public int run(String[] args) throws Exception {
+ log.trace("BEGIN YarnMaster.run()");
+
+ Configuration conf = getConf();
+
+ this.appAttemptId = getApplicationAttemptId();
+ log.info(String.format("Got application attempt id '%s'", appAttemptId.toString()));
+
+ log.debug("Getting resource manager");
+ this.resourceManager = getResourceManager(conf);
+
+ // register the AM with the RM
+ log.debug("Registering application master");
+ RegisterApplicationMasterRequest appMasterRequest =
+ Records.newRecord(RegisterApplicationMasterRequest.class);
+ appMasterRequest.setApplicationAttemptId(appAttemptId);
+ appMasterRequest.setHost("");
+ appMasterRequest.setRpcPort(0);
+ appMasterRequest.setTrackingUrl("");
+
+ resourceManager.registerApplicationMaster(appMasterRequest);
+
+ String clusterAddress = getEnv(YarnApplication.ENV_CLUSTER_ADDRESS);
+ String clusterName = getEnv(YarnApplication.ENV_CLUSTER_NAME);
+ String providerAddress = getEnv(YarnApplication.ENV_PROVIDER_ADDRESS);
+ String providerName = getEnv(YarnApplication.ENV_PROVIDER_NAME);
+ ApplicationConfig appConfig = new ApplicationConfig(clusterAddress, clusterName, providerAddress, providerName);
+
+ service = new YarnContainerService(resourceManager, conf, appAttemptId, appConfig);
+ service.startService();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ @Override
+ public void run() {
+
+ service.stopService();
+
+ // finish application
+ log.debug("Sending finish request");
+ FinishApplicationMasterRequest finishReq =
+ Records.newRecord(FinishApplicationMasterRequest.class);
+
+ finishReq.setAppAttemptId(getApplicationAttemptId());
+ finishReq.setFinishApplicationStatus(FinalApplicationStatus.SUCCEEDED);
+
+ try { resourceManager.finishApplicationMaster(finishReq); } catch(Exception ignore) {}
+ }
+ }));
+
+ try { Thread.currentThread().join(); } catch(Exception ignore) {}
+
+ log.trace("END YarnMaster.run()");
+
+ return 0;
+ }
+
+ private AMRMProtocol getResourceManager(Configuration conf) {
+ // Connect to the Scheduler of the ResourceManager.
+ YarnConfiguration yarnConf = new YarnConfiguration(conf);
+ YarnRPC rpc = YarnRPC.create(yarnConf);
+ InetSocketAddress rmAddress =
+ NetUtils.createSocketAddr(yarnConf.get(
+ YarnConfiguration.RM_SCHEDULER_ADDRESS,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS));
+ log.info("Connecting to ResourceManager at " + rmAddress);
+ AMRMProtocol resourceManager =
+ (AMRMProtocol) rpc.getProxy(AMRMProtocol.class, rmAddress, conf);
+ return resourceManager;
+ }
+
+ private ApplicationAttemptId getApplicationAttemptId() {
+ ContainerId containerId = ConverterUtils.toContainerId(getEnv(ApplicationConstants.AM_CONTAINER_ID_ENV));
+ ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
+ return appAttemptID;
+ }
+
+ private String getEnv(String key) {
+ Map<String, String> envs = System.getenv();
+ String clusterName = envs.get(key);
+ if (clusterName == null) {
+ // container id should always be set in the env by the framework
+ throw new IllegalArgumentException(
+ String.format("%s not set in the environment", key));
+ }
+ return clusterName;
+ }
+
+ public static void main(String[] args) throws Exception {
+ log.trace("BEGIN YarnMaster.main()");
+
+ try {
+ int rc = ToolRunner.run(new Configuration(), new YarnMaster(), args);
+ System.exit(rc);
+ } catch (Exception e) {
+ System.err.println(e);
+ System.exit(1);
+ }
+
+ log.trace("END YarnMaster.main()");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnProcess.java
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnProcess.java b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnProcess.java
new file mode 100644
index 0000000..7108d39
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/java/org/apache/helix/metamanager/yarn/YarnProcess.java
@@ -0,0 +1,171 @@
+package org.apache.helix.metamanager.yarn;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.metamanager.managed.ManagedFactory;
+import org.apache.helix.metamanager.yarn.ContainerMetadata.ContainerState;
+import org.apache.log4j.Logger;
+
+ /**
+  * Process that polls the metadata record of one container through a
+  * {@code MetadataService} every {@link #CONTAINERSERVICE_INTERVAL} milliseconds
+  * and starts or stops a Helix participant according to the recorded state.
+  */
+ public class YarnProcess {
+   static final Logger log = Logger.getLogger(YarnProcess.class);
+
+   /** Period of the container status check, in milliseconds. */
+   static final long CONTAINERSERVICE_INTERVAL = 1000;
+
+   final ApplicationConfig appConfig;
+   final String containerId;
+
+   HelixManager participantManager;
+
+   MetadataService metaService;
+   ScheduledExecutorService executor;
+
+   /**
+    * @param appConfig   cluster and provider settings used for the metadata service
+    *                    and the participant
+    * @param containerId id of the container this process represents
+    */
+   public YarnProcess(ApplicationConfig appConfig, String containerId) {
+     this.appConfig = appConfig;
+     this.containerId = containerId;
+   }
+
+   /** Starts the metadata service and schedules the periodic container status check. */
+   public void startService() {
+     log.info(String.format("start metadata service for '%s'", containerId));
+     metaService = new MetadataService(appConfig);
+     metaService.start();
+
+     executor = Executors.newSingleThreadScheduledExecutor();
+     executor.scheduleAtFixedRate(new ContainerService(), 0, CONTAINERSERVICE_INTERVAL, TimeUnit.MILLISECONDS);
+   }
+
+   /** Stops the metadata service, if one is running, and shuts down the status check executor. */
+   public void stopService() {
+     log.info(String.format("stop metadata service for '%s'", containerId));
+     if (metaService != null) {
+       metaService.stop();
+       metaService = null;
+     }
+
+     if (executor != null) {
+       executor.shutdown();
+     }
+   }
+
+   /** @return true while the status check executor exists and has not terminated */
+   public boolean isRunning() {
+     return executor != null && !executor.isTerminated();
+   }
+
+   /**
+    * Creates a ZooKeeper-backed Helix manager of type PARTICIPANT for this container,
+    * registers {@code ManagedFactory} for the "MasterSlave" state model and connects it.
+    *
+    * @throws Exception if the participant cannot be set up or connected
+    */
+   public void startParticipant() throws Exception {
+     log.info("STARTING " + containerId);
+     participantManager = HelixManagerFactory.getZKHelixManager(appConfig.clusterName,
+         containerId, InstanceType.PARTICIPANT, appConfig.clusterAddress);
+     participantManager.getStateMachineEngine().registerStateModelFactory(
+         "MasterSlave", new ManagedFactory());
+     participantManager.connect();
+     log.info("STARTED " + containerId);
+   }
+
+   /** Disconnects and forgets the Helix participant if one was created. */
+   public void stopParticipant() {
+     if (participantManager != null) {
+       participantManager.disconnect();
+       participantManager = null;
+     }
+   }
+
+   /**
+    * Reads this container's metadata and reacts to its state:
+    * CONNECTING starts the participant and records ACTIVE (or FAILED if starting
+    * fails); TEARDOWN stops the participant, records HALTED and stops the service.
+    * Any exception raised by the check itself stops the service.
+    */
+   public void updateContainerStatus() {
+     log.info("updating container status");
+     try {
+       ContainerMetadata meta = metaService.read(containerId);
+
+       if (meta.state == ContainerState.CONNECTING) {
+         log.info("container connecting, going to active");
+         try {
+           startParticipant();
+           metaService.update(new ContainerMetadata(meta, ContainerState.ACTIVE));
+         } catch (Exception e) {
+           log.error("Failed to start participant, going to failed", e);
+           stopParticipant();
+           metaService.update(new ContainerMetadata(meta, ContainerState.FAILED));
+         }
+       }
+
+       if (meta.state == ContainerState.ACTIVE) {
+         // TODO: do something
+         // and go to failed on error
+       }
+
+       if (meta.state == ContainerState.TEARDOWN) {
+         log.info("container teardown, going to halted");
+         stopParticipant();
+         metaService.update(new ContainerMetadata(meta, ContainerState.HALTED));
+         stopService();
+       }
+
+     } catch (Exception e) {
+       // The message assumes the container record is missing, but every failure of
+       // the check ends up here, so the cause is logged along with it.
+       log.warn(String.format("Container '%s' does not exist, stopping service", containerId), e);
+       stopService();
+     }
+   }
+
+   /** Periodic task that runs the container status check. */
+   class ContainerService implements Runnable {
+     @Override
+     public void run() {
+       updateContainerStatus();
+     }
+   }
+
+   /**
+    * Entry point. Starts the service for one container and blocks until it has stopped.
+    *
+    * @param args clusterAddress, clusterName, providerAddress, providerName, containerId
+    * @throws IllegalArgumentException if fewer than five arguments are given
+    */
+   public static void main(String[] args) throws Exception {
+     log.trace("BEGIN YarnProcess.main()");
+
+     if (args.length < 5) {
+       throw new IllegalArgumentException(
+           "usage: YarnProcess <clusterAddress> <clusterName> <providerAddress> <providerName> <containerId>");
+     }
+
+     final String clusterAddress = args[0];
+     final String clusterName = args[1];
+     final String providerAddress = args[2];
+     final String providerName = args[3];
+     final String containerId = args[4];
+
+     final ApplicationConfig appConfig = new ApplicationConfig(clusterAddress, clusterName, providerAddress, providerName);
+
+     final YarnProcess yarnProcess = new YarnProcess(appConfig, containerId);
+
+     yarnProcess.startService();
+
+     Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+       @Override
+       public void run() {
+         yarnProcess.stopService();
+       }
+     }));
+
+     boolean interrupted = false;
+     while (yarnProcess.isRunning()) {
+       try {
+         Thread.sleep(100);
+       } catch (InterruptedException e) {
+         // keep waiting for the service to stop, but remember the interrupt so
+         // it can be restored once the loop is done
+         interrupted = true;
+       }
+     }
+     if (interrupted) {
+       Thread.currentThread().interrupt();
+     }
+
+     log.trace("END YarnProcess.main()");
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/resources/2by2local.properties
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/resources/2by2local.properties b/recipes/meta-cluster-manager/src/main/resources/2by2local.properties
new file mode 100644
index 0000000..ac7968a
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/resources/2by2local.properties
@@ -0,0 +1,52 @@
+#
+# Zookeeper (optional)
+#
+zookeeper.datadir=/tmp/meta/zk/data
+zookeeper.logdir=/tmp/meta/zk/log
+zookeeper.port=2199
+
+#
+# Meta Cluster
+#
+meta.cluster=meta
+meta.managed=managed
+meta.address=localhost:2199
+
+meta.target.type=static
+meta.target.database=3
+meta.target.webserver=5
+
+meta.status.type=local
+
+meta.provider.type=local
+meta.provider.name=provider_0
+meta.provider.address=localhost:2199
+meta.provider.cluster=managed
+
+meta.provider.containers=database,webserver
+
+meta.provider.container.database.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+meta.provider.container.database.prop1=foo
+meta.provider.container.database.prop2=bar
+
+meta.provider.container.webserver.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+meta.provider.container.webserver.prop1=foo
+meta.provider.container.webserver.prop2=bar
+
+#
+# Managed Cluster
+#
+managed.cluster=managed
+managed.address=localhost:2199
+
+managed.resources=dbprod,wsprod
+
+managed.resource.dbprod.container=database
+managed.resource.dbprod.model=MasterSlave
+managed.resource.dbprod.partitions=8
+managed.resource.dbprod.replica=3
+
+managed.resource.wsprod.container=webserver
+managed.resource.wsprod.model=MasterSlave
+managed.resource.wsprod.partitions=15
+managed.resource.wsprod.replica=1
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/resources/2by2localMixedModels.properties
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/resources/2by2localMixedModels.properties b/recipes/meta-cluster-manager/src/main/resources/2by2localMixedModels.properties
new file mode 100644
index 0000000..3971375
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/resources/2by2localMixedModels.properties
@@ -0,0 +1,52 @@
+#
+# Zookeeper (optional)
+#
+zookeeper.datadir=/tmp/meta/zk/data
+zookeeper.logdir=/tmp/meta/zk/log
+zookeeper.port=2199
+
+#
+# Meta Cluster
+#
+meta.cluster=meta
+meta.managed=managed
+meta.address=localhost:2199
+
+meta.target.type=static
+meta.target.database=3
+meta.target.webserver=5
+
+meta.status.type=local
+
+meta.provider.type=local
+meta.provider.name=provider_0
+meta.provider.address=localhost:2199
+meta.provider.cluster=managed
+
+meta.provider.containers=database,webserver
+
+meta.provider.container.database.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+meta.provider.container.database.prop1=foo
+meta.provider.container.database.prop2=bar
+
+meta.provider.container.webserver.class=org.apache.helix.metamanager.impl.container.DummyOnlineOfflineProcess
+meta.provider.container.webserver.prop1=foo
+meta.provider.container.webserver.prop2=bar
+
+#
+# Managed Cluster
+#
+managed.cluster=managed
+managed.address=localhost:2199
+
+managed.resources=dbprod,wsprod
+
+managed.resource.dbprod.container=database
+managed.resource.dbprod.model=MasterSlave
+managed.resource.dbprod.partitions=8
+managed.resource.dbprod.replica=3
+
+managed.resource.wsprod.container=webserver
+managed.resource.wsprod.model=OnlineOffline
+managed.resource.wsprod.partitions=15
+managed.resource.wsprod.replica=1
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/resources/2by2shell.properties
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/resources/2by2shell.properties b/recipes/meta-cluster-manager/src/main/resources/2by2shell.properties
new file mode 100644
index 0000000..a26f250
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/resources/2by2shell.properties
@@ -0,0 +1,52 @@
+#
+# Zookeeper (optional)
+#
+zookeeper.datadir=/tmp/meta/zk/data
+zookeeper.logdir=/tmp/meta/zk/log
+zookeeper.port=2199
+
+#
+# Meta Cluster
+#
+meta.cluster=meta
+meta.managed=managed
+meta.address=localhost:2199
+
+meta.target.type=static
+meta.target.database=3
+meta.target.webserver=5
+
+meta.status.type=shell
+
+meta.provider.type=shell
+meta.provider.name=provider_0
+meta.provider.address=localhost:2199
+meta.provider.cluster=managed
+
+meta.provider.containers=database,webserver
+
+meta.provider.container.database.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+meta.provider.container.database.prop1=foo
+meta.provider.container.database.prop2=bar
+
+meta.provider.container.webserver.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+meta.provider.container.webserver.prop1=foo
+meta.provider.container.webserver.prop2=bar
+
+#
+# Managed Cluster
+#
+managed.cluster=managed
+managed.address=localhost:2199
+
+managed.resources=dbprod,wsprod
+
+managed.resource.dbprod.container=database
+managed.resource.dbprod.model=MasterSlave
+managed.resource.dbprod.partitions=8
+managed.resource.dbprod.replica=3
+
+managed.resource.wsprod.container=webserver
+managed.resource.wsprod.model=MasterSlave
+managed.resource.wsprod.partitions=15
+managed.resource.wsprod.replica=1
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/resources/2by2yarn.properties
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/resources/2by2yarn.properties b/recipes/meta-cluster-manager/src/main/resources/2by2yarn.properties
new file mode 100644
index 0000000..6afd2c6
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/resources/2by2yarn.properties
@@ -0,0 +1,58 @@
+#
+# Zookeeper (optional)
+#
+zookeeper.datadir=/tmp/meta/zk/data
+zookeeper.logdir=/tmp/meta/zk/log
+zookeeper.port=2199
+
+#
+# Meta Cluster
+#
+meta.cluster=meta
+meta.managed=managed
+meta.address=rm:2199
+
+meta.target.type=static
+meta.target.database=3
+meta.target.webserver=5
+
+meta.status.type=yarn
+meta.status.metadata=rm:2199
+
+meta.provider.type=yarn
+meta.provider.name=provider_0
+meta.provider.address=rm:2199
+meta.provider.cluster=managed
+meta.provider.metadata=rm:2199
+meta.provider.resourcemananger=rm:8032
+meta.provider.scheduler=rm:8030
+meta.provider.user=yarn
+meta.provider.hdfs=hdfs://rm:9000/
+
+meta.provider.containers=database,webserver
+
+meta.provider.container.database.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+meta.provider.container.database.prop1=foo
+meta.provider.container.database.prop2=bar
+
+meta.provider.container.webserver.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+meta.provider.container.webserver.prop1=foo
+meta.provider.container.webserver.prop2=bar
+
+#
+# Managed Cluster
+#
+managed.cluster=managed
+managed.address=rm:2199
+
+managed.resources=dbprod,wsprod
+
+managed.resource.dbprod.container=database
+managed.resource.dbprod.model=MasterSlave
+managed.resource.dbprod.partitions=8
+managed.resource.dbprod.replica=3
+
+managed.resource.wsprod.container=webserver
+managed.resource.wsprod.model=MasterSlave
+managed.resource.wsprod.partitions=15
+managed.resource.wsprod.replica=1
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/resources/2by2yarnZookeeper.properties
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/resources/2by2yarnZookeeper.properties b/recipes/meta-cluster-manager/src/main/resources/2by2yarnZookeeper.properties
new file mode 100644
index 0000000..66f3637
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/resources/2by2yarnZookeeper.properties
@@ -0,0 +1,58 @@
+#
+# Zookeeper (optional)
+#
+zookeeper.datadir=/tmp/meta/zk/data
+zookeeper.logdir=/tmp/meta/zk/log
+zookeeper.port=2199
+
+#
+# Meta Cluster
+#
+meta.cluster=meta
+meta.managed=managed
+meta.address=rm:2199
+
+meta.target.type=static
+meta.target.database=3
+meta.target.webserver=5
+
+meta.status.type=yarn
+meta.status.metadata=rm:2199
+
+meta.provider.type=yarn
+meta.provider.name=provider_0
+meta.provider.address=rm:2199
+meta.provider.cluster=managed
+meta.provider.metadata=rm:2199
+meta.provider.resourcemananger=rm:8032
+meta.provider.scheduler=rm:8030
+meta.provider.user=yarn
+meta.provider.hdfs=hdfs://rm:9000/
+
+meta.provider.containers=database,webserver
+
+meta.provider.container.database.class=org.apache.helix.metamanager.impl.container.ZookeeperMasterSlaveProcess
+meta.provider.container.database.address=rm:2199
+meta.provider.container.database.root=mydatabase
+
+meta.provider.container.webserver.class=org.apache.helix.metamanager.impl.container.ZookeeperMasterSlaveProcess
+meta.provider.container.webserver.address=rm:2199
+meta.provider.container.webserver.root=mywebserver
+
+#
+# Managed Cluster
+#
+managed.cluster=managed
+managed.address=rm:2199
+
+managed.resources=dbprod,wsprod
+
+managed.resource.dbprod.container=database
+managed.resource.dbprod.model=MasterSlave
+managed.resource.dbprod.partitions=8
+managed.resource.dbprod.replica=3
+
+managed.resource.wsprod.container=webserver
+managed.resource.wsprod.model=MasterSlave
+managed.resource.wsprod.partitions=15
+managed.resource.wsprod.replica=1
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/resources/2meta2managed.properties
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/resources/2meta2managed.properties b/recipes/meta-cluster-manager/src/main/resources/2meta2managed.properties
new file mode 100644
index 0000000..b719620
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/resources/2meta2managed.properties
@@ -0,0 +1,52 @@
+#
+# Zookeeper (optional)
+#
+zookeeper.datadir=/tmp/meta/zk/data
+zookeeper.logdir=/tmp/meta/zk/log
+zookeeper.port=2199
+
+#
+# Meta Cluster
+#
+meta.cluster=meta
+meta.managed=managed
+meta.address=localhost:2199
+
+meta.target.type=static
+meta.target.database=3
+meta.target.webserver=5
+
+meta.status.type=local
+
+meta.provider.type=local
+meta.provider.name=provider_0
+meta.provider.address=localhost:2199
+meta.provider.cluster=managed
+
+meta.provider.containers=database,webserver
+
+meta.provider.container.database.class=org.apache.helix.metamanager.container.impl.DummyProcess
+meta.provider.container.database.prop1=foo
+meta.provider.container.database.prop2=bar
+
+meta.provider.container.webserver.class=org.apache.helix.metamanager.container.impl.DummyProcess
+meta.provider.container.webserver.prop1=foo
+meta.provider.container.webserver.prop2=bar
+
+#
+# Managed Cluster
+#
+managed.cluster=managed
+managed.address=localhost:2199
+
+managed.resources=dbprod,wsprod
+
+managed.resource.dbprod.container=database
+managed.resource.dbprod.model=MasterSlave
+managed.resource.dbprod.partitions=8
+managed.resource.dbprod.replica=3
+
+managed.resource.wsprod.container=webserver
+managed.resource.wsprod.model=MasterSlave
+managed.resource.wsprod.partitions=15
+managed.resource.wsprod.replica=1
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/e38aa54b/recipes/meta-cluster-manager/src/main/resources/Boot2By2Local.properties
----------------------------------------------------------------------
diff --git a/recipes/meta-cluster-manager/src/main/resources/Boot2By2Local.properties b/recipes/meta-cluster-manager/src/main/resources/Boot2By2Local.properties
new file mode 100644
index 0000000..4eb07bd
--- /dev/null
+++ b/recipes/meta-cluster-manager/src/main/resources/Boot2By2Local.properties
@@ -0,0 +1,87 @@
+#
+# Zookeeper (optional)
+#
+zookeeper.datadir=/tmp/meta/zk/data
+zookeeper.logdir=/tmp/meta/zk/log
+zookeeper.port=2199
+
+#
+# Cluster
+#
+cluster.name=cluster
+cluster.address=localhost:2199
+
+#
+# Resource
+#
+resource.0.name=wsprod
+resource.0.cluster=cluster
+resource.0.address=localhost:2199
+resource.0.container=webserver
+resource.0.model=MasterSlave
+resource.0.partitions=15
+resource.0.replica=1
+
+resource.1.name=dbprod
+resource.1.cluster=cluster
+resource.1.address=localhost:2199
+resource.1.container=database
+resource.1.model=MasterSlave
+resource.1.partitions=8
+resource.1.replica=3
+
+#
+# Controller
+#
+controller.name=controller
+controller.cluster=cluster
+controller.address=localhost:2199
+controller.autorefresh=5000
+
+#
+# Metacluster
+#
+metacluster.name=meta
+metacluster.address=localhost:2199
+metacluster.managedcluster=cluster
+metacluster.managedaddress=localhost:2199
+
+#
+# Metaresource
+#
+metaresource.0.name=webserver
+metaresource.0.metacluster=meta
+metaresource.0.metaaddress=localhost:2199
+metaresource.0.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+
+metaresource.1.name=database
+metaresource.1.metacluster=meta
+metaresource.1.metaaddress=localhost:2199
+metaresource.1.class=org.apache.helix.metamanager.impl.container.DummyMasterSlaveProcess
+
+#
+# Metaprovider
+#
+metaprovider.0.name=provider0
+metaprovider.0.metacluster=meta
+metaprovider.0.metaaddress=localhost:2199
+metaprovider.0.class=org.apache.helix.metamanager.impl.local.LocalContainerProviderProcess
+
+metaprovider.1.name=provider1
+metaprovider.1.metacluster=meta
+metaprovider.1.metaaddress=localhost:2199
+metaprovider.1.class=org.apache.helix.metamanager.impl.local.LocalContainerProviderProcess
+
+#
+# Metacontroller
+#
+metacontroller.name=metacontroller
+metacontroller.metacluster=meta
+metacontroller.metaaddress=localhost:2199
+metacontroller.autorefresh=5000
+
+metacontroller.status.class=org.apache.helix.metamanager.impl.local.LocalStatusProvider
+
+metacontroller.target.class=org.apache.helix.metamanager.impl.StaticTargetProvider
+metacontroller.target.webserver=5
+metacontroller.target.database=3