You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/10 19:05:31 UTC

[48/50] [abbrv] Merge remote-tracking branch 'origin/helix-provisioning'

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/ContainerAdmin.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/ContainerAdmin.java
index 0000000,8154996..f0e3d37
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/ContainerAdmin.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/ContainerAdmin.java
@@@ -1,0 -1,98 +1,116 @@@
+ package org.apache.helix.provisioning.tools;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.HashMap;
+ import java.util.Map;
+ import java.util.UUID;
+ 
+ import org.apache.commons.cli.CommandLine;
+ import org.apache.commons.cli.GnuParser;
+ import org.apache.commons.cli.Option;
+ import org.apache.commons.cli.OptionBuilder;
+ import org.apache.commons.cli.OptionGroup;
+ import org.apache.commons.cli.Options;
 -import org.apache.commons.cli.ParseException;
+ import org.apache.helix.HelixConnection;
+ import org.apache.helix.api.Participant;
+ import org.apache.helix.api.accessor.ParticipantAccessor;
+ import org.apache.helix.api.id.ClusterId;
+ import org.apache.helix.api.id.MessageId;
+ import org.apache.helix.api.id.ParticipantId;
+ import org.apache.helix.manager.zk.ZkHelixConnection;
+ import org.apache.helix.model.Message;
+ import org.apache.helix.model.Message.MessageType;
+ import org.apache.log4j.Logger;
+ 
+ /**
 - * 
++ *
+  *
+  */
+ public class ContainerAdmin {
+ 
+   private static Logger LOG = Logger.getLogger(ContainerAdmin.class);
+   private static String stopContainer = "stopContainer";
+   private HelixConnection _connection;
+ 
+   public ContainerAdmin(String zkAddress) {
+     _connection = new ZkHelixConnection(zkAddress);
+     _connection.connect();
+   }
+ 
+   public void stopContainer(String appName, String participantName) throws Exception {
+     ClusterId clusterId = ClusterId.from(appName);
+     ParticipantAccessor participantAccessor = _connection.createParticipantAccessor(clusterId);
+     ParticipantId participantId = ParticipantId.from(participantName);
+     Participant participant = participantAccessor.readParticipant(participantId);
+     if (participant != null && participant.isAlive()) {
+       Message message = new Message(MessageType.SHUTDOWN, UUID.randomUUID().toString());
+       message.setTgtName(participant.getId().toString());
+       message.setTgtSessionId(participant.getRunningInstance().getSessionId());
+       message.setMsgId(message.getId());
+       Map<MessageId, Message> msgMap = new HashMap<MessageId, Message>();
+       msgMap.put(MessageId.from(message.getId()), message);
+       participantAccessor.insertMessagesToParticipant(participantId, msgMap);
+       do {
+         participant = participantAccessor.readParticipant(participantId);
+         Thread.sleep(1000);
+         LOG.info("Waiting for container:" + participantName + " to shutdown");
 -      } while (participant!=null && participant.isAlive());
++      } while (participant != null && participant.isAlive());
+     }
 -    
++
+   }
+ 
+   @SuppressWarnings("static-access")
+   public static void main(String[] args) throws Exception {
+     Option zkServerOption =
+         OptionBuilder.withLongOpt("zookeeperAddress").withDescription("Provide zookeeper address")
+             .create();
+     zkServerOption.setArgs(1);
+     zkServerOption.setRequired(true);
+     zkServerOption.setArgName("zookeeperAddress(Required)");
+ 
+     OptionGroup group = new OptionGroup();
+     group.setRequired(true);
+ 
+     // update container count per service
+     Option stopContainerOption =
+         OptionBuilder.withLongOpt(stopContainer).withDescription("appName participantName")
+             .create();
+     stopContainerOption.setArgs(2);
+     stopContainerOption.setRequired(false);
+     stopContainerOption.setArgName("appName participantName");
+ 
+     group.addOption(stopContainerOption);
+ 
+     Options options = new Options();
+     options.addOption(zkServerOption);
+     options.addOptionGroup(group);
+     CommandLine cliParser = new GnuParser().parse(options, args);
+ 
+     String zkAddress = cliParser.getOptionValue("zookeeperAddress");
+     ContainerAdmin admin = new ContainerAdmin(zkAddress);
+ 
+     if (cliParser.hasOption(stopContainer)) {
+       String appName = cliParser.getOptionValues(stopContainer)[0];
+       String participantName = cliParser.getOptionValues(stopContainer)[1];
+       admin.stopContainer(appName, participantName);
+     }
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/UpdateProvisionerConfig.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/UpdateProvisionerConfig.java
index 0000000,f3cce42..f6713d1
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/UpdateProvisionerConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/UpdateProvisionerConfig.java
@@@ -1,0 -1,87 +1,106 @@@
+ package org.apache.helix.provisioning.tools;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import org.apache.commons.cli.CommandLine;
+ import org.apache.commons.cli.GnuParser;
+ import org.apache.commons.cli.Option;
+ import org.apache.commons.cli.OptionBuilder;
+ import org.apache.commons.cli.OptionGroup;
+ import org.apache.commons.cli.Options;
+ import org.apache.commons.cli.ParseException;
+ import org.apache.helix.HelixConnection;
+ import org.apache.helix.api.Resource;
+ import org.apache.helix.api.accessor.ResourceAccessor;
+ import org.apache.helix.api.config.ResourceConfig;
+ import org.apache.helix.api.id.ClusterId;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.manager.zk.ZkHelixConnection;
+ import org.apache.helix.provisioning.yarn.YarnProvisionerConfig;
+ import org.apache.log4j.Logger;
+ /**
+  * Update the provisioner config
+  */
+ public class UpdateProvisionerConfig {
+   private static Logger LOG = Logger.getLogger(UpdateProvisionerConfig.class);
+   private static String updateContainerCount = "updateContainerCount";
+   private HelixConnection _connection;
+ 
+   public UpdateProvisionerConfig(String zkAddress) {
+     _connection = new ZkHelixConnection(zkAddress);
+     _connection.connect();
+   }
+ 
+   public void setNumContainers(String appName, String serviceName, int numContainers) {
+     ResourceId resourceId = ResourceId.from(serviceName);
+ 
+     ResourceAccessor resourceAccessor = _connection.createResourceAccessor(ClusterId.from(appName));
+     Resource resource = resourceAccessor.readResource(resourceId);
+     LOG.info("Current provisioner config:"+ resource.getProvisionerConfig());
+ 
+     ResourceConfig.Delta delta = new ResourceConfig.Delta(resourceId);
+     YarnProvisionerConfig config = new YarnProvisionerConfig(resourceId);
+     config.setNumContainers(numContainers);
+     delta.setProvisionerConfig(config);
+     ResourceConfig updatedResourceConfig = resourceAccessor.updateResource(resourceId, delta);
+     LOG.info("Update provisioner config:"+ updatedResourceConfig.getProvisionerConfig());
+ 
+   }
+ 
+   @SuppressWarnings("static-access")
+   public static void main(String[] args) throws ParseException {
+     Option zkServerOption =
+         OptionBuilder.withLongOpt("zookeeperAddress").withDescription("Provide zookeeper address")
+             .create();
+     zkServerOption.setArgs(1);
+     zkServerOption.setRequired(true);
+     zkServerOption.setArgName("zookeeperAddress(Required)");
+ 
+     OptionGroup group = new OptionGroup();
+     group.setRequired(true);
+ 
+     // update container count per service
+     Option updateContainerCountOption =
+         OptionBuilder.withLongOpt(updateContainerCount)
+             .withDescription("appName serviceName numContainers").create();
+     updateContainerCountOption.setArgs(3);
+     updateContainerCountOption.setRequired(false);
+     updateContainerCountOption.setArgName("appName serviceName numContainers");
+ 
+     group.addOption(updateContainerCountOption);
+ 
+     Options options = new Options();
+     options.addOption(zkServerOption);
+     options.addOptionGroup(group);
+     CommandLine cliParser = new GnuParser().parse(options, args);
+ 
+     String zkAddress = cliParser.getOptionValue("zookeeperAddress");
+     UpdateProvisionerConfig updater = new UpdateProvisionerConfig(zkAddress);
+ 
+     if (cliParser.hasOption(updateContainerCount)) {
+       String appName = cliParser.getOptionValues(updateContainerCount)[0];
+       String serviceName = cliParser.getOptionValues(updateContainerCount)[1];
+       int numContainers = Integer.parseInt(
+         cliParser.getOptionValues(updateContainerCount)[2]);
+       updater.setNumContainers(appName, serviceName, numContainers);
+     }
+ 
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
index 0000000,2db4afb..2be7062
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
@@@ -1,0 -1,561 +1,580 @@@
+ package org.apache.helix.provisioning.yarn;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.net.URI;
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Vector;
+ 
+ import org.apache.commons.cli.CommandLine;
+ import org.apache.commons.cli.GnuParser;
+ import org.apache.commons.cli.Option;
+ import org.apache.commons.cli.Options;
+ import org.apache.commons.compress.archivers.ArchiveStreamFactory;
+ import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.io.DataOutputBuffer;
+ import org.apache.hadoop.security.Credentials;
+ import org.apache.hadoop.security.UserGroupInformation;
+ import org.apache.hadoop.security.token.Token;
+ import org.apache.hadoop.yarn.api.ApplicationConstants;
+ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+ import org.apache.hadoop.yarn.api.records.ApplicationId;
+ import org.apache.hadoop.yarn.api.records.ApplicationReport;
+ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+ import org.apache.hadoop.yarn.api.records.LocalResource;
+ import org.apache.hadoop.yarn.api.records.LocalResourceType;
+ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+ import org.apache.hadoop.yarn.api.records.Priority;
+ import org.apache.hadoop.yarn.api.records.Resource;
+ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+ import org.apache.hadoop.yarn.client.api.YarnClient;
+ import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+ import org.apache.hadoop.yarn.conf.YarnConfiguration;
+ import org.apache.hadoop.yarn.util.ConverterUtils;
+ import org.apache.hadoop.yarn.util.Records;
+ import org.apache.helix.HelixConnection;
+ import org.apache.helix.api.id.ClusterId;
+ import org.apache.helix.manager.zk.ZkHelixConnection;
+ import org.apache.helix.provisioning.ApplicationSpec;
+ import org.apache.helix.provisioning.ApplicationSpecFactory;
+ import org.apache.helix.provisioning.HelixYarnUtil;
+ import org.apache.helix.provisioning.TaskConfig;
+ 
+ /**
+  * Main class to launch the job.
+  * Gets the yaml file as the input.
+  * Converts yaml file into ApplicationSpec.
+  */
+ public class AppLauncher {
+ 
+   private static final Log LOG = LogFactory.getLog(AppLauncher.class);
+ 
+   private ApplicationSpec _applicationSpec;
+   private YarnClient yarnClient;
+   private ApplicationSpecFactory _applicationSpecFactory;
+   private File _yamlConfigFile;
+ 
+   private YarnConfiguration _conf;
+ 
+   private File appMasterArchive;
+ 
+   private ApplicationId _appId;
+ 
+   private AppMasterConfig _appMasterConfig;
+ 
+   public AppLauncher(ApplicationSpecFactory applicationSpecFactory, File yamlConfigFile)
+       throws Exception {
+     _applicationSpecFactory = applicationSpecFactory;
+     _yamlConfigFile = yamlConfigFile;
+     init();
+   }
+ 
+   private void init() throws Exception {
+     _applicationSpec = _applicationSpecFactory.fromYaml(new FileInputStream(_yamlConfigFile));
+     _appMasterConfig = new AppMasterConfig();
+     appMasterArchive = new File(_applicationSpec.getAppMasterPackage());
+     yarnClient = YarnClient.createYarnClient();
+     _conf = new YarnConfiguration();
+     yarnClient.init(_conf);
+   }
+ 
+   public ApplicationSpec getApplicationSpec() {
+     return _applicationSpec;
+   }
+ 
+   public boolean launch() throws Exception {
+     LOG.info("Running Client");
+     yarnClient.start();
+ 
+     // Get a new application id
+     YarnClientApplication app = yarnClient.createApplication();
+     GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
+     // TODO get min/max resource capabilities from RM and change memory ask if needed
+     // If we do not have min/max, we may not be able to correctly request
+     // the required resources from the RM for the app master
+     // Memory ask has to be a multiple of min and less than max.
+     // Dump out information about cluster capability as seen by the resource manager
+     int maxMem = appResponse.getMaximumResourceCapability().getMemory();
+     LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+ 
+     // set the application name
+     ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+     _appId = appContext.getApplicationId();
+     _appMasterConfig.setAppId(_appId.getId());
+     String appName = _applicationSpec.getAppName();
+     _appMasterConfig.setAppName(appName);
+     _appMasterConfig.setApplicationSpecFactory(_applicationSpecFactory.getClass()
+         .getCanonicalName());
+     appContext.setApplicationName(appName);
+ 
+     // Set up the container launch context for the application master
+     ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+ 
+     LOG.info("Copy Application archive file from local filesystem and add to local environment");
+     // Copy the application master jar to the filesystem
+     // Create a local resource to point to the destination jar path
+     FileSystem fs = FileSystem.get(_conf);
+ 
+     // get packages for each component packages
+     Map<String, URI> packages = new HashMap<String, URI>();
+     packages
+         .put(AppMasterConfig.AppEnvironment.APP_MASTER_PKG.toString(), appMasterArchive.toURI());
+     packages.put(AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString(), _yamlConfigFile.toURI());
+     for (String serviceName : _applicationSpec.getServices()) {
+       packages.put(serviceName, _applicationSpec.getServicePackage(serviceName));
+     }
+     Map<String, Path> hdfsDest = new HashMap<String, Path>();
+     Map<String, String> classpathMap = new HashMap<String, String>();
+     for (String name : packages.keySet()) {
+       URI uri = packages.get(name);
+       Path dst = copyToHDFS(fs, name, uri);
+       hdfsDest.put(name, dst);
+       String classpath = generateClasspathAfterExtraction(name, new File(uri));
+       classpathMap.put(name, classpath);
+       _appMasterConfig.setClasspath(name, classpath);
+       String serviceMainClass = _applicationSpec.getServiceMainClass(name);
+       if (serviceMainClass != null) {
+         _appMasterConfig.setMainClass(name, serviceMainClass);
+       }
+     }
+ 
+     // Get YAML files describing all workflows to immediately start
+     Map<String, URI> workflowFiles = new HashMap<String, URI>();
+     List<TaskConfig> taskConfigs = _applicationSpec.getTaskConfigs();
+     if (taskConfigs != null) {
+       for (TaskConfig taskConfig : taskConfigs) {
+         URI configUri = taskConfig.getYamlURI();
+         if (taskConfig.name != null && configUri != null) {
+           workflowFiles.put(taskConfig.name, taskConfig.getYamlURI());
+         }
+       }
+     }
+ 
+     // set local resources for the application master
+     // local files or archives as needed
+     // In this scenario, the jar file for the application master is part of the local resources
+     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+     LocalResource appMasterPkg =
+         setupLocalResource(fs,
+             hdfsDest.get(AppMasterConfig.AppEnvironment.APP_MASTER_PKG.toString()));
+     LocalResource appSpecFile =
+         setupLocalResource(fs,
+             hdfsDest.get(AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString()));
+     localResources.put(AppMasterConfig.AppEnvironment.APP_MASTER_PKG.toString(), appMasterPkg);
+     localResources.put(AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString(), appSpecFile);
+     for (String name : workflowFiles.keySet()) {
+       URI uri = workflowFiles.get(name);
+       Path dst = copyToHDFS(fs, name, uri);
+       LocalResource taskLocalResource = setupLocalResource(fs, dst);
+       localResources.put(AppMasterConfig.AppEnvironment.TASK_CONFIG_FILE.toString() + "_" + name,
+           taskLocalResource);
+     }
+ 
+     // Set local resource info into app master container launch context
+     amContainer.setLocalResources(localResources);
+ 
+     // Set the necessary security tokens as needed
+     // amContainer.setContainerTokens(containerToken);
+ 
+     // Add AppMaster.jar location to classpath
+     // At some point we should not be required to add
+     // the hadoop specific classpaths to the env.
+     // It should be provided out of the box.
+     // For now setting all required classpaths including
+     // the classpath to "." for the application jar
+     StringBuilder classPathEnv =
+         new StringBuilder(Environment.CLASSPATH.$()).append(File.pathSeparatorChar).append("./*")
+             .append(File.pathSeparatorChar);
+     classPathEnv.append(classpathMap.get(AppMasterConfig.AppEnvironment.APP_MASTER_PKG.toString()));
+     for (String c : _conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+         YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+       classPathEnv.append(File.pathSeparatorChar);
+       classPathEnv.append(c.trim());
+     }
+     classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties");
+ 
+     // add the runtime classpath needed for tests to work
+     if (_conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+       classPathEnv.append(':');
+       classPathEnv.append(System.getProperty("java.class.path"));
+     }
+     LOG.info("\n\n Setting the classpath to launch AppMaster:\n\n");
+     // Set the env variables to be setup in the env where the application master will be run
+     Map<String, String> env = new HashMap<String, String>(_appMasterConfig.getEnv());
+     env.put("CLASSPATH", classPathEnv.toString());
+ 
+     amContainer.setEnvironment(env);
+ 
+     // Set the necessary command to execute the application master
+     Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+ 
+     // Set java executable command
+     LOG.info("Setting up app master launch command");
+     vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+     int amMemory = 1024;
+     // Set Xmx based on am memory size
+     vargs.add("-Xmx" + amMemory + "m");
+     // Set class name
+     vargs.add(AppMasterLauncher.class.getCanonicalName());
+     // Set params for Application Master
+     // vargs.add("--num_containers " + String.valueOf(numContainers));
+ 
+     vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
+     vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
+ 
+     // Get final commmand
+     StringBuilder command = new StringBuilder();
+     for (CharSequence str : vargs) {
+       command.append(str).append(" ");
+     }
+ 
+     LOG.info("Completed setting up app master command " + command.toString());
+     List<String> commands = new ArrayList<String>();
+     commands.add(command.toString());
+     amContainer.setCommands(commands);
+ 
+     // Set up resource type requirements
+     // For now, only memory is supported so we set memory requirements
+     Resource capability = Records.newRecord(Resource.class);
+     capability.setMemory(amMemory);
+     appContext.setResource(capability);
+ 
+     // Service data is a binary blob that can be passed to the application
+     // Not needed in this scenario
+     // amContainer.setServiceData(serviceData);
+ 
+     // Setup security tokens
+     if (UserGroupInformation.isSecurityEnabled()) {
+       Credentials credentials = new Credentials();
+       String tokenRenewer = _conf.get(YarnConfiguration.RM_PRINCIPAL);
+       if (tokenRenewer == null || tokenRenewer.length() == 0) {
+         throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer");
+       }
+ 
+       // For now, only getting tokens for the default file-system.
+       final Token<?> tokens[] = fs.addDelegationTokens(tokenRenewer, credentials);
+       if (tokens != null) {
+         for (Token<?> token : tokens) {
+           LOG.info("Got dt for " + fs.getUri() + "; " + token);
+         }
+       }
+       DataOutputBuffer dob = new DataOutputBuffer();
+       credentials.writeTokenStorageToStream(dob);
+       ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+       amContainer.setTokens(fsTokens);
+     }
+ 
+     appContext.setAMContainerSpec(amContainer);
+ 
+     // Set the priority for the application master
+     Priority pri = Records.newRecord(Priority.class);
+     int amPriority = 0;
+     // TODO - what is the range for priority? how to decide?
+     pri.setPriority(amPriority);
+     appContext.setPriority(pri);
+ 
+     String amQueue = "default";
+     // Set the queue to which this application is to be submitted in the RM
+     appContext.setQueue(amQueue);
+ 
+     LOG.info("Submitting application to YARN Resource Manager");
+ 
+     ApplicationId applicationId = yarnClient.submitApplication(appContext);
+ 
+     LOG.info("Submitted application with applicationId:" + applicationId);
+ 
+     return true;
+   }
+ 
+   /**
+    * Generates the classpath after the archive file gets extracted under 'serviceName' folder
+    * @param serviceName
+    * @param archiveFile
+    * @return
+    */
+   private String generateClasspathAfterExtraction(String serviceName, File archiveFile) {
+     if (!isArchive(archiveFile.getAbsolutePath())) {
+       return "./";
+     }
+     StringBuilder classpath = new StringBuilder();
+     // put the jar files under the archive in the classpath
+     try {
+       final InputStream is = new FileInputStream(archiveFile);
+       final TarArchiveInputStream debInputStream =
+           (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream("tar", is);
+       TarArchiveEntry entry = null;
+       while ((entry = (TarArchiveEntry) debInputStream.getNextEntry()) != null) {
+         if (entry.isFile()) {
+           classpath.append(File.pathSeparatorChar);
+           classpath.append("./" + serviceName + "/" + entry.getName());
+         }
+       }
+       debInputStream.close();
+ 
+     } catch (Exception e) {
+       LOG.error("Unable to read archive file:" + archiveFile, e);
+     }
+     return classpath.toString();
+   }
+ 
+   private Path copyToHDFS(FileSystem fs, String name, URI uri) throws Exception {
+     // will throw exception if the file name is without extension
+     String extension = uri.getPath().substring(uri.getPath().lastIndexOf(".") + 1);
+     String pathSuffix =
+         _applicationSpec.getAppName() + "/" + _appId.getId() + "/" + name + "." + extension;
+     Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+     Path src = new Path(uri);
+     fs.copyFromLocalFile(false, true, src, dst);
+     return dst;
+   }
+ 
+   private LocalResource setupLocalResource(FileSystem fs, Path dst) throws Exception {
+     URI uri = dst.toUri();
+     String extension = uri.getPath().substring(uri.getPath().lastIndexOf(".") + 1);
+     FileStatus destStatus = fs.getFileStatus(dst);
+     LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+     // Set the type of resource - file or archive
+     // archives are untarred at destination
+     // we don't need the jar file to be untarred for now
+     if (isArchive(extension)) {
+       amJarRsrc.setType(LocalResourceType.ARCHIVE);
+     } else {
+       amJarRsrc.setType(LocalResourceType.FILE);
+     }
+     // Set visibility of the resource
+     // Setting to most private option
+     amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+     // Set the resource to be copied over
+     amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+     // Set timestamp and length of file so that the framework
+     // can do basic sanity checks for the local resource
+     // after it has been copied over to ensure it is the same
+     // resource the client intended to use with the application
+     amJarRsrc.setTimestamp(destStatus.getModificationTime());
+     amJarRsrc.setSize(destStatus.getLen());
+     return amJarRsrc;
+   }
+ 
+   private boolean isArchive(String path) {
+     return path.endsWith("tar") || path.endsWith("gz") || path.endsWith("tar.gz")
+         || path.endsWith("zip");
+   }
+ 
+   public HelixConnection pollForConnection() {
+     String prevReport = "";
+     HelixConnection connection = null;
+ 
+     while (true) {
+       try {
+         // Get application report for the appId we are interested in
+         ApplicationReport report = yarnClient.getApplicationReport(_appId);
+ 
+         String reportMessage = generateReport(report);
+         if (!reportMessage.equals(prevReport)) {
+           LOG.info(reportMessage);
+         }
+         YarnApplicationState state = report.getYarnApplicationState();
+         if (YarnApplicationState.RUNNING == state) {
+           if (connection == null) {
+             String hostName = null;
+             int ind = report.getHost().indexOf('/');
+             if (ind > -1) {
+               hostName = report.getHost().substring(ind + 1);
+             } else {
+               hostName = report.getHost();
+             }
+             connection = new ZkHelixConnection(hostName + ":2181");
+ 
+             try {
+               connection.connect();
+             } catch (Exception e) {
+               LOG.warn("AppMaster started but not yet initialized");
+               connection = null;
+             }
+           }
+           if (connection.isConnected()) {
+             return connection;
+           }
+         }
+         prevReport = reportMessage;
+         Thread.sleep(10000);
+       } catch (Exception e) {
+         LOG.error("Exception while getting info ", e);
+         break;
+       }
+     }
+     return null;
+   }
+ 
+   public ApplicationReport getApplicationReport() {
+     try {
+       return yarnClient.getApplicationReport(_appId);
+     } catch (Exception e) {
+       return null;
+     }
+   }
+ 
+   /**
+    * @return true if successfully completed, it will print status every X seconds
+    */
+   public boolean waitUntilDone() {
+     String prevReport = "";
+     HelixConnection connection = null;
+ 
+     while (true) {
+       try {
+         // Get application report for the appId we are interested in
+         ApplicationReport report = yarnClient.getApplicationReport(_appId);
+ 
+         String reportMessage = generateReport(report);
+         if (!reportMessage.equals(prevReport)) {
+           LOG.info(reportMessage);
+         }
+         YarnApplicationState state = report.getYarnApplicationState();
+         FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
+         if (YarnApplicationState.FINISHED == state) {
+           if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
+             LOG.info("Application has completed successfully. Breaking monitoring loop");
+             return true;
+           } else {
+             LOG.info("Application did finished unsuccessfully." + " YarnState=" + state.toString()
+                 + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop");
+             return false;
+           }
+         } else if (YarnApplicationState.KILLED == state || YarnApplicationState.FAILED == state) {
+           LOG.info("Application did not finish." + " YarnState=" + state.toString()
+               + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop");
+           return false;
+         }
+         if (YarnApplicationState.RUNNING == state) {
+           if (connection == null) {
+             String hostName = null;
+             int ind = report.getHost().indexOf('/');
+             if (ind > -1) {
+               hostName = report.getHost().substring(ind + 1);
+             } else {
+               hostName = report.getHost();
+             }
+             connection = new ZkHelixConnection(hostName + ":2181");
+ 
+             try {
+               connection.connect();
+             } catch (Exception e) {
+               LOG.warn("AppMaster started but not yet initialized");
+               connection = null;
+             }
+           }
+           if (connection.isConnected()) {
+             AppStatusReportGenerator generator = new AppStatusReportGenerator();
+             ClusterId clusterId = ClusterId.from(_applicationSpec.getAppName());
+             String generateReport = generator.generateReport(connection, clusterId);
+             LOG.info(generateReport);
+           }
+         }
+         prevReport = reportMessage;
+         Thread.sleep(10000);
+       } catch (Exception e) {
+         LOG.error("Exception while getting info ");
+         break;
+       }
+     }
+     return true;
+   }
+ 
+   /**
+    * TODO: kill the app only in dev mode. In prod, its ok for the app to continue running if the
+    * launcher dies after launching
+    */
+ 
+   private String generateReport(ApplicationReport report) {
+     return "Got application report from ASM for" + ", appId=" + _appId.getId()
+         + ", clientToAMToken=" + report.getClientToAMToken() + ", appDiagnostics="
+         + report.getDiagnostics() + ", appMasterHost=" + report.getHost() + ", appQueue="
+         + report.getQueue() + ", appMasterRpcPort=" + report.getRpcPort() + ", appStartTime="
+         + report.getStartTime() + ", yarnAppState=" + report.getYarnApplicationState().toString()
+         + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
+         + ", appTrackingUrl=" + report.getTrackingUrl() + ", appUser=" + report.getUser();
+   }
+ 
+   public void cleanup() {
+     LOG.info("Cleaning up");
+     try {
+       ApplicationReport applicationReport = yarnClient.getApplicationReport(_appId);
+       LOG.info("Killing application:" + _appId + " \n Application report"
+           + generateReport(applicationReport));
+     } catch (Exception e) {
+       e.printStackTrace();
+     }
+   }
+ 
+   /**
+    * Launches the application on a YARN cluster. Once launched, it will display (periodically) the
+    * status of the containers in the application.
+    * @param args app_spec_provider and app_config_spec
+    * @throws Exception
+    */
+   public static void main(String[] args) throws Exception {
+ 
+     Options opts = new Options();
+     opts.addOption(new Option("app_spec_provider", true,
+         "Application Spec Factory Class that will parse the app_config_spec file"));
+     opts.addOption(new Option("app_config_spec", true,
+         "YAML config file that provides the app specifications"));
+     CommandLine cliParser = new GnuParser().parse(opts, args);
+     String appSpecFactoryClass = cliParser.getOptionValue("app_spec_provider");
+     String yamlConfigFileName = cliParser.getOptionValue("app_config_spec");
+ 
+     ApplicationSpecFactory applicationSpecFactory =
+         HelixYarnUtil.createInstance(appSpecFactoryClass);
+     File yamlConfigFile = new File(yamlConfigFileName);
+     if (!yamlConfigFile.exists()) {
+       throw new IllegalArgumentException("YAML app_config_spec file: '" + yamlConfigFileName
+           + "' does not exist");
+     }
+     final AppLauncher launcher = new AppLauncher(applicationSpecFactory, yamlConfigFile);
+     launcher.launch();
+     Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+ 
+       @Override
+       public void run() {
+         launcher.cleanup();
+       }
+     }));
+     launcher.waitUntilDone();
+   }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
index 0000000,38a0dd1..d0952c1
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
@@@ -1,0 -1,111 +1,130 @@@
+ package org.apache.helix.provisioning.yarn;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.HashMap;
+ import java.util.Map;
+ 
+ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+ import org.apache.log4j.Logger;
+ 
+ /**
+  * Convenient method to pass information to containers
+  * The methods simply sets up environment variables
+  */
+ public class AppMasterConfig {
+   private static Logger LOG = Logger.getLogger(AppMasterConfig.class);
+   private Map<String, String> _envs;
+ 
+   public enum AppEnvironment {
+     APP_MASTER_PKG("APP_MASTER_PKG"),
+     APP_SPEC_FILE("APP_SPEC_FILE"),
+     APP_NAME("APP_NAME"),
+     APP_ID("APP_ID"),
+     APP_SPEC_FACTORY("APP_SPEC_FACTORY"),
+     TASK_CONFIG_FILE("TASK_CONFIG_FILE");
+     String _name;
+ 
+     private AppEnvironment(String name) {
+       _name = name;
+     }
+ 
+     public String toString() {
+       return _name;
+     }
+   }
+ 
+   public AppMasterConfig() {
+     _envs = new HashMap<String, String>();
+   }
+ 
+   private String get(String key) {
+     String value = (_envs.containsKey(key)) ? _envs.get(key) : System.getenv().get(key);
+     LOG.info("Returning value:" + value + " for key:'" + key + "'");
+ 
+     return value;
+   }
+ 
+   public void setAppId(int id) {
+     _envs.put(AppEnvironment.APP_ID.toString(), "" + id);
+   }
+ 
+   public String getAppName() {
+     return get(AppEnvironment.APP_NAME.toString());
+   }
+ 
+   public int getAppId() {
+     return Integer.parseInt(get(AppEnvironment.APP_ID.toString()));
+   }
+ 
+   public String getClassPath(String serviceName) {
+     return get(serviceName + "_classpath");
+   }
+ 
+   public String getMainClass(String serviceName) {
+     return get(serviceName + "_mainClass");
+   }
+ 
+   public String getZKAddress() {
+     return get(Environment.NM_HOST.name()) + ":2181";
+   }
+ 
+   public String getContainerId() {
+     return get(Environment.CONTAINER_ID.name());
+   }
+ 
+   public Map<String, String> getEnv() {
+     return _envs;
+   }
+ 
+   public void setAppName(String appName) {
+     _envs.put(AppEnvironment.APP_NAME.toString(), appName);
+ 
+   }
+ 
+   public void setClasspath(String serviceName, String classpath) {
+     _envs.put(serviceName + "_classpath", classpath);
+   }
+ 
+   public void setTaskConfigFile(String configName, String path) {
+     _envs.put(AppEnvironment.TASK_CONFIG_FILE.toString() + "_" + configName, path);
+   }
+ 
+   public String getTaskConfigFile(String configName) {
+     return get(AppEnvironment.TASK_CONFIG_FILE.toString() + "_" + configName);
+   }
+ 
+   public String getApplicationSpecConfigFile() {
+     return get(AppEnvironment.APP_SPEC_FILE.toString());
+   }
+ 
+   public String getApplicationSpecFactory() {
+     return get(AppEnvironment.APP_SPEC_FACTORY.toString());
+   }
+ 
+   public void setApplicationSpecFactory(String className) {
+     _envs.put(AppEnvironment.APP_SPEC_FACTORY.toString(), className);
+ 
+   }
+ 
+   public void setMainClass(String serviceName, String serviceMainClass) {
+     _envs.put(serviceName + "_mainClass", serviceMainClass);
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
index 0000000,e7a0f61..31ef05c
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
@@@ -1,0 -1,194 +1,213 @@@
+ package org.apache.helix.provisioning.yarn;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.net.URI;
+ import java.util.List;
+ import java.util.Map;
+ 
+ import org.I0Itec.zkclient.IDefaultNameSpace;
+ import org.I0Itec.zkclient.ZkClient;
+ import org.I0Itec.zkclient.ZkServer;
+ import org.apache.commons.cli.Options;
+ import org.apache.commons.io.FileUtils;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+ import org.apache.hadoop.yarn.api.records.ApplicationId;
+ import org.apache.hadoop.yarn.api.records.ContainerId;
+ import org.apache.hadoop.yarn.conf.YarnConfiguration;
+ import org.apache.hadoop.yarn.util.ConverterUtils;
+ import org.apache.helix.HelixController;
+ import org.apache.helix.api.accessor.ClusterAccessor;
+ import org.apache.helix.api.config.ClusterConfig;
+ import org.apache.helix.api.config.ResourceConfig;
+ import org.apache.helix.api.id.ClusterId;
+ import org.apache.helix.api.id.ControllerId;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
+ import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+ import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+ import org.apache.helix.manager.zk.ZkHelixConnection;
+ import org.apache.helix.model.StateModelDefinition;
+ import org.apache.helix.provisioning.ApplicationSpec;
+ import org.apache.helix.provisioning.ApplicationSpecFactory;
+ import org.apache.helix.provisioning.HelixYarnUtil;
+ import org.apache.helix.provisioning.ServiceConfig;
+ import org.apache.helix.provisioning.TaskConfig;
+ import org.apache.helix.task.TaskDriver;
+ import org.apache.helix.task.Workflow;
+ import org.apache.helix.tools.StateModelConfigGenerator;
+ import org.apache.log4j.Logger;
+ 
+ /**
+  * This will <br/>
+  * <ul>
+  * <li>start zookeeper automatically</li>
+  * <li>create the cluster</li>
+  * <li>set up resource(s)</li>
+  * <li>start helix controller</li>
+  * </ul>
+  */
+ public class AppMasterLauncher {
+   public static Logger LOG = Logger.getLogger(AppMasterLauncher.class);
+ 
+   public static void main(String[] args) throws Exception {
+     Map<String, String> env = System.getenv();
+     LOG.info("Starting app master with the following environment variables");
+     for (String key : env.keySet()) {
+       LOG.info(key + "\t\t=" + env.get(key));
+     }
+ 
+     Options opts;
+     opts = new Options();
+     opts.addOption("num_containers", true, "Number of containers");
+ 
+     // START ZOOKEEPER
+     String dataDir = "dataDir";
+     String logDir = "logDir";
+     IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+       @Override
+       public void createDefaultNameSpace(ZkClient zkClient) {
+ 
+       }
+     };
+     try {
+       FileUtils.deleteDirectory(new File(dataDir));
+       FileUtils.deleteDirectory(new File(logDir));
+     } catch (IOException e) {
+       LOG.error(e);
+     }
+ 
+     final ZkServer server = new ZkServer(dataDir, logDir, defaultNameSpace);
+     server.start();
+ 
+     // start Generic AppMaster that interacts with Yarn RM
+     AppMasterConfig appMasterConfig = new AppMasterConfig();
+     String containerIdStr = appMasterConfig.getContainerId();
+     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+     ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
+ 
+     String configFile = AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString();
+     String className = appMasterConfig.getApplicationSpecFactory();
+ 
+     GenericApplicationMaster genericApplicationMaster = new GenericApplicationMaster(appAttemptID);
+     try {
+       genericApplicationMaster.start();
+     } catch (Exception e) {
+       LOG.error("Unable to start application master: ", e);
+     }
+     ApplicationSpecFactory factory = HelixYarnUtil.createInstance(className);
+ 
+     // TODO: Avoid setting static variable.
+     YarnProvisioner.applicationMaster = genericApplicationMaster;
+     YarnProvisioner.applicationMasterConfig = appMasterConfig;
+     ApplicationSpec applicationSpec = factory.fromYaml(new FileInputStream(configFile));
+     YarnProvisioner.applicationSpec = applicationSpec;
+     String zkAddress = appMasterConfig.getZKAddress();
+     String clusterName = appMasterConfig.getAppName();
+ 
+     // CREATE CLUSTER and setup the resources
+     // connect
+     ZkHelixConnection connection = new ZkHelixConnection(zkAddress);
+     connection.connect();
+ 
+     // create the cluster
+     ClusterId clusterId = ClusterId.from(clusterName);
+     ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
+     StateModelDefinition statelessService =
+         new StateModelDefinition(StateModelConfigGenerator.generateConfigForStatelessService());
+     StateModelDefinition taskStateModel =
+         new StateModelDefinition(StateModelConfigGenerator.generateConfigForTaskStateModel());
+     clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId)
+         .addStateModelDefinition(statelessService).addStateModelDefinition(taskStateModel).build());
+     for (String service : applicationSpec.getServices()) {
+       String resourceName = service;
+       // add the resource with the local provisioner
+       ResourceId resourceId = ResourceId.from(resourceName);
+ 
+       ServiceConfig serviceConfig = applicationSpec.getServiceConfig(resourceName);
+       serviceConfig.setSimpleField("service_name", service);
+       int numContainers = serviceConfig.getIntField("num_containers", 1);
+ 
+       YarnProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId);
+       provisionerConfig.setNumContainers(numContainers);
+ 
+       FullAutoRebalancerConfig.Builder rebalancerConfigBuilder =
+           new FullAutoRebalancerConfig.Builder(resourceId);
+       RebalancerConfig rebalancerConfig =
+           rebalancerConfigBuilder.stateModelDefId(statelessService.getStateModelDefId())//
+               .build();
+       ResourceConfig.Builder resourceConfigBuilder =
+           new ResourceConfig.Builder(ResourceId.from(resourceName));
+       ResourceConfig resourceConfig = resourceConfigBuilder.provisionerConfig(provisionerConfig) //
+           .rebalancerConfig(rebalancerConfig) //
+           .userConfig(serviceConfig) //
+           .build();
+       clusterAccessor.addResourceToCluster(resourceConfig);
+     }
+     // start controller
+     ControllerId controllerId = ControllerId.from("controller1");
+     HelixController controller = connection.createController(clusterId, controllerId);
+     controller.start();
+ 
+     // Start any pre-specified jobs
+     List<TaskConfig> taskConfigs = applicationSpec.getTaskConfigs();
+     if (taskConfigs != null) {
+       YarnConfiguration conf = new YarnConfiguration();
+       FileSystem fs;
+       fs = FileSystem.get(conf);
+       for (TaskConfig taskConfig : taskConfigs) {
+         URI yamlUri = taskConfig.getYamlURI();
+         if (yamlUri != null && taskConfig.name != null) {
+           InputStream is =
+               readFromHDFS(fs, taskConfig.name, yamlUri, applicationSpec,
+                   appAttemptID.getApplicationId());
+           Workflow workflow = Workflow.parse(is);
+           TaskDriver taskDriver = new TaskDriver(new HelixConnectionAdaptor(controller));
+           taskDriver.start(workflow);
+         }
+       }
+     }
+ 
+     Thread shutdownhook = new Thread(new Runnable() {
+       @Override
+       public void run() {
+         server.shutdown();
+       }
+     });
+     Runtime.getRuntime().addShutdownHook(shutdownhook);
+     Thread.sleep(10000);
+ 
+   }
+ 
+   private static InputStream readFromHDFS(FileSystem fs, String name, URI uri,
+       ApplicationSpec appSpec, ApplicationId appId) throws Exception {
+     // will throw exception if the file name is without extension
+     String extension = uri.getPath().substring(uri.getPath().lastIndexOf(".") + 1);
+     String pathSuffix = appSpec.getAppName() + "/" + appId.getId() + "/" + name + "." + extension;
+     Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+     return fs.open(dst).getWrappedStream();
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
index 0000000,40c8186..c436443
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
@@@ -1,0 -1,84 +1,103 @@@
+ package org.apache.helix.provisioning.yarn;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.Map;
+ 
+ import org.apache.helix.HelixConnection;
+ import org.apache.helix.api.Participant;
+ import org.apache.helix.api.Resource;
+ import org.apache.helix.api.State;
+ import org.apache.helix.api.accessor.ClusterAccessor;
+ import org.apache.helix.api.config.ContainerConfig;
+ import org.apache.helix.api.id.ClusterId;
+ import org.apache.helix.api.id.ParticipantId;
+ import org.apache.helix.api.id.PartitionId;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.controller.provisioner.ContainerId;
+ import org.apache.helix.controller.provisioner.ContainerState;
+ import org.apache.helix.manager.zk.ZkHelixConnection;
+ 
+ public class AppStatusReportGenerator {
+   static String TAB = "\t";
+   static String NEWLINE = "\n";
+ 
+   String generateReport(HelixConnection connection, ClusterId clusterId) {
+     if (!connection.isConnected()) {
+       return "Unable to connect to cluster";
+     }
+     StringBuilder builder = new StringBuilder();
+     ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
+     Map<ParticipantId, Participant> participants = clusterAccessor.readParticipants();
+     builder.append("AppName").append(TAB).append(clusterId).append(NEWLINE);
+     Map<ResourceId, Resource> resources = clusterAccessor.readResources();
+     for (ResourceId resourceId : resources.keySet()) {
+       builder.append("SERVICE").append(TAB).append(resourceId).append(NEWLINE);
+       Resource resource = resources.get(resourceId);
+       Map<ParticipantId, State> serviceStateMap =
+           resource.getExternalView().getStateMap(PartitionId.from(resourceId.stringify() + "_0"));
+ 
+       builder.append(TAB).append("CONTAINER_NAME").append(TAB).append(TAB)
+           .append("CONTAINER_STATE").append(TAB).append("SERVICE_STATE").append(TAB)
+           .append("CONTAINER_ID").append(NEWLINE);
+       for (Participant participant : participants.values()) {
+         // need a better check
+         if (!participant.getId().stringify().startsWith(resource.getId().stringify())) {
+           continue;
+         }
+         ContainerConfig containerConfig = participant.getContainerConfig();
+         ContainerState containerState = ContainerState.UNDEFINED;
+         ContainerId containerId = ContainerId.from("N/A");
+ 
+         if (containerConfig != null) {
+           containerId = containerConfig.getId();
+           containerState = containerConfig.getState();
+         }
+         State participantState = null;
+         if (serviceStateMap != null) {
+           participantState = serviceStateMap.get(participant.getId());
+         }
+         if (participantState == null) {
+           participantState = State.from("UNKNOWN");
+         }
+         builder.append(TAB).append(participant.getId()).append(TAB).append(containerState)
+             .append(TAB).append(participantState).append(TAB).append(TAB).append(containerId);
+         builder.append(NEWLINE);
+       }
+ 
+     }
+     return builder.toString();
+ 
+   }
+ 
+   public static void main(String[] args) throws InterruptedException {
+     AppStatusReportGenerator generator = new AppStatusReportGenerator();
+ 
+     ZkHelixConnection connection = new ZkHelixConnection("localhost:2181");
+     connection.connect();
+     while (true) {
+       String generateReport = generator.generateReport(connection, ClusterId.from("testApp1"));
+       System.out.println(generateReport);
+       Thread.sleep(10000);
+       connection.createClusterManagementTool().addCluster("testApp1");
+     }
+     // connection.disconnect();
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/FixedTargetProvider.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/FixedTargetProvider.java
index 0000000,83ad461..17c0fe1
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/FixedTargetProvider.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/FixedTargetProvider.java
@@@ -1,0 -1,20 +1,39 @@@
+ package org.apache.helix.provisioning.yarn;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.Collection;
+ 
+ import org.apache.helix.api.Cluster;
+ import org.apache.helix.api.Participant;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.controller.provisioner.TargetProvider;
+ import org.apache.helix.controller.provisioner.TargetProviderResponse;
+ 
+ public class FixedTargetProvider implements TargetProvider {
+ 
+   @Override
+   public TargetProviderResponse evaluateExistingContainers(Cluster cluster, ResourceId resourceId,
+       Collection<Participant> participants) {
+     // TODO Auto-generated method stub
+     return null;
+   }
+ 
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/LaunchContainerRunnable.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/LaunchContainerRunnable.java
index 0000000,c54f87f..f66dd55
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/LaunchContainerRunnable.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/LaunchContainerRunnable.java
@@@ -1,0 -1,79 +1,98 @@@
+ package org.apache.helix.provisioning.yarn;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
 -import java.util.Vector;
+ 
+ import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+ import org.apache.hadoop.yarn.api.records.Container;
+ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+ import org.apache.hadoop.yarn.api.records.LocalResource;
+ import org.apache.hadoop.yarn.util.Records;
+ 
+ /**
+  * Thread to connect to the {@link ContainerManagementProtocol} and launch the container
+  * that will execute the shell command.
+  */
+ class LaunchContainerRunnable implements Runnable {
+ 
+   /**
 -   * 
++   *
+    */
+   private final GenericApplicationMaster _genericApplicationMaster;
+ 
+   // Allocated container
+   Container container;
+ 
+   NMCallbackHandler containerListener;
+ 
+   /**
+    * @param lcontainer Allocated container
+    * @param containerListener Callback handler of the container
+    * @param genericApplicationMaster TODO
+    */
 -  public LaunchContainerRunnable(GenericApplicationMaster genericApplicationMaster, Container lcontainer, NMCallbackHandler containerListener) {
++  public LaunchContainerRunnable(GenericApplicationMaster genericApplicationMaster,
++      Container lcontainer, NMCallbackHandler containerListener) {
+     _genericApplicationMaster = genericApplicationMaster;
+     this.container = lcontainer;
+     this.containerListener = containerListener;
+   }
+ 
+   @Override
+   /**
 -   * Connects to CM, sets up container launch context 
 -   * for shell command and eventually dispatches the container 
 -   * start request to the CM. 
++   * Connects to CM, sets up container launch context
++   * for shell command and eventually dispatches the container
++   * start request to the CM.
+    */
+   public void run() {
 -    GenericApplicationMaster.LOG.info("Setting up container launch container for containerid=" + container.getId());
++    GenericApplicationMaster.LOG.info("Setting up container launch container for containerid="
++        + container.getId());
+     ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+ 
+     // Set the environment
 -    //ctx.setEnvironment(shellEnv);
++    // ctx.setEnvironment(shellEnv);
+ 
+     // Set the local resources
+     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ 
+     ctx.setLocalResources(localResources);
+ 
+     // Set the necessary command to execute on the allocated container
 -    Vector<CharSequence> vargs = new Vector<CharSequence>(5);
++    // Vector<CharSequence> vargs = new Vector<CharSequence>(5);
+ 
 -  
+     List<String> commands = new ArrayList<String>();
 -   // commands.add(command.toString());
++    // commands.add(command.toString());
+     ctx.setCommands(commands);
+ 
+     // Set up tokens for the container too. Today, for normal shell commands,
+     // the container in distribute-shell doesn't need any tokens. We are
+     // populating them mainly for NodeManagers to be able to download any
+     // files in the distributed file-system. The tokens are otherwise also
+     // useful in cases, for e.g., when one is running a "hadoop dfs" command
+     // inside the distributed shell.
+     ctx.setTokens(_genericApplicationMaster.allTokens.duplicate());
+ 
+     containerListener.addContainer(container.getId(), container);
+     _genericApplicationMaster.nmClientAsync.startContainerAsync(container, ctx);
+   }
 -}
++}

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
index 0000000,f7c3a9f..7d7883e
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
@@@ -1,0 -1,84 +1,103 @@@
+ package org.apache.helix.provisioning.yarn;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.nio.ByteBuffer;
+ import java.util.Map;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ 
+ import org.apache.hadoop.yarn.api.records.Container;
+ import org.apache.hadoop.yarn.api.records.ContainerId;
+ import org.apache.hadoop.yarn.api.records.ContainerStatus;
+ import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+ import org.apache.helix.provisioning.ContainerLaunchResponse;
+ import org.apache.helix.provisioning.ContainerStopResponse;
+ import org.apache.log4j.Logger;
+ 
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.util.concurrent.SettableFuture;
+ 
+ @VisibleForTesting
+ class NMCallbackHandler implements NMClientAsync.CallbackHandler {
+ 
+   private Logger LOG = Logger.getLogger(NMCallbackHandler.class);
+   private ConcurrentMap<ContainerId, Container> containers =
+       new ConcurrentHashMap<ContainerId, Container>();
+   private final GenericApplicationMaster applicationMaster;
+ 
+   public NMCallbackHandler(GenericApplicationMaster applicationMaster) {
+     this.applicationMaster = applicationMaster;
+   }
+ 
+   public void addContainer(ContainerId containerId, Container container) {
+     containers.putIfAbsent(containerId, container);
+   }
+ 
+   @Override
+   public void onContainerStopped(ContainerId containerId) {
+     LOG.info("Succeeded to stop Container " + containerId);
+     Container container = containers.get(containerId);
+     if (container != null) {
+       applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
+     }
+     SettableFuture<ContainerStopResponse> settableFuture =
+         applicationMaster.containerStopMap.remove(containerId);
+     ContainerStopResponse value = new ContainerStopResponse();
+     settableFuture.set(value);
+     containers.remove(containerId);
+   }
+ 
+   @Override
+   public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
+     LOG.info("Container Status: id=" + containerId + ", status=" + containerStatus);
+   }
+ 
+   @Override
+   public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
+     LOG.debug("Succeeded to start Container " + containerId);
+ 
+     Container container = containers.get(containerId);
+     if (container != null) {
+       applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
+     }
+     SettableFuture<ContainerLaunchResponse> settableFuture =
+         applicationMaster.containerLaunchResponseMap.remove(containerId);
+     ContainerLaunchResponse value = new ContainerLaunchResponse();
+     settableFuture.set(value);
+   }
+ 
+   @Override
+   public void onStartContainerError(ContainerId containerId, Throwable t) {
+     LOG.error("Failed to start Container " + containerId);
+     containers.remove(containerId);
+   }
+ 
+   @Override
+   public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
+     LOG.error("Failed to query the status of Container " + containerId);
+   }
+ 
+   @Override
+   public void onStopContainerError(ContainerId containerId, Throwable t) {
+     LOG.error("Failed to stop Container " + containerId);
+     containers.remove(containerId);
+   }
+ }

http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
index 0000000,ced1431..0fc748c
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
@@@ -1,0 -1,132 +1,150 @@@
+ package org.apache.helix.provisioning.yarn;
+ 
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *   http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied.  See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.List;
+ 
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.yarn.api.records.Container;
+ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+ import org.apache.hadoop.yarn.api.records.ContainerState;
+ import org.apache.hadoop.yarn.api.records.ContainerStatus;
+ import org.apache.hadoop.yarn.api.records.NodeReport;
+ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+ import org.apache.helix.provisioning.ContainerAskResponse;
+ import org.apache.helix.provisioning.ContainerReleaseResponse;
+ import org.apache.helix.provisioning.ContainerStopResponse;
+ 
+ import com.google.common.util.concurrent.SettableFuture;
+ 
+ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+   private static final Log LOG = LogFactory.getLog(RMCallbackHandler.class);
+   long startTime;
+   /**
 -   * 
++   *
+    */
+   private final GenericApplicationMaster _genericApplicationMaster;
+ 
+   /**
+    * @param genericApplicationMaster
+    */
+   RMCallbackHandler(GenericApplicationMaster genericApplicationMaster) {
+     _genericApplicationMaster = genericApplicationMaster;
+     startTime = System.currentTimeMillis();
+   }
+ 
 -  @SuppressWarnings("unchecked")
+   @Override
+   public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+     LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
+     for (ContainerStatus containerStatus : completedContainers) {
+       GenericApplicationMaster.LOG.info("Got container status for containerID="
+           + containerStatus.getContainerId() + ", state=" + containerStatus.getState()
+           + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics="
+           + containerStatus.getDiagnostics());
+ 
+       // non complete containers should not be here
+       assert (containerStatus.getState() == ContainerState.COMPLETE);
+       synchronized (_genericApplicationMaster.allocatedContainerSet) {
+         _genericApplicationMaster.allocatedContainerSet.remove(containerStatus.getContainerId());
+         SettableFuture<ContainerStopResponse> stopResponseFuture =
+             _genericApplicationMaster.containerStopMap.remove(containerStatus.getContainerId());
+         if (stopResponseFuture != null) {
+           ContainerStopResponse value = new ContainerStopResponse();
+           stopResponseFuture.set(value);
+         } else {
+           SettableFuture<ContainerReleaseResponse> releaseResponseFuture =
+               _genericApplicationMaster.containerReleaseMap
+                   .remove(containerStatus.getContainerId());
+           if (releaseResponseFuture != null) {
+             ContainerReleaseResponse value = new ContainerReleaseResponse();
+             releaseResponseFuture.set(value);
+           }
+         }
+       }
+       // increment counters for completed/failed containers
+       int exitStatus = containerStatus.getExitStatus();
+       if (0 != exitStatus) {
+         // container failed
+         if (ContainerExitStatus.ABORTED != exitStatus) {
+ 
+         } else {
+           // container was killed by framework, possibly preempted
+           // we should re-try as the container was lost for some reason
+ 
+           // we do not need to release the container as it would be done
+           // by the RM
+         }
+       } else {
+         // nothing to do
+         // container completed successfully
+         GenericApplicationMaster.LOG.info("Container completed successfully." + ", containerId="
+             + containerStatus.getContainerId());
+       }
+     }
+   }
+ 
+   @Override
+   public void onContainersAllocated(List<Container> allocatedContainers) {
+     GenericApplicationMaster.LOG.info("Got response from RM for container ask, allocatedCnt="
+         + allocatedContainers.size());
+     for (Container allocatedContainer : allocatedContainers) {
+       GenericApplicationMaster.LOG.info("Allocated new container." + ", containerId="
+           + allocatedContainer.getId() + ", containerNode="
+           + allocatedContainer.getNodeId().getHost() + ":"
+           + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
+           + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory"
+           + allocatedContainer.getResource().getMemory());
+       for (ContainerRequest containerRequest : _genericApplicationMaster.containerRequestMap
+           .keySet()) {
+         if (containerRequest.getCapability().getMemory() == allocatedContainer.getResource()
+             .getMemory()) {
+           SettableFuture<ContainerAskResponse> future =
+               _genericApplicationMaster.containerRequestMap.remove(containerRequest);
+           ContainerAskResponse response = new ContainerAskResponse();
+           response.setContainer(allocatedContainer);
+           _genericApplicationMaster.allocatedContainerSet.add(allocatedContainer.getId());
+           future.set(response);
+           break;
+         }
+       }
+     }
+   }
+ 
+   @Override
+   public void onShutdownRequest() {
+   }
+ 
+   @Override
+   public void onNodesUpdated(List<NodeReport> updatedNodes) {
+   }
+ 
+   @Override
+   public float getProgress() {
+     // set progress to deliver to RM on next heartbeat
+     return (System.currentTimeMillis() - startTime) % Integer.MAX_VALUE;
+   }
+ 
+   @Override
+   public void onError(Throwable e) {
+     _genericApplicationMaster.amRMClient.stop();
+   }
+ }