You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/01/11 08:42:17 UTC
svn commit: r1557322 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/
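hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/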
Author: vinodkv
Date: Sat Jan 11 07:42:16 2014
New Revision: 1557322
URL: http://svn.apache.org/r1557322
Log:
YARN-1566. Changed Distributed Shell to retain containers across application attempts. Contributed by Jian He.
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1557322&r1=1557321&r2=1557322&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Sat Jan 11 07:42:16 2014
@@ -67,6 +67,9 @@ Release 2.4.0 - UNRELEASED
ability in ResourceManager to optionally not kill containers when the
ApplicationMaster exits. (Jian He via vinodkv)
+ YARN-1566. Changed Distributed Shell to retain containers across application
+ attempts. (Jian He via vinodkv)
+
IMPROVEMENTS
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1557322&r1=1557321&r2=1557322&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Sat Jan 11 07:42:16 2014
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
@@ -89,6 +88,8 @@ import org.apache.hadoop.yarn.security.A
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* An ApplicationMaster for executing shell commands on a set of launched
* containers using the YARN framework.
@@ -169,7 +170,8 @@ public class ApplicationMaster {
private NMCallbackHandler containerListener;
// Application Attempt Id ( combination of attemptId and fail count )
- private ApplicationAttemptId appAttemptID;
+ @VisibleForTesting
+ protected ApplicationAttemptId appAttemptID;
// TODO
// For status update for clients - yet to be implemented
@@ -194,13 +196,15 @@ public class ApplicationMaster {
private AtomicInteger numCompletedContainers = new AtomicInteger();
// Allocated container count so that we know how many containers has the RM
// allocated to us
- private AtomicInteger numAllocatedContainers = new AtomicInteger();
+ @VisibleForTesting
+ protected AtomicInteger numAllocatedContainers = new AtomicInteger();
// Count of failed containers
private AtomicInteger numFailedContainers = new AtomicInteger();
// Count of containers already requested from the RM
// Needed as once requested, we should not request for containers again.
// Only request for more if the original requirement changes.
- private AtomicInteger numRequestedContainers = new AtomicInteger();
+ @VisibleForTesting
+ protected AtomicInteger numRequestedContainers = new AtomicInteger();
// Shell command to be executed
private String shellCommand = "";
@@ -251,6 +255,7 @@ public class ApplicationMaster {
System.exit(0);
}
result = appMaster.run();
+ appMaster.finish();
} catch (Throwable t) {
LOG.fatal("Error running ApplicationMaster", t);
System.exit(1);
@@ -537,26 +542,25 @@ public class ApplicationMaster {
containerVirtualCores = maxVCores;
}
+ List<Container> previousAMRunningContainers =
+ response.getContainersFromPreviousAttempt();
+ LOG.info("Received " + previousAMRunningContainers.size()
+ + " previous AM's running containers on AM registration.");
+ numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
+
+ int numTotalContainersToRequest =
+ numTotalContainers - previousAMRunningContainers.size();
// Setup ask for containers from RM
// Send request for containers to RM
// Until we get our fully allocated quota, we keep on polling RM for
// containers
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
- for (int i = 0; i < numTotalContainers; ++i) {
+ for (int i = 0; i < numTotalContainersToRequest; ++i) {
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
}
- numRequestedContainers.set(numTotalContainers);
-
- while (!done
- && (numCompletedContainers.get() != numTotalContainers)) {
- try {
- Thread.sleep(200);
- } catch (InterruptedException ex) {}
- }
- finish();
-
+ numRequestedContainers.set(numTotalContainersToRequest);
return success;
}
@@ -565,7 +569,15 @@ public class ApplicationMaster {
return new NMCallbackHandler(this);
}
- private void finish() {
+ protected void finish() {
+ // wait for completion.
+ while (!done
+ && (numCompletedContainers.get() != numTotalContainers)) {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException ex) {}
+ }
+
// Join all launched threads
// needed for when we time out
// and we need to release containers
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?rev=1557322&r1=1557321&r2=1557322&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java Sat Jan 11 07:42:16 2014
@@ -162,6 +162,9 @@ public class Client {
// Timeout threshold for client. Kill app after time interval expires.
private long clientTimeout = 600000;
+ // flag to indicate whether to keep containers across application attempts.
+ private boolean keepContainers = false;
+
// Debug flag
boolean debugFlag = false;
@@ -243,6 +246,11 @@ public class Client {
opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
opts.addOption("log_properties", true, "log4j.properties file");
+ opts.addOption("keep_containers_across_application_attempts", false,
+ "Flag to indicate whether to keep containers across application attempts." +
+ " If the flag is true, running containers will not be killed when" +
+ " application attempt fails and these containers will be retrieved by" +
+ " the new application attempt ");
opts.addOption("debug", false, "Dump out debug information");
opts.addOption("help", false, "Print usage");
@@ -294,12 +302,17 @@ public class Client {
}
+ if (cliParser.hasOption("keep_containers_across_application_attempts")) {
+ LOG.info("keep_containers_across_application_attempts");
+ keepContainers = true;
+ }
+
appName = cliParser.getOptionValue("appname", "DistributedShell");
amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
amQueue = cliParser.getOptionValue("queue", "default");
amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));
amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));
-
+
if (amMemory < 0) {
throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
+ " Specified memory=" + amMemory);
@@ -442,6 +455,8 @@ public class Client {
// set the application name
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
+
+ appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName);
// Set up the container launch context for the application master
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java?rev=1557322&r1=1557321&r2=1557322&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java Sat Jan 11 07:42:16 2014
@@ -67,6 +67,7 @@ public class ContainerLaunchFailAppMaste
System.exit(0);
}
result = appMaster.run();
+ appMaster.finish();
} catch (Throwable t) {
LOG.fatal("Error running ApplicationMaster", t);
System.exit(1);
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java?rev=1557322&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java Sat Jan 11 07:42:16 2014
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+public class TestDSFailedAppMaster extends ApplicationMaster {
+
+ private static final Log LOG = LogFactory.getLog(TestDSFailedAppMaster.class);
+
+ @Override
+ public boolean run() throws YarnException, IOException {
+ boolean res = super.run();
+
+ // for the 2nd attempt.
+ if (appAttemptID.getAttemptId() == 2) {
+ // should reuse the earlier running container, so numAllocatedContainers
+ // should be set to 1. And should ask no more containers, so
+ // numRequestedContainers should be set to 0.
+ if (numAllocatedContainers.get() != 1
+ || numRequestedContainers.get() != 0) {
+ LOG.info("Application Master failed. exiting");
+ System.exit(200);
+ }
+ }
+ return res;
+ }
+
+ public static void main(String[] args) {
+ boolean result = false;
+ try {
+ TestDSFailedAppMaster appMaster = new TestDSFailedAppMaster();
+ boolean doRun = appMaster.init(args);
+ if (!doRun) {
+ System.exit(0);
+ }
+ result = appMaster.run();
+ if (appMaster.appAttemptID.getAttemptId() == 1) {
+ try {
+ // sleep some time, wait for the AM to launch a container.
+ Thread.sleep(3000);
+ } catch (InterruptedException e) {}
+ // fail the first am.
+ System.exit(100);
+ }
+ appMaster.finish();
+ } catch (Throwable t) {
+ System.exit(1);
+ }
+ if (result) {
+ LOG.info("Application Master completed successfully. exiting");
+ System.exit(0);
+ } else {
+ LOG.info("Application Master failed. exiting");
+ System.exit(2);
+ }
+ }
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1557322&r1=1557321&r2=1557322&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Sat Jan 11 07:42:16 2014
@@ -175,6 +175,35 @@ public class TestDistributedShell {
}
@Test(timeout=90000)
+ public void testDSRestartWithPreviousRunningContainers() throws Exception {
+ String[] args = {
+ "--jar",
+ APPMASTER_JAR,
+ "--num_containers",
+ "1",
+ "--shell_command",
+ Shell.WINDOWS ? "timeout 8" : "sleep 8",
+ "--master_memory",
+ "512",
+ "--container_memory",
+ "128",
+ "--keep_containers_across_application_attempts"
+ };
+
+ LOG.info("Initializing DS Client");
+ Client client = new Client(TestDSFailedAppMaster.class.getName(),
+ new Configuration(yarnCluster.getConfig()));
+
+ client.init(args);
+ LOG.info("Running DS Client");
+ boolean result = client.run();
+
+ LOG.info("Client run completed. Result=" + result);
+ // application should succeed
+ Assert.assertTrue(result);
+ }
+
+ @Test(timeout=90000)
public void testDSShellWithCustomLogPropertyFile() throws Exception {
final File basedir =
new File("target", TestDistributedShell.class.getName());