You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/10 19:05:31 UTC
[48/50] [abbrv] Merge remote-tracking branch
'origin/helix-provisioning'
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/ContainerAdmin.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/ContainerAdmin.java
index 0000000,8154996..f0e3d37
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/ContainerAdmin.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/ContainerAdmin.java
@@@ -1,0 -1,98 +1,116 @@@
+ package org.apache.helix.provisioning.tools;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.HashMap;
+ import java.util.Map;
+ import java.util.UUID;
+
+ import org.apache.commons.cli.CommandLine;
+ import org.apache.commons.cli.GnuParser;
+ import org.apache.commons.cli.Option;
+ import org.apache.commons.cli.OptionBuilder;
+ import org.apache.commons.cli.OptionGroup;
+ import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
+ import org.apache.helix.HelixConnection;
+ import org.apache.helix.api.Participant;
+ import org.apache.helix.api.accessor.ParticipantAccessor;
+ import org.apache.helix.api.id.ClusterId;
+ import org.apache.helix.api.id.MessageId;
+ import org.apache.helix.api.id.ParticipantId;
+ import org.apache.helix.manager.zk.ZkHelixConnection;
+ import org.apache.helix.model.Message;
+ import org.apache.helix.model.Message.MessageType;
+ import org.apache.log4j.Logger;
+
+ /**
- *
++ *
+ *
+ */
+ public class ContainerAdmin {
+
+ private static Logger LOG = Logger.getLogger(ContainerAdmin.class);
+ private static String stopContainer = "stopContainer";
+ private HelixConnection _connection;
+
+ public ContainerAdmin(String zkAddress) {
+ _connection = new ZkHelixConnection(zkAddress);
+ _connection.connect();
+ }
+
+ public void stopContainer(String appName, String participantName) throws Exception {
+ ClusterId clusterId = ClusterId.from(appName);
+ ParticipantAccessor participantAccessor = _connection.createParticipantAccessor(clusterId);
+ ParticipantId participantId = ParticipantId.from(participantName);
+ Participant participant = participantAccessor.readParticipant(participantId);
+ if (participant != null && participant.isAlive()) {
+ Message message = new Message(MessageType.SHUTDOWN, UUID.randomUUID().toString());
+ message.setTgtName(participant.getId().toString());
+ message.setTgtSessionId(participant.getRunningInstance().getSessionId());
+ message.setMsgId(message.getId());
+ Map<MessageId, Message> msgMap = new HashMap<MessageId, Message>();
+ msgMap.put(MessageId.from(message.getId()), message);
+ participantAccessor.insertMessagesToParticipant(participantId, msgMap);
+ do {
+ participant = participantAccessor.readParticipant(participantId);
+ Thread.sleep(1000);
+ LOG.info("Waiting for container:" + participantName + " to shutdown");
- } while (participant!=null && participant.isAlive());
++ } while (participant != null && participant.isAlive());
+ }
-
++
+ }
+
+ @SuppressWarnings("static-access")
+ public static void main(String[] args) throws Exception {
+ Option zkServerOption =
+ OptionBuilder.withLongOpt("zookeeperAddress").withDescription("Provide zookeeper address")
+ .create();
+ zkServerOption.setArgs(1);
+ zkServerOption.setRequired(true);
+ zkServerOption.setArgName("zookeeperAddress(Required)");
+
+ OptionGroup group = new OptionGroup();
+ group.setRequired(true);
+
+ // stop a running container (participant) of an application
+ Option stopContainerOption =
+ OptionBuilder.withLongOpt(stopContainer).withDescription("appName participantName")
+ .create();
+ stopContainerOption.setArgs(2);
+ stopContainerOption.setRequired(false);
+ stopContainerOption.setArgName("appName participantName");
+
+ group.addOption(stopContainerOption);
+
+ Options options = new Options();
+ options.addOption(zkServerOption);
+ options.addOptionGroup(group);
+ CommandLine cliParser = new GnuParser().parse(options, args);
+
+ String zkAddress = cliParser.getOptionValue("zookeeperAddress");
+ ContainerAdmin admin = new ContainerAdmin(zkAddress);
+
+ if (cliParser.hasOption(stopContainer)) {
+ String appName = cliParser.getOptionValues(stopContainer)[0];
+ String participantName = cliParser.getOptionValues(stopContainer)[1];
+ admin.stopContainer(appName, participantName);
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/UpdateProvisionerConfig.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/UpdateProvisionerConfig.java
index 0000000,f3cce42..f6713d1
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/UpdateProvisionerConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/UpdateProvisionerConfig.java
@@@ -1,0 -1,87 +1,106 @@@
+ package org.apache.helix.provisioning.tools;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import org.apache.commons.cli.CommandLine;
+ import org.apache.commons.cli.GnuParser;
+ import org.apache.commons.cli.Option;
+ import org.apache.commons.cli.OptionBuilder;
+ import org.apache.commons.cli.OptionGroup;
+ import org.apache.commons.cli.Options;
+ import org.apache.commons.cli.ParseException;
+ import org.apache.helix.HelixConnection;
+ import org.apache.helix.api.Resource;
+ import org.apache.helix.api.accessor.ResourceAccessor;
+ import org.apache.helix.api.config.ResourceConfig;
+ import org.apache.helix.api.id.ClusterId;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.manager.zk.ZkHelixConnection;
+ import org.apache.helix.provisioning.yarn.YarnProvisionerConfig;
+ import org.apache.log4j.Logger;
+ /**
+ * Update the provisioner config
+ */
+ public class UpdateProvisionerConfig {
+ private static Logger LOG = Logger.getLogger(UpdateProvisionerConfig.class);
+ private static String updateContainerCount = "updateContainerCount";
+ private HelixConnection _connection;
+
+ public UpdateProvisionerConfig(String zkAddress) {
+ _connection = new ZkHelixConnection(zkAddress);
+ _connection.connect();
+ }
+
+ public void setNumContainers(String appName, String serviceName, int numContainers) {
+ ResourceId resourceId = ResourceId.from(serviceName);
+
+ ResourceAccessor resourceAccessor = _connection.createResourceAccessor(ClusterId.from(appName));
+ Resource resource = resourceAccessor.readResource(resourceId);
+ LOG.info("Current provisioner config:"+ resource.getProvisionerConfig());
+
+ ResourceConfig.Delta delta = new ResourceConfig.Delta(resourceId);
+ YarnProvisionerConfig config = new YarnProvisionerConfig(resourceId);
+ config.setNumContainers(numContainers);
+ delta.setProvisionerConfig(config);
+ ResourceConfig updatedResourceConfig = resourceAccessor.updateResource(resourceId, delta);
+ LOG.info("Update provisioner config:"+ updatedResourceConfig.getProvisionerConfig());
+
+ }
+
+ @SuppressWarnings("static-access")
+ public static void main(String[] args) throws ParseException {
+ Option zkServerOption =
+ OptionBuilder.withLongOpt("zookeeperAddress").withDescription("Provide zookeeper address")
+ .create();
+ zkServerOption.setArgs(1);
+ zkServerOption.setRequired(true);
+ zkServerOption.setArgName("zookeeperAddress(Required)");
+
+ OptionGroup group = new OptionGroup();
+ group.setRequired(true);
+
+ // update container count per service
+ Option updateContainerCountOption =
+ OptionBuilder.withLongOpt(updateContainerCount)
+ .withDescription("appName serviceName numContainers").create();
+ updateContainerCountOption.setArgs(3);
+ updateContainerCountOption.setRequired(false);
+ updateContainerCountOption.setArgName("appName serviceName numContainers");
+
+ group.addOption(updateContainerCountOption);
+
+ Options options = new Options();
+ options.addOption(zkServerOption);
+ options.addOptionGroup(group);
+ CommandLine cliParser = new GnuParser().parse(options, args);
+
+ String zkAddress = cliParser.getOptionValue("zookeeperAddress");
+ UpdateProvisionerConfig updater = new UpdateProvisionerConfig(zkAddress);
+
+ if (cliParser.hasOption(updateContainerCount)) {
+ String appName = cliParser.getOptionValues(updateContainerCount)[0];
+ String serviceName = cliParser.getOptionValues(updateContainerCount)[1];
+ int numContainers = Integer.parseInt(
+ cliParser.getOptionValues(updateContainerCount)[2]);
+ updater.setNumContainers(appName, serviceName, numContainers);
+ }
+
+ }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
index 0000000,2db4afb..2be7062
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppLauncher.java
@@@ -1,0 -1,561 +1,580 @@@
+ package org.apache.helix.provisioning.yarn;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.net.URI;
+ import java.nio.ByteBuffer;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Vector;
+
+ import org.apache.commons.cli.CommandLine;
+ import org.apache.commons.cli.GnuParser;
+ import org.apache.commons.cli.Option;
+ import org.apache.commons.cli.Options;
+ import org.apache.commons.compress.archivers.ArchiveStreamFactory;
+ import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.io.DataOutputBuffer;
+ import org.apache.hadoop.security.Credentials;
+ import org.apache.hadoop.security.UserGroupInformation;
+ import org.apache.hadoop.security.token.Token;
+ import org.apache.hadoop.yarn.api.ApplicationConstants;
+ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+ import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+ import org.apache.hadoop.yarn.api.records.ApplicationId;
+ import org.apache.hadoop.yarn.api.records.ApplicationReport;
+ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+ import org.apache.hadoop.yarn.api.records.LocalResource;
+ import org.apache.hadoop.yarn.api.records.LocalResourceType;
+ import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+ import org.apache.hadoop.yarn.api.records.Priority;
+ import org.apache.hadoop.yarn.api.records.Resource;
+ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+ import org.apache.hadoop.yarn.client.api.YarnClient;
+ import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+ import org.apache.hadoop.yarn.conf.YarnConfiguration;
+ import org.apache.hadoop.yarn.util.ConverterUtils;
+ import org.apache.hadoop.yarn.util.Records;
+ import org.apache.helix.HelixConnection;
+ import org.apache.helix.api.id.ClusterId;
+ import org.apache.helix.manager.zk.ZkHelixConnection;
+ import org.apache.helix.provisioning.ApplicationSpec;
+ import org.apache.helix.provisioning.ApplicationSpecFactory;
+ import org.apache.helix.provisioning.HelixYarnUtil;
+ import org.apache.helix.provisioning.TaskConfig;
+
+ /**
+ * Main class to launch the job.
+ * Gets the yaml file as the input.
+ * Converts yaml file into ApplicationSpec.
+ */
+ public class AppLauncher {
+
+ private static final Log LOG = LogFactory.getLog(AppLauncher.class);
+
+ private ApplicationSpec _applicationSpec;
+ private YarnClient yarnClient;
+ private ApplicationSpecFactory _applicationSpecFactory;
+ private File _yamlConfigFile;
+
+ private YarnConfiguration _conf;
+
+ private File appMasterArchive;
+
+ private ApplicationId _appId;
+
+ private AppMasterConfig _appMasterConfig;
+
+ public AppLauncher(ApplicationSpecFactory applicationSpecFactory, File yamlConfigFile)
+ throws Exception {
+ _applicationSpecFactory = applicationSpecFactory;
+ _yamlConfigFile = yamlConfigFile;
+ init();
+ }
+
+ /**
+  * Reads the YAML config file into an {@link ApplicationSpec} and creates and initializes (but
+  * does not start) the YARN client.
+  * @throws Exception if the YAML file cannot be opened or the spec factory fails to read it
+  */
+ private void init() throws Exception {
+   // Close the spec stream once the factory is done with it so the file handle is not leaked
+   InputStream yamlStream = new FileInputStream(_yamlConfigFile);
+   try {
+     _applicationSpec = _applicationSpecFactory.fromYaml(yamlStream);
+   } finally {
+     yamlStream.close();
+   }
+   _appMasterConfig = new AppMasterConfig();
+   appMasterArchive = new File(_applicationSpec.getAppMasterPackage());
+   yarnClient = YarnClient.createYarnClient();
+   _conf = new YarnConfiguration();
+   yarnClient.init(_conf);
+ }
+
+ public ApplicationSpec getApplicationSpec() {
+ return _applicationSpec;
+ }
+
+ public boolean launch() throws Exception {
+ LOG.info("Running Client");
+ yarnClient.start();
+
+ // Get a new application id
+ YarnClientApplication app = yarnClient.createApplication();
+ GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
+ // TODO get min/max resource capabilities from RM and change memory ask if needed
+ // If we do not have min/max, we may not be able to correctly request
+ // the required resources from the RM for the app master
+ // Memory ask has to be a multiple of min and less than max.
+ // Dump out information about cluster capability as seen by the resource manager
+ int maxMem = appResponse.getMaximumResourceCapability().getMemory();
+ LOG.info("Max mem capabililty of resources in this cluster " + maxMem);
+
+ // set the application name
+ ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
+ _appId = appContext.getApplicationId();
+ _appMasterConfig.setAppId(_appId.getId());
+ String appName = _applicationSpec.getAppName();
+ _appMasterConfig.setAppName(appName);
+ _appMasterConfig.setApplicationSpecFactory(_applicationSpecFactory.getClass()
+ .getCanonicalName());
+ appContext.setApplicationName(appName);
+
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+ LOG.info("Copy Application archive file from local filesystem and add to local environment");
+ // Copy the application master jar to the filesystem
+ // Create a local resource to point to the destination jar path
+ FileSystem fs = FileSystem.get(_conf);
+
+ // get packages for each component packages
+ Map<String, URI> packages = new HashMap<String, URI>();
+ packages
+ .put(AppMasterConfig.AppEnvironment.APP_MASTER_PKG.toString(), appMasterArchive.toURI());
+ packages.put(AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString(), _yamlConfigFile.toURI());
+ for (String serviceName : _applicationSpec.getServices()) {
+ packages.put(serviceName, _applicationSpec.getServicePackage(serviceName));
+ }
+ Map<String, Path> hdfsDest = new HashMap<String, Path>();
+ Map<String, String> classpathMap = new HashMap<String, String>();
+ for (String name : packages.keySet()) {
+ URI uri = packages.get(name);
+ Path dst = copyToHDFS(fs, name, uri);
+ hdfsDest.put(name, dst);
+ String classpath = generateClasspathAfterExtraction(name, new File(uri));
+ classpathMap.put(name, classpath);
+ _appMasterConfig.setClasspath(name, classpath);
+ String serviceMainClass = _applicationSpec.getServiceMainClass(name);
+ if (serviceMainClass != null) {
+ _appMasterConfig.setMainClass(name, serviceMainClass);
+ }
+ }
+
+ // Get YAML files describing all workflows to immediately start
+ Map<String, URI> workflowFiles = new HashMap<String, URI>();
+ List<TaskConfig> taskConfigs = _applicationSpec.getTaskConfigs();
+ if (taskConfigs != null) {
+ for (TaskConfig taskConfig : taskConfigs) {
+ URI configUri = taskConfig.getYamlURI();
+ if (taskConfig.name != null && configUri != null) {
+ workflowFiles.put(taskConfig.name, taskConfig.getYamlURI());
+ }
+ }
+ }
+
+ // set local resources for the application master
+ // local files or archives as needed
+ // In this scenario, the jar file for the application master is part of the local resources
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ LocalResource appMasterPkg =
+ setupLocalResource(fs,
+ hdfsDest.get(AppMasterConfig.AppEnvironment.APP_MASTER_PKG.toString()));
+ LocalResource appSpecFile =
+ setupLocalResource(fs,
+ hdfsDest.get(AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString()));
+ localResources.put(AppMasterConfig.AppEnvironment.APP_MASTER_PKG.toString(), appMasterPkg);
+ localResources.put(AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString(), appSpecFile);
+ for (String name : workflowFiles.keySet()) {
+ URI uri = workflowFiles.get(name);
+ Path dst = copyToHDFS(fs, name, uri);
+ LocalResource taskLocalResource = setupLocalResource(fs, dst);
+ localResources.put(AppMasterConfig.AppEnvironment.TASK_CONFIG_FILE.toString() + "_" + name,
+ taskLocalResource);
+ }
+
+ // Set local resource info into app master container launch context
+ amContainer.setLocalResources(localResources);
+
+ // Set the necessary security tokens as needed
+ // amContainer.setContainerTokens(containerToken);
+
+ // Add AppMaster.jar location to classpath
+ // At some point we should not be required to add
+ // the hadoop specific classpaths to the env.
+ // It should be provided out of the box.
+ // For now setting all required classpaths including
+ // the classpath to "." for the application jar
+ StringBuilder classPathEnv =
+ new StringBuilder(Environment.CLASSPATH.$()).append(File.pathSeparatorChar).append("./*")
+ .append(File.pathSeparatorChar);
+ classPathEnv.append(classpathMap.get(AppMasterConfig.AppEnvironment.APP_MASTER_PKG.toString()));
+ for (String c : _conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ classPathEnv.append(File.pathSeparatorChar);
+ classPathEnv.append(c.trim());
+ }
+ classPathEnv.append(File.pathSeparatorChar).append("./log4j.properties");
+
+ // add the runtime classpath needed for tests to work
+ if (_conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
+ classPathEnv.append(':');
+ classPathEnv.append(System.getProperty("java.class.path"));
+ }
+ LOG.info("\n\n Setting the classpath to launch AppMaster:\n\n");
+ // Set the env variables to be setup in the env where the application master will be run
+ Map<String, String> env = new HashMap<String, String>(_appMasterConfig.getEnv());
+ env.put("CLASSPATH", classPathEnv.toString());
+
+ amContainer.setEnvironment(env);
+
+ // Set the necessary command to execute the application master
+ Vector<CharSequence> vargs = new Vector<CharSequence>(30);
+
+ // Set java executable command
+ LOG.info("Setting up app master launch command");
+ vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+ int amMemory = 1024;
+ // Set Xmx based on am memory size
+ vargs.add("-Xmx" + amMemory + "m");
+ // Set class name
+ vargs.add(AppMasterLauncher.class.getCanonicalName());
+ // Set params for Application Master
+ // vargs.add("--num_containers " + String.valueOf(numContainers));
+
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
+
+ // Get final command
+ StringBuilder command = new StringBuilder();
+ for (CharSequence str : vargs) {
+ command.append(str).append(" ");
+ }
+
+ LOG.info("Completed setting up app master command " + command.toString());
+ List<String> commands = new ArrayList<String>();
+ commands.add(command.toString());
+ amContainer.setCommands(commands);
+
+ // Set up resource type requirements
+ // For now, only memory is supported so we set memory requirements
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(amMemory);
+ appContext.setResource(capability);
+
+ // Service data is a binary blob that can be passed to the application
+ // Not needed in this scenario
+ // amContainer.setServiceData(serviceData);
+
+ // Setup security tokens
+ if (UserGroupInformation.isSecurityEnabled()) {
+ Credentials credentials = new Credentials();
+ String tokenRenewer = _conf.get(YarnConfiguration.RM_PRINCIPAL);
+ if (tokenRenewer == null || tokenRenewer.length() == 0) {
+ throw new IOException("Can't get Master Kerberos principal for the RM to use as renewer");
+ }
+
+ // For now, only getting tokens for the default file-system.
+ final Token<?> tokens[] = fs.addDelegationTokens(tokenRenewer, credentials);
+ if (tokens != null) {
+ for (Token<?> token : tokens) {
+ LOG.info("Got dt for " + fs.getUri() + "; " + token);
+ }
+ }
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ amContainer.setTokens(fsTokens);
+ }
+
+ appContext.setAMContainerSpec(amContainer);
+
+ // Set the priority for the application master
+ Priority pri = Records.newRecord(Priority.class);
+ int amPriority = 0;
+ // TODO - what is the range for priority? how to decide?
+ pri.setPriority(amPriority);
+ appContext.setPriority(pri);
+
+ String amQueue = "default";
+ // Set the queue to which this application is to be submitted in the RM
+ appContext.setQueue(amQueue);
+
+ LOG.info("Submitting application to YARN Resource Manager");
+
+ ApplicationId applicationId = yarnClient.submitApplication(appContext);
+
+ LOG.info("Submitted application with applicationId:" + applicationId);
+
+ return true;
+ }
+
+ /**
+  * Generates the classpath after the archive file gets extracted under 'serviceName' folder.
+  * Files that are not recognized as archives yield "./". For archives, every regular file entry
+  * is appended as "./serviceName/entryName", each preceded by the platform path separator.
+  * The archive is always opened as a "tar" stream, regardless of its actual extension.
+  * @param serviceName name of the folder the archive is extracted under
+  * @param archiveFile local package file to inspect
+  * @return the classpath fragment; it may be partial or empty if the archive could not be read
+  *         (the failure is logged, not rethrown)
+  */
+ private String generateClasspathAfterExtraction(String serviceName, File archiveFile) {
+   if (!isArchive(archiveFile.getAbsolutePath())) {
+     return "./";
+   }
+   StringBuilder classpath = new StringBuilder();
+   InputStream fileInput = null;
+   TarArchiveInputStream tarInput = null;
+   // put the jar files under the archive in the classpath
+   try {
+     fileInput = new FileInputStream(archiveFile);
+     tarInput =
+         (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream("tar",
+             fileInput);
+     TarArchiveEntry entry = null;
+     while ((entry = (TarArchiveEntry) tarInput.getNextEntry()) != null) {
+       if (entry.isFile()) {
+         classpath.append(File.pathSeparatorChar);
+         classpath.append("./" + serviceName + "/" + entry.getName());
+       }
+     }
+   } catch (Exception e) {
+     LOG.error("Unable to read archive file:" + archiveFile, e);
+   } finally {
+     // Always release the file handle, even when reading the archive failed part-way.
+     // Closing the tar stream closes the wrapped file stream; fall back to the raw stream
+     // when the tar wrapper was never created.
+     InputStream toClose = (tarInput != null) ? tarInput : fileInput;
+     if (toClose != null) {
+       try {
+         toClose.close();
+       } catch (IOException e) {
+         LOG.warn("Unable to close archive file:" + archiveFile, e);
+       }
+     }
+   }
+   return classpath.toString();
+ }
+
+ private Path copyToHDFS(FileSystem fs, String name, URI uri) throws Exception {
+ // NOTE: a path without a '.' does NOT throw here; the whole path is then used as the extension
+ String extension = uri.getPath().substring(uri.getPath().lastIndexOf(".") + 1);
+ String pathSuffix =
+ _applicationSpec.getAppName() + "/" + _appId.getId() + "/" + name + "." + extension;
+ Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+ Path src = new Path(uri);
+ fs.copyFromLocalFile(false, true, src, dst);
+ return dst;
+ }
+
+ private LocalResource setupLocalResource(FileSystem fs, Path dst) throws Exception {
+ URI uri = dst.toUri();
+ String extension = uri.getPath().substring(uri.getPath().lastIndexOf(".") + 1);
+ FileStatus destStatus = fs.getFileStatus(dst);
+ LocalResource amJarRsrc = Records.newRecord(LocalResource.class);
+ // Set the type of resource - file or archive
+ // archives are untarred at destination
+ // we don't need the jar file to be untarred for now
+ if (isArchive(extension)) {
+ amJarRsrc.setType(LocalResourceType.ARCHIVE);
+ } else {
+ amJarRsrc.setType(LocalResourceType.FILE);
+ }
+ // Set visibility of the resource
+ // Setting to most private option
+ amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ // Set the resource to be copied over
+ amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+ // Set timestamp and length of file so that the framework
+ // can do basic sanity checks for the local resource
+ // after it has been copied over to ensure it is the same
+ // resource the client intended to use with the application
+ amJarRsrc.setTimestamp(destStatus.getModificationTime());
+ amJarRsrc.setSize(destStatus.getLen());
+ return amJarRsrc;
+ }
+
+ private boolean isArchive(String path) {
+ return path.endsWith("tar") || path.endsWith("gz") || path.endsWith("tar.gz")
+ || path.endsWith("zip");
+ }
+
+ /**
+  * Polls the YARN application report every 10 seconds until the application is RUNNING and a
+  * Helix connection to port 2181 on the reported app master host is established. A failed
+  * connection attempt is retried on the next poll.
+  * NOTE: there is no exit for terminal application states (FINISHED/FAILED/KILLED); the loop
+  * only ends on a successful connection or on an exception.
+  * @return the connected {@link HelixConnection}, or null if polling was aborted by an exception
+  */
+ public HelixConnection pollForConnection() {
+   String prevReport = "";
+   HelixConnection connection = null;
+
+   while (true) {
+     try {
+       // Get application report for the appId we are interested in
+       ApplicationReport report = yarnClient.getApplicationReport(_appId);
+
+       String reportMessage = generateReport(report);
+       if (!reportMessage.equals(prevReport)) {
+         LOG.info(reportMessage);
+       }
+       YarnApplicationState state = report.getYarnApplicationState();
+       if (YarnApplicationState.RUNNING == state) {
+         if (connection == null) {
+           String hostName = null;
+           // the reported host may look like "name/address"; keep the part after the slash
+           int ind = report.getHost().indexOf('/');
+           if (ind > -1) {
+             hostName = report.getHost().substring(ind + 1);
+           } else {
+             hostName = report.getHost();
+           }
+           connection = new ZkHelixConnection(hostName + ":2181");
+
+           try {
+             connection.connect();
+           } catch (Exception e) {
+             LOG.warn("AppMaster started but not yet initialized");
+             connection = null;
+           }
+         }
+         // connection is null when the connect attempt above failed; without this guard the
+         // resulting NullPointerException would abort polling instead of retrying
+         if (connection != null && connection.isConnected()) {
+           return connection;
+         }
+       }
+       prevReport = reportMessage;
+       Thread.sleep(10000);
+     } catch (Exception e) {
+       if (e instanceof InterruptedException) {
+         // preserve the interrupt status for the caller
+         Thread.currentThread().interrupt();
+       }
+       LOG.error("Exception while getting info ", e);
+       break;
+     }
+   }
+   return null;
+ }
+
+ /**
+  * Fetches the current YARN report for the launched application.
+  * @return the application report, or null if it could not be retrieved (the failure is logged)
+  */
+ public ApplicationReport getApplicationReport() {
+   try {
+     return yarnClient.getApplicationReport(_appId);
+   } catch (Exception e) {
+     // Keep the null-on-failure contract, but do not hide the cause from operators
+     LOG.error("Unable to get application report for appId:" + _appId, e);
+     return null;
+   }
+ }
+
+ /**
+  * Monitors the application until it reaches a terminal YARN state, polling every 10 seconds.
+  * The YARN report is logged whenever it changes; while the application is RUNNING and a Helix
+  * connection to port 2181 on the app master host is available, a cluster status report is
+  * logged on every poll as well.
+  * @return true if the application FINISHED with final status SUCCEEDED; false if it finished
+  *         with any other final status or was KILLED/FAILED. NOTE: true is also returned when
+  *         monitoring is aborted by an exception (kept for backward compatibility).
+  */
+ public boolean waitUntilDone() {
+   String prevReport = "";
+   HelixConnection connection = null;
+
+   while (true) {
+     try {
+       // Get application report for the appId we are interested in
+       ApplicationReport report = yarnClient.getApplicationReport(_appId);
+
+       String reportMessage = generateReport(report);
+       if (!reportMessage.equals(prevReport)) {
+         LOG.info(reportMessage);
+       }
+       YarnApplicationState state = report.getYarnApplicationState();
+       FinalApplicationStatus dsStatus = report.getFinalApplicationStatus();
+       if (YarnApplicationState.FINISHED == state) {
+         if (FinalApplicationStatus.SUCCEEDED == dsStatus) {
+           LOG.info("Application has completed successfully. Breaking monitoring loop");
+           return true;
+         } else {
+           LOG.info("Application did finished unsuccessfully." + " YarnState=" + state.toString()
+               + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop");
+           return false;
+         }
+       } else if (YarnApplicationState.KILLED == state || YarnApplicationState.FAILED == state) {
+         LOG.info("Application did not finish." + " YarnState=" + state.toString()
+             + ", DSFinalStatus=" + dsStatus.toString() + ". Breaking monitoring loop");
+         return false;
+       }
+       if (YarnApplicationState.RUNNING == state) {
+         if (connection == null) {
+           String hostName = null;
+           // the reported host may look like "name/address"; keep the part after the slash
+           int ind = report.getHost().indexOf('/');
+           if (ind > -1) {
+             hostName = report.getHost().substring(ind + 1);
+           } else {
+             hostName = report.getHost();
+           }
+           connection = new ZkHelixConnection(hostName + ":2181");
+
+           try {
+             connection.connect();
+           } catch (Exception e) {
+             LOG.warn("AppMaster started but not yet initialized");
+             connection = null;
+           }
+         }
+         // connection is null when the connect attempt above failed; without this guard the
+         // resulting NullPointerException would abort monitoring instead of retrying
+         if (connection != null && connection.isConnected()) {
+           AppStatusReportGenerator generator = new AppStatusReportGenerator();
+           ClusterId clusterId = ClusterId.from(_applicationSpec.getAppName());
+           String generateReport = generator.generateReport(connection, clusterId);
+           LOG.info(generateReport);
+         }
+       }
+       prevReport = reportMessage;
+       Thread.sleep(10000);
+     } catch (Exception e) {
+       if (e instanceof InterruptedException) {
+         // preserve the interrupt status for the caller
+         Thread.currentThread().interrupt();
+       }
+       // include the exception so the reason monitoring stopped is visible in the log
+       LOG.error("Exception while getting info ", e);
+       break;
+     }
+   }
+   return true;
+ }
+
+ /**
+ * TODO: kill the app only in dev mode. In prod, its ok for the app to continue running if the
+ * launcher dies after launching
+ */
+
+ private String generateReport(ApplicationReport report) {
+ return "Got application report from ASM for" + ", appId=" + _appId.getId()
+ + ", clientToAMToken=" + report.getClientToAMToken() + ", appDiagnostics="
+ + report.getDiagnostics() + ", appMasterHost=" + report.getHost() + ", appQueue="
+ + report.getQueue() + ", appMasterRpcPort=" + report.getRpcPort() + ", appStartTime="
+ + report.getStartTime() + ", yarnAppState=" + report.getYarnApplicationState().toString()
+ + ", distributedFinalState=" + report.getFinalApplicationStatus().toString()
+ + ", appTrackingUrl=" + report.getTrackingUrl() + ", appUser=" + report.getUser();
+ }
+
+ /**
+  * Logs the latest application report; invoked from the JVM shutdown hook registered in main.
+  * NOTE: despite the "Killing application" log message, no kill request is issued here.
+  */
+ public void cleanup() {
+   LOG.info("Cleaning up");
+   try {
+     ApplicationReport applicationReport = yarnClient.getApplicationReport(_appId);
+     LOG.info("Killing application:" + _appId + " \n Application report"
+         + generateReport(applicationReport));
+   } catch (Exception e) {
+     // route the failure through the class logger instead of printing to stderr
+     LOG.error("Unable to get application report during cleanup for appId:" + _appId, e);
+   }
+ }
+
+ /**
+ * Launches the application on a YARN cluster. Once launched, it will display (periodically) the
+ * status of the containers in the application.
+ * @param args app_spec_provider and app_config_spec
+ * @throws Exception
+ */
+ public static void main(String[] args) throws Exception {
+
+ Options opts = new Options();
+ opts.addOption(new Option("app_spec_provider", true,
+ "Application Spec Factory Class that will parse the app_config_spec file"));
+ opts.addOption(new Option("app_config_spec", true,
+ "YAML config file that provides the app specifications"));
+ CommandLine cliParser = new GnuParser().parse(opts, args);
+ String appSpecFactoryClass = cliParser.getOptionValue("app_spec_provider");
+ String yamlConfigFileName = cliParser.getOptionValue("app_config_spec");
+
+ ApplicationSpecFactory applicationSpecFactory =
+ HelixYarnUtil.createInstance(appSpecFactoryClass);
+ File yamlConfigFile = new File(yamlConfigFileName);
+ if (!yamlConfigFile.exists()) {
+ throw new IllegalArgumentException("YAML app_config_spec file: '" + yamlConfigFileName
+ + "' does not exist");
+ }
+ final AppLauncher launcher = new AppLauncher(applicationSpecFactory, yamlConfigFile);
+ launcher.launch();
+ Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
+
+ @Override
+ public void run() {
+ launcher.cleanup();
+ }
+ }));
+ launcher.waitUntilDone();
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
index 0000000,38a0dd1..d0952c1
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterConfig.java
@@@ -1,0 -1,111 +1,130 @@@
+ package org.apache.helix.provisioning.yarn;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.HashMap;
+ import java.util.Map;
+
+ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+ import org.apache.log4j.Logger;
+
+ /**
+  * Convenience holder for passing information to containers through environment variables.
+  * <p>
+  * Setters record values in a local map (exposed through {@link #getEnv()}). Getters look in
+  * that map first and fall back to the environment of the current process.
+  */
+ public class AppMasterConfig {
+   private static final Logger LOG = Logger.getLogger(AppMasterConfig.class);
+
+   /** Values set on this instance; these take precedence over the process environment. */
+   private final Map<String, String> _envs;
+
+   /** Names of the environment variables managed by this class. */
+   public enum AppEnvironment {
+     APP_MASTER_PKG("APP_MASTER_PKG"),
+     APP_SPEC_FILE("APP_SPEC_FILE"),
+     APP_NAME("APP_NAME"),
+     APP_ID("APP_ID"),
+     APP_SPEC_FACTORY("APP_SPEC_FACTORY"),
+     TASK_CONFIG_FILE("TASK_CONFIG_FILE");
+
+     final String _name;
+
+     private AppEnvironment(String name) {
+       _name = name;
+     }
+
+     /** @return the environment variable name */
+     @Override
+     public String toString() {
+       return _name;
+     }
+   }
+
+   public AppMasterConfig() {
+     _envs = new HashMap<String, String>();
+   }
+
+   /**
+    * Looks up a key, preferring a value set on this instance over the process environment.
+    * @param key environment variable name
+    * @return the value, or null if it is in neither place
+    */
+   private String get(String key) {
+     String value = (_envs.containsKey(key)) ? _envs.get(key) : System.getenv().get(key);
+     LOG.info("Returning value:" + value + " for key:'" + key + "'");
+
+     return value;
+   }
+
+   public void setAppId(int id) {
+     _envs.put(AppEnvironment.APP_ID.toString(), "" + id);
+   }
+
+   public String getAppName() {
+     return get(AppEnvironment.APP_NAME.toString());
+   }
+
+   /**
+    * @return the application id
+    * @throws NumberFormatException if APP_ID is unset or is not an integer
+    */
+   public int getAppId() {
+     return Integer.parseInt(get(AppEnvironment.APP_ID.toString()));
+   }
+
+   public String getClassPath(String serviceName) {
+     return get(serviceName + "_classpath");
+   }
+
+   public String getMainClass(String serviceName) {
+     return get(serviceName + "_mainClass");
+   }
+
+   /**
+    * @return the NM_HOST value followed by the fixed port 2181
+    */
+   public String getZKAddress() {
+     return get(Environment.NM_HOST.name()) + ":2181";
+   }
+
+   public String getContainerId() {
+     return get(Environment.CONTAINER_ID.name());
+   }
+
+   /**
+    * @return the live map of values set on this instance (not a copy)
+    */
+   public Map<String, String> getEnv() {
+     return _envs;
+   }
+
+   public void setAppName(String appName) {
+     _envs.put(AppEnvironment.APP_NAME.toString(), appName);
+   }
+
+   public void setClasspath(String serviceName, String classpath) {
+     _envs.put(serviceName + "_classpath", classpath);
+   }
+
+   public void setTaskConfigFile(String configName, String path) {
+     _envs.put(AppEnvironment.TASK_CONFIG_FILE.toString() + "_" + configName, path);
+   }
+
+   public String getTaskConfigFile(String configName) {
+     return get(AppEnvironment.TASK_CONFIG_FILE.toString() + "_" + configName);
+   }
+
+   public String getApplicationSpecConfigFile() {
+     return get(AppEnvironment.APP_SPEC_FILE.toString());
+   }
+
+   public String getApplicationSpecFactory() {
+     return get(AppEnvironment.APP_SPEC_FACTORY.toString());
+   }
+
+   public void setApplicationSpecFactory(String className) {
+     _envs.put(AppEnvironment.APP_SPEC_FACTORY.toString(), className);
+   }
+
+   public void setMainClass(String serviceName, String serviceMainClass) {
+     _envs.put(serviceName + "_mainClass", serviceMainClass);
+   }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
index 0000000,e7a0f61..31ef05c
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppMasterLauncher.java
@@@ -1,0 -1,194 +1,213 @@@
+ package org.apache.helix.provisioning.yarn;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.io.File;
+ import java.io.FileInputStream;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.net.URI;
+ import java.util.List;
+ import java.util.Map;
+
+ import org.I0Itec.zkclient.IDefaultNameSpace;
+ import org.I0Itec.zkclient.ZkClient;
+ import org.I0Itec.zkclient.ZkServer;
+ import org.apache.commons.cli.Options;
+ import org.apache.commons.io.FileUtils;
+ import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+ import org.apache.hadoop.yarn.api.records.ApplicationId;
+ import org.apache.hadoop.yarn.api.records.ContainerId;
+ import org.apache.hadoop.yarn.conf.YarnConfiguration;
+ import org.apache.hadoop.yarn.util.ConverterUtils;
+ import org.apache.helix.HelixController;
+ import org.apache.helix.api.accessor.ClusterAccessor;
+ import org.apache.helix.api.config.ClusterConfig;
+ import org.apache.helix.api.config.ResourceConfig;
+ import org.apache.helix.api.id.ClusterId;
+ import org.apache.helix.api.id.ControllerId;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.controller.rebalancer.config.FullAutoRebalancerConfig;
+ import org.apache.helix.controller.rebalancer.config.RebalancerConfig;
+ import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+ import org.apache.helix.manager.zk.ZkHelixConnection;
+ import org.apache.helix.model.StateModelDefinition;
+ import org.apache.helix.provisioning.ApplicationSpec;
+ import org.apache.helix.provisioning.ApplicationSpecFactory;
+ import org.apache.helix.provisioning.HelixYarnUtil;
+ import org.apache.helix.provisioning.ServiceConfig;
+ import org.apache.helix.provisioning.TaskConfig;
+ import org.apache.helix.task.TaskDriver;
+ import org.apache.helix.task.Workflow;
+ import org.apache.helix.tools.StateModelConfigGenerator;
+ import org.apache.log4j.Logger;
+
+ /**
+  * Entry point of the application master process. This will <br/>
+  * <ul>
+  * <li>start zookeeper automatically</li>
+  * <li>start the generic application master</li>
+  * <li>create the cluster</li>
+  * <li>set up resource(s)</li>
+  * <li>start helix controller</li>
+  * <li>start any workflows listed in the application spec</li>
+  * </ul>
+  */
+ public class AppMasterLauncher {
+   public static final Logger LOG = Logger.getLogger(AppMasterLauncher.class);
+
+   /**
+    * Runs the start-up sequence described on the class.
+    * @param args not used
+    * @throws Exception if any start-up step other than directory cleanup or the application
+    *           master start fails (those two are logged and start-up continues)
+    */
+   public static void main(String[] args) throws Exception {
+     LOG.info("Starting app master with the following environment variables");
+     for (Map.Entry<String, String> variable : System.getenv().entrySet()) {
+       LOG.info(variable.getKey() + "\t\t=" + variable.getValue());
+     }
+
+     // START ZOOKEEPER
+     String dataDir = "dataDir";
+     String logDir = "logDir";
+     IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+       @Override
+       public void createDefaultNameSpace(ZkClient zkClient) {
+         // nothing to pre-create
+       }
+     };
+     try {
+       // start from clean directories
+       FileUtils.deleteDirectory(new File(dataDir));
+       FileUtils.deleteDirectory(new File(logDir));
+     } catch (IOException e) {
+       // best effort: keep going, but log the cause with its stack trace
+       LOG.error("Unable to delete ZooKeeper directories " + dataDir + " and " + logDir, e);
+     }
+
+     final ZkServer server = new ZkServer(dataDir, logDir, defaultNameSpace);
+     server.start();
+
+     // start Generic AppMaster that interacts with Yarn RM
+     AppMasterConfig appMasterConfig = new AppMasterConfig();
+     String containerIdStr = appMasterConfig.getContainerId();
+     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+     ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
+
+     // The spec is opened from a file literally named after the APP_SPEC_FILE key, relative to
+     // the working directory. NOTE(review): presumably the launcher localizes it under that
+     // name; confirm against the launcher.
+     String configFile = AppMasterConfig.AppEnvironment.APP_SPEC_FILE.toString();
+     String className = appMasterConfig.getApplicationSpecFactory();
+
+     GenericApplicationMaster genericApplicationMaster = new GenericApplicationMaster(appAttemptID);
+     try {
+       genericApplicationMaster.start();
+     } catch (Exception e) {
+       // NOTE(review): start-up continues even if the application master failed to start
+       LOG.error("Unable to start application master: ", e);
+     }
+     ApplicationSpecFactory factory = HelixYarnUtil.createInstance(className);
+
+     // TODO: Avoid setting static variable.
+     YarnProvisioner.applicationMaster = genericApplicationMaster;
+     YarnProvisioner.applicationMasterConfig = appMasterConfig;
+     ApplicationSpec applicationSpec;
+     InputStream specStream = new FileInputStream(configFile);
+     try {
+       applicationSpec = factory.fromYaml(specStream);
+     } finally {
+       // the stream was previously never closed
+       specStream.close();
+     }
+     YarnProvisioner.applicationSpec = applicationSpec;
+     String zkAddress = appMasterConfig.getZKAddress();
+     String clusterName = appMasterConfig.getAppName();
+
+     // CREATE CLUSTER and setup the resources
+     // connect
+     ZkHelixConnection connection = new ZkHelixConnection(zkAddress);
+     connection.connect();
+
+     // create the cluster
+     ClusterId clusterId = ClusterId.from(clusterName);
+     ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
+     StateModelDefinition statelessService =
+         new StateModelDefinition(StateModelConfigGenerator.generateConfigForStatelessService());
+     StateModelDefinition taskStateModel =
+         new StateModelDefinition(StateModelConfigGenerator.generateConfigForTaskStateModel());
+     clusterAccessor.createCluster(new ClusterConfig.Builder(clusterId)
+         .addStateModelDefinition(statelessService).addStateModelDefinition(taskStateModel).build());
+     for (String service : applicationSpec.getServices()) {
+       // add the resource with the local provisioner; the resource is named after the service
+       ResourceId resourceId = ResourceId.from(service);
+
+       ServiceConfig serviceConfig = applicationSpec.getServiceConfig(service);
+       serviceConfig.setSimpleField("service_name", service);
+       int numContainers = serviceConfig.getIntField("num_containers", 1);
+
+       YarnProvisionerConfig provisionerConfig = new YarnProvisionerConfig(resourceId);
+       provisionerConfig.setNumContainers(numContainers);
+
+       FullAutoRebalancerConfig.Builder rebalancerConfigBuilder =
+           new FullAutoRebalancerConfig.Builder(resourceId);
+       RebalancerConfig rebalancerConfig =
+           rebalancerConfigBuilder.stateModelDefId(statelessService.getStateModelDefId())//
+               .build();
+       ResourceConfig.Builder resourceConfigBuilder =
+           new ResourceConfig.Builder(ResourceId.from(service));
+       ResourceConfig resourceConfig = resourceConfigBuilder.provisionerConfig(provisionerConfig) //
+           .rebalancerConfig(rebalancerConfig) //
+           .userConfig(serviceConfig) //
+           .build();
+       clusterAccessor.addResourceToCluster(resourceConfig);
+     }
+     // start controller
+     ControllerId controllerId = ControllerId.from("controller1");
+     HelixController controller = connection.createController(clusterId, controllerId);
+     controller.start();
+
+     // Start any pre-specified jobs
+     List<TaskConfig> taskConfigs = applicationSpec.getTaskConfigs();
+     if (taskConfigs != null) {
+       YarnConfiguration conf = new YarnConfiguration();
+       FileSystem fs = FileSystem.get(conf);
+       for (TaskConfig taskConfig : taskConfigs) {
+         URI yamlUri = taskConfig.getYamlURI();
+         if (yamlUri != null && taskConfig.name != null) {
+           Workflow workflow;
+           InputStream is =
+               readFromHDFS(fs, taskConfig.name, yamlUri, applicationSpec,
+                   appAttemptID.getApplicationId());
+           try {
+             workflow = Workflow.parse(is);
+           } finally {
+             // the stream was previously never closed
+             is.close();
+           }
+           TaskDriver taskDriver = new TaskDriver(new HelixConnectionAdaptor(controller));
+           taskDriver.start(workflow);
+         }
+       }
+     }
+
+     // stop the embedded zookeeper when the JVM exits
+     Thread shutdownhook = new Thread(new Runnable() {
+       @Override
+       public void run() {
+         server.shutdown();
+       }
+     });
+     Runtime.getRuntime().addShutdownHook(shutdownhook);
+     Thread.sleep(10000);
+
+   }
+
+   /**
+    * Opens a task file under the file system's home directory, at
+    * {@code <appName>/<appId>/<name>.<extension>}.
+    * @param fs file system to read from
+    * @param name task name, used as the file's base name
+    * @param uri original location of the file; only its extension is used
+    * @param appSpec supplies the application name
+    * @param appId supplies the numeric application id
+    * @return an open stream that the caller must close
+    * @throws Exception if the file cannot be opened
+    */
+   private static InputStream readFromHDFS(FileSystem fs, String name, URI uri,
+       ApplicationSpec appSpec, ApplicationId appId) throws Exception {
+     // Everything after the last '.' in the path. If the path has no '.', lastIndexOf returns -1
+     // and the whole path is used as the "extension" (no exception is thrown).
+     String extension = uri.getPath().substring(uri.getPath().lastIndexOf(".") + 1);
+     String pathSuffix = appSpec.getAppName() + "/" + appId.getId() + "/" + name + "." + extension;
+     Path dst = new Path(fs.getHomeDirectory(), pathSuffix);
+     return fs.open(dst).getWrappedStream();
+   }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
index 0000000,40c8186..c436443
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/AppStatusReportGenerator.java
@@@ -1,0 -1,84 +1,103 @@@
+ package org.apache.helix.provisioning.yarn;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.Map;
+
+ import org.apache.helix.HelixConnection;
+ import org.apache.helix.api.Participant;
+ import org.apache.helix.api.Resource;
+ import org.apache.helix.api.State;
+ import org.apache.helix.api.accessor.ClusterAccessor;
+ import org.apache.helix.api.config.ContainerConfig;
+ import org.apache.helix.api.id.ClusterId;
+ import org.apache.helix.api.id.ParticipantId;
+ import org.apache.helix.api.id.PartitionId;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.controller.provisioner.ContainerId;
+ import org.apache.helix.controller.provisioner.ContainerState;
+ import org.apache.helix.manager.zk.ZkHelixConnection;
+
+ public class AppStatusReportGenerator {
+ static String TAB = "\t";
+ static String NEWLINE = "\n";
+
+ String generateReport(HelixConnection connection, ClusterId clusterId) {
+ if (!connection.isConnected()) {
+ return "Unable to connect to cluster";
+ }
+ StringBuilder builder = new StringBuilder();
+ ClusterAccessor clusterAccessor = connection.createClusterAccessor(clusterId);
+ Map<ParticipantId, Participant> participants = clusterAccessor.readParticipants();
+ builder.append("AppName").append(TAB).append(clusterId).append(NEWLINE);
+ Map<ResourceId, Resource> resources = clusterAccessor.readResources();
+ for (ResourceId resourceId : resources.keySet()) {
+ builder.append("SERVICE").append(TAB).append(resourceId).append(NEWLINE);
+ Resource resource = resources.get(resourceId);
+ Map<ParticipantId, State> serviceStateMap =
+ resource.getExternalView().getStateMap(PartitionId.from(resourceId.stringify() + "_0"));
+
+ builder.append(TAB).append("CONTAINER_NAME").append(TAB).append(TAB)
+ .append("CONTAINER_STATE").append(TAB).append("SERVICE_STATE").append(TAB)
+ .append("CONTAINER_ID").append(NEWLINE);
+ for (Participant participant : participants.values()) {
+ // need a better check
+ if (!participant.getId().stringify().startsWith(resource.getId().stringify())) {
+ continue;
+ }
+ ContainerConfig containerConfig = participant.getContainerConfig();
+ ContainerState containerState = ContainerState.UNDEFINED;
+ ContainerId containerId = ContainerId.from("N/A");
+
+ if (containerConfig != null) {
+ containerId = containerConfig.getId();
+ containerState = containerConfig.getState();
+ }
+ State participantState = null;
+ if (serviceStateMap != null) {
+ participantState = serviceStateMap.get(participant.getId());
+ }
+ if (participantState == null) {
+ participantState = State.from("UNKNOWN");
+ }
+ builder.append(TAB).append(participant.getId()).append(TAB).append(containerState)
+ .append(TAB).append(participantState).append(TAB).append(TAB).append(containerId);
+ builder.append(NEWLINE);
+ }
+
+ }
+ return builder.toString();
+
+ }
+
+ public static void main(String[] args) throws InterruptedException {
+ AppStatusReportGenerator generator = new AppStatusReportGenerator();
+
+ ZkHelixConnection connection = new ZkHelixConnection("localhost:2181");
+ connection.connect();
+ while (true) {
+ String generateReport = generator.generateReport(connection, ClusterId.from("testApp1"));
+ System.out.println(generateReport);
+ Thread.sleep(10000);
+ connection.createClusterManagementTool().addCluster("testApp1");
+ }
+ // connection.disconnect();
+ }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/FixedTargetProvider.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/FixedTargetProvider.java
index 0000000,83ad461..17c0fe1
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/FixedTargetProvider.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/FixedTargetProvider.java
@@@ -1,0 -1,20 +1,39 @@@
+ package org.apache.helix.provisioning.yarn;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.Collection;
+
+ import org.apache.helix.api.Cluster;
+ import org.apache.helix.api.Participant;
+ import org.apache.helix.api.id.ResourceId;
+ import org.apache.helix.controller.provisioner.TargetProvider;
+ import org.apache.helix.controller.provisioner.TargetProviderResponse;
+
+ /**
+  * Placeholder {@link TargetProvider}: the evaluation is not implemented yet.
+  */
+ public class FixedTargetProvider implements TargetProvider {
+
+   /**
+    * Not implemented yet.
+    * @param cluster ignored
+    * @param resourceId ignored
+    * @param participants ignored
+    * @return always null
+    */
+   @Override
+   public TargetProviderResponse evaluateExistingContainers(Cluster cluster, ResourceId resourceId,
+       Collection<Participant> participants) {
+     // TODO Auto-generated method stub
+     return null;
+   }
+
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/LaunchContainerRunnable.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/LaunchContainerRunnable.java
index 0000000,c54f87f..f66dd55
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/LaunchContainerRunnable.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/LaunchContainerRunnable.java
@@@ -1,0 -1,79 +1,98 @@@
+ package org.apache.helix.provisioning.yarn;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
-import java.util.Vector;
+
+ import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+ import org.apache.hadoop.yarn.api.records.Container;
+ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+ import org.apache.hadoop.yarn.api.records.LocalResource;
+ import org.apache.hadoop.yarn.util.Records;
+
+ /**
+ * Runnable that builds a {@link ContainerLaunchContext} for an allocated container and asks the
+ * application master's node manager client to start it asynchronously.
+ * NOTE(review): the reference to {@link ContainerManagementProtocol} and the "shell command"
+ * wording in earlier versions appear to be inherited from an example; no command is set here.
+ */
+ class LaunchContainerRunnable implements Runnable {
+
+ /**
- *
++ * Application master that supplies the tokens and the NM client used by {@link #run()}.
+ */
+ private final GenericApplicationMaster _genericApplicationMaster;
+
+ // Allocated container that this runnable starts
+ Container container;
+
+ NMCallbackHandler containerListener;
+
+ /**
+ * @param lcontainer Allocated container
+ * @param containerListener Callback handler of the container
+ * @param genericApplicationMaster application master whose tokens and NM client are used to
+ * start the container
+ */
- public LaunchContainerRunnable(GenericApplicationMaster genericApplicationMaster, Container lcontainer, NMCallbackHandler containerListener) {
++ public LaunchContainerRunnable(GenericApplicationMaster genericApplicationMaster,
++ Container lcontainer, NMCallbackHandler containerListener) {
+ _genericApplicationMaster = genericApplicationMaster;
+ this.container = lcontainer;
+ this.containerListener = containerListener;
+ }
+
+ @Override
+ /**
- * Connects to CM, sets up container launch context
- * for shell command and eventually dispatches the container
- * start request to the CM.
++ * Sets up a container launch context with empty local resources and an empty command
++ * list, copies the application master's tokens into it, registers the container with the
++ * callback handler and dispatches the asynchronous container start request.
+ */
+ public void run() {
- GenericApplicationMaster.LOG.info("Setting up container launch container for containerid=" + container.getId());
++ GenericApplicationMaster.LOG.info("Setting up container launch container for containerid="
++ + container.getId());
+ ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
+
+ // Set the environment (currently disabled)
- //ctx.setEnvironment(shellEnv);
++ // ctx.setEnvironment(shellEnv);
+
+ // Set the local resources (none are registered here, so the map stays empty)
+ Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+
+ ctx.setLocalResources(localResources);
+
+ // Set the necessary command to execute on the allocated container (the list stays empty)
- Vector<CharSequence> vargs = new Vector<CharSequence>(5);
++ // Vector<CharSequence> vargs = new Vector<CharSequence>(5);
+
-
+ List<String> commands = new ArrayList<String>();
- // commands.add(command.toString());
++ // commands.add(command.toString());
+ ctx.setCommands(commands);
+
+ // Set up tokens for the container too. Today, for normal shell commands,
+ // the container in distribute-shell doesn't need any tokens. We are
+ // populating them mainly for NodeManagers to be able to download any
+ // files in the distributed file-system. The tokens are otherwise also
+ // useful in cases, for e.g., when one is running a "hadoop dfs" command
+ // inside the distributed shell.
+ ctx.setTokens(_genericApplicationMaster.allTokens.duplicate());
+
+ // Register the container with the callback handler before requesting the start
+ containerListener.addContainer(container.getId(), container);
+ _genericApplicationMaster.nmClientAsync.startContainerAsync(container, ctx);
+ }
-}
++}
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
index 0000000,f7c3a9f..7d7883e
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
@@@ -1,0 -1,84 +1,103 @@@
+ package org.apache.helix.provisioning.yarn;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.nio.ByteBuffer;
+ import java.util.Map;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+
+ import org.apache.hadoop.yarn.api.records.Container;
+ import org.apache.hadoop.yarn.api.records.ContainerId;
+ import org.apache.hadoop.yarn.api.records.ContainerStatus;
+ import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+ import org.apache.helix.provisioning.ContainerLaunchResponse;
+ import org.apache.helix.provisioning.ContainerStopResponse;
+ import org.apache.log4j.Logger;
+
+ import com.google.common.annotations.VisibleForTesting;
+ import com.google.common.util.concurrent.SettableFuture;
+
+ @VisibleForTesting
+ class NMCallbackHandler implements NMClientAsync.CallbackHandler {
+
+ private Logger LOG = Logger.getLogger(NMCallbackHandler.class);
+ private ConcurrentMap<ContainerId, Container> containers =
+ new ConcurrentHashMap<ContainerId, Container>();
+ private final GenericApplicationMaster applicationMaster;
+
+ public NMCallbackHandler(GenericApplicationMaster applicationMaster) {
+ this.applicationMaster = applicationMaster;
+ }
+
+ public void addContainer(ContainerId containerId, Container container) {
+ containers.putIfAbsent(containerId, container);
+ }
+
+ @Override
+ public void onContainerStopped(ContainerId containerId) {
+ LOG.info("Succeeded to stop Container " + containerId);
+ Container container = containers.get(containerId);
+ if (container != null) {
+ applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
+ }
+ SettableFuture<ContainerStopResponse> settableFuture =
+ applicationMaster.containerStopMap.remove(containerId);
+ ContainerStopResponse value = new ContainerStopResponse();
+ settableFuture.set(value);
+ containers.remove(containerId);
+ }
+
+ @Override
+ public void onContainerStatusReceived(ContainerId containerId, ContainerStatus containerStatus) {
+ LOG.info("Container Status: id=" + containerId + ", status=" + containerStatus);
+ }
+
+ @Override
+ public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
+ LOG.debug("Succeeded to start Container " + containerId);
+
+ Container container = containers.get(containerId);
+ if (container != null) {
+ applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
+ }
+ SettableFuture<ContainerLaunchResponse> settableFuture =
+ applicationMaster.containerLaunchResponseMap.remove(containerId);
+ ContainerLaunchResponse value = new ContainerLaunchResponse();
+ settableFuture.set(value);
+ }
+
+ @Override
+ public void onStartContainerError(ContainerId containerId, Throwable t) {
+ LOG.error("Failed to start Container " + containerId);
+ containers.remove(containerId);
+ }
+
+ @Override
+ public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
+ LOG.error("Failed to query the status of Container " + containerId);
+ }
+
+ @Override
+ public void onStopContainerError(ContainerId containerId, Throwable t) {
+ LOG.error("Failed to stop Container " + containerId);
+ containers.remove(containerId);
+ }
+ }
http://git-wip-us.apache.org/repos/asf/helix/blob/713586c4/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
----------------------------------------------------------------------
diff --cc helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
index 0000000,ced1431..0fc748c
mode 000000,100644..100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/RMCallbackHandler.java
@@@ -1,0 -1,132 +1,150 @@@
+ package org.apache.helix.provisioning.yarn;
+
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing,
++ * software distributed under the License is distributed on an
++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++ * KIND, either express or implied. See the License for the
++ * specific language governing permissions and limitations
++ * under the License.
++ */
++
+ import java.util.List;
+
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.apache.hadoop.yarn.api.records.Container;
+ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+ import org.apache.hadoop.yarn.api.records.ContainerState;
+ import org.apache.hadoop.yarn.api.records.ContainerStatus;
+ import org.apache.hadoop.yarn.api.records.NodeReport;
+ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+ import org.apache.helix.provisioning.ContainerAskResponse;
+ import org.apache.helix.provisioning.ContainerReleaseResponse;
+ import org.apache.helix.provisioning.ContainerStopResponse;
+
+ import com.google.common.util.concurrent.SettableFuture;
+
+ /**
+  * {@link AMRMClientAsync.CallbackHandler} implementation that relays container events to the
+  * {@link GenericApplicationMaster} it was created for, by completing the futures kept in that
+  * master's request, stop and release maps.
+  */
+ class RMCallbackHandler implements AMRMClientAsync.CallbackHandler {
+   private static final Log LOG = LogFactory.getLog(RMCallbackHandler.class);
+   /** Value of {@link System#currentTimeMillis()} captured when the handler was constructed. */
+   long startTime;
+   /**
- *
++   * Application master whose bookkeeping maps and RM client are used by the callbacks.
+    */
+   private final GenericApplicationMaster _genericApplicationMaster;
+
+   /**
+    * @param genericApplicationMaster application master whose pending futures this handler
+    *          completes
+    */
+   RMCallbackHandler(GenericApplicationMaster genericApplicationMaster) {
+     _genericApplicationMaster = genericApplicationMaster;
+     startTime = System.currentTimeMillis();
+   }
+
+   /**
+    * For every completed container: logs its status, removes its id from the master's
+    * {@code allocatedContainerSet}, and completes the pending stop future for that id if there
+    * is one, otherwise the pending release future if there is one.
+    *
+    * @param completedContainers statuses of the containers reported as completed
+    */
- @SuppressWarnings("unchecked")
+   @Override
+   public void onContainersCompleted(List<ContainerStatus> completedContainers) {
+     LOG.info("Got response from RM for container ask, completedCnt=" + completedContainers.size());
+     for (ContainerStatus containerStatus : completedContainers) {
+       GenericApplicationMaster.LOG.info("Got container status for containerID="
+           + containerStatus.getContainerId() + ", state=" + containerStatus.getState()
+           + ", exitStatus=" + containerStatus.getExitStatus() + ", diagnostics="
+           + containerStatus.getDiagnostics());
+
+       // non complete containers should not be here
+       assert (containerStatus.getState() == ContainerState.COMPLETE);
+       synchronized (_genericApplicationMaster.allocatedContainerSet) {
+         _genericApplicationMaster.allocatedContainerSet.remove(containerStatus.getContainerId());
+         SettableFuture<ContainerStopResponse> stopResponseFuture =
+             _genericApplicationMaster.containerStopMap.remove(containerStatus.getContainerId());
+         if (stopResponseFuture != null) {
+           ContainerStopResponse value = new ContainerStopResponse();
+           stopResponseFuture.set(value);
+         } else {
+           // no stop was pending for this id, so look for a pending release instead
+           SettableFuture<ContainerReleaseResponse> releaseResponseFuture =
+               _genericApplicationMaster.containerReleaseMap
+                   .remove(containerStatus.getContainerId());
+           if (releaseResponseFuture != null) {
+             ContainerReleaseResponse value = new ContainerReleaseResponse();
+             releaseResponseFuture.set(value);
+           }
+         }
+       }
+       // classify the exit status; no counters are maintained in this class
+       int exitStatus = containerStatus.getExitStatus();
+       if (0 != exitStatus) {
+         // container failed
+         if (ContainerExitStatus.ABORTED != exitStatus) {
+           // failed for a reason other than ABORTED: no handling is implemented here
+         } else {
+           // container was killed by framework, possibly preempted
+           // we should re-try as the container was lost for some reason
+
+           // we do not need to release the container as it would be done
+           // by the RM
+         }
+       } else {
+         // nothing to do
+         // container completed successfully
+         GenericApplicationMaster.LOG.info("Container completed successfully." + ", containerId="
+             + containerStatus.getContainerId());
+       }
+     }
+   }
+
+   /**
+    * Hands each newly allocated container to the first pending request whose requested memory
+    * equals the container's memory: the request is removed from the master's
+    * {@code containerRequestMap}, the container id is added to {@code allocatedContainerSet},
+    * and the request's future is completed with a {@link ContainerAskResponse} holding the
+    * container.
+    *
+    * @param allocatedContainers containers reported as newly allocated
+    */
+   @Override
+   public void onContainersAllocated(List<Container> allocatedContainers) {
+     GenericApplicationMaster.LOG.info("Got response from RM for container ask, allocatedCnt="
+         + allocatedContainers.size());
+     for (Container allocatedContainer : allocatedContainers) {
+       GenericApplicationMaster.LOG.info("Allocated new container." + ", containerId="
+           + allocatedContainer.getId() + ", containerNode="
+           + allocatedContainer.getNodeId().getHost() + ":"
+           + allocatedContainer.getNodeId().getPort() + ", containerNodeURI="
+           + allocatedContainer.getNodeHttpAddress() + ", containerResourceMemory="
+           + allocatedContainer.getResource().getMemory());
+       boolean matched = false;
+       // NOTE(review): requests are matched by exact memory equality only - confirm that an
+       // allocation can never differ in size from what was requested.
+       for (ContainerRequest containerRequest : _genericApplicationMaster.containerRequestMap
+           .keySet()) {
+         if (containerRequest.getCapability().getMemory() == allocatedContainer.getResource()
+             .getMemory()) {
+           SettableFuture<ContainerAskResponse> future =
+               _genericApplicationMaster.containerRequestMap.remove(containerRequest);
+           // remove() can return null if the entry is already gone; guard instead of
+           // failing with a NullPointerException
+           if (future != null) {
+             ContainerAskResponse response = new ContainerAskResponse();
+             response.setContainer(allocatedContainer);
+             // NOTE(review): onContainersCompleted mutates this set while holding its
+             // monitor; this add is not synchronized - confirm the set is thread-safe.
+             _genericApplicationMaster.allocatedContainerSet.add(allocatedContainer.getId());
+             future.set(response);
+             matched = true;
+           }
+           // stop iterating right after the removal, as the map was modified
+           break;
+         }
+       }
+       if (!matched) {
+         // The container is neither handed out nor released by this method.
+         GenericApplicationMaster.LOG.warn("No pending container request matched allocated"
+             + " container, containerId=" + allocatedContainer.getId());
+       }
+     }
+   }
+
+   /** Shutdown-request callback; no action is taken. */
+   @Override
+   public void onShutdownRequest() {
+   }
+
+   /**
+    * Node-update callback; no action is taken.
+    *
+    * @param updatedNodes reports of the updated nodes; ignored
+    */
+   @Override
+   public void onNodesUpdated(List<NodeReport> updatedNodes) {
+   }
+
+   /**
+    * @return milliseconds elapsed since construction, modulo {@link Integer#MAX_VALUE},
+    *         converted to {@code float}
+    */
+   @Override
+   public float getProgress() {
+     // set progress to deliver to RM on next heartbeat
+     // NOTE(review): progress is conventionally a fraction in [0, 1]; this returns elapsed
+     // milliseconds - confirm that this is intended.
+     return (System.currentTimeMillis() - startTime) % Integer.MAX_VALUE;
+   }
+
+   /**
+    * Logs the reported error and stops the master's {@code amRMClient}.
+    *
+    * @param e error that was reported
+    */
+   @Override
+   public void onError(Throwable e) {
+     // Record the cause before stopping; previously the throwable was dropped silently.
+     LOG.error("Error reported to RM callback handler, stopping the RM client", e);
+     _genericApplicationMaster.amRMClient.stop();
+   }
+ }