You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this conversion.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2014/01/11 08:42:47 UTC

svn commit: r1557323 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ hadoop-yarn/hadoop-yar...

Author: vinodkv
Date: Sat Jan 11 07:42:47 2014
New Revision: 1557323

URL: http://svn.apache.org/r1557323
Log:
YARN-1566. Changed Distributed Shell to retain containers across application attempts. Contributed by Jian He.
svn merge --ignore-ancestry -c 1557322 ../../trunk/

Added:
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java
      - copied unchanged from r1557322, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSFailedAppMaster.java
Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1557323&r1=1557322&r2=1557323&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Sat Jan 11 07:42:47 2014
@@ -49,6 +49,9 @@ Release 2.4.0 - UNRELEASED
     ability in ResourceManager to optionally not kill containers when the
     ApplicationMaster exits. (Jian He via vinodkv)
 
+    YARN-1566. Changed Distributed Shell to retain containers across application
+    attempts. (Jian He via vinodkv)
+
   IMPROVEMENTS
 
     YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1557323&r1=1557322&r2=1557323&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Sat Jan 11 07:42:47 2014
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -89,6 +88,8 @@ import org.apache.hadoop.yarn.security.A
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * An ApplicationMaster for executing shell commands on a set of launched
  * containers using the YARN framework.
@@ -169,7 +170,8 @@ public class ApplicationMaster {
   private NMCallbackHandler containerListener;
   
   // Application Attempt Id ( combination of attemptId and fail count )
-  private ApplicationAttemptId appAttemptID;
+  @VisibleForTesting
+  protected ApplicationAttemptId appAttemptID;
 
   // TODO
   // For status update for clients - yet to be implemented
@@ -194,13 +196,15 @@ public class ApplicationMaster {
   private AtomicInteger numCompletedContainers = new AtomicInteger();
   // Allocated container count so that we know how many containers has the RM
   // allocated to us
-  private AtomicInteger numAllocatedContainers = new AtomicInteger();
+  @VisibleForTesting
+  protected AtomicInteger numAllocatedContainers = new AtomicInteger();
   // Count of failed containers
   private AtomicInteger numFailedContainers = new AtomicInteger();
   // Count of containers already requested from the RM
   // Needed as once requested, we should not request for containers again.
   // Only request for more if the original requirement changes.
-  private AtomicInteger numRequestedContainers = new AtomicInteger();
+  @VisibleForTesting
+  protected AtomicInteger numRequestedContainers = new AtomicInteger();
 
   // Shell command to be executed
   private String shellCommand = "";
@@ -251,6 +255,7 @@ public class ApplicationMaster {
         System.exit(0);
       }
       result = appMaster.run();
+      appMaster.finish();
     } catch (Throwable t) {
       LOG.fatal("Error running ApplicationMaster", t);
       System.exit(1);
@@ -537,26 +542,25 @@ public class ApplicationMaster {
       containerVirtualCores = maxVCores;
     }
 
+    List<Container> previousAMRunningContainers =
+        response.getContainersFromPreviousAttempt();
+    LOG.info("Received " + previousAMRunningContainers.size()
+        + " previous AM's running containers on AM registration.");
+    numAllocatedContainers.addAndGet(previousAMRunningContainers.size());
+
+    int numTotalContainersToRequest =
+        numTotalContainers - previousAMRunningContainers.size();
     // Setup ask for containers from RM
     // Send request for containers to RM
     // Until we get our fully allocated quota, we keep on polling RM for
     // containers
     // Keep looping until all the containers are launched and shell script
     // executed on them ( regardless of success/failure).
-    for (int i = 0; i < numTotalContainers; ++i) {
+    for (int i = 0; i < numTotalContainersToRequest; ++i) {
       ContainerRequest containerAsk = setupContainerAskForRM();
       amRMClient.addContainerRequest(containerAsk);
     }
-    numRequestedContainers.set(numTotalContainers);
-
-    while (!done
-        && (numCompletedContainers.get() != numTotalContainers)) {
-      try {
-        Thread.sleep(200);
-      } catch (InterruptedException ex) {}
-    }
-    finish();
-    
+    numRequestedContainers.set(numTotalContainersToRequest);
     return success;
   }
 
@@ -565,7 +569,15 @@ public class ApplicationMaster {
     return new NMCallbackHandler(this);
   }
 
-  private void finish() {
+  protected void finish() {
+    // wait for completion.
+    while (!done
+        && (numCompletedContainers.get() != numTotalContainers)) {
+      try {
+        Thread.sleep(200);
+      } catch (InterruptedException ex) {}
+    }
+
     // Join all launched threads
     // needed for when we time out
     // and we need to release containers

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java?rev=1557323&r1=1557322&r2=1557323&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java Sat Jan 11 07:42:47 2014
@@ -162,6 +162,9 @@ public class Client {
   // Timeout threshold for client. Kill app after time interval expires.
   private long clientTimeout = 600000;
 
+  // flag to indicate whether to keep containers across application attempts.
+  private boolean keepContainers = false;
+
   // Debug flag
   boolean debugFlag = false;	
 
@@ -243,6 +246,11 @@ public class Client {
     opts.addOption("container_vcores", true, "Amount of virtual cores to be requested to run the shell command");
     opts.addOption("num_containers", true, "No. of containers on which the shell command needs to be executed");
     opts.addOption("log_properties", true, "log4j.properties file");
+    opts.addOption("keep_containers_across_application_attempts", false,
+      "Flag to indicate whether to keep containers across application attempts." +
+      " If the flag is true, running containers will not be killed when" +
+      " application attempt fails and these containers will be retrieved by" +
+      " the new application attempt ");
     opts.addOption("debug", false, "Dump out debug information");
     opts.addOption("help", false, "Print usage");
 
@@ -294,12 +302,17 @@ public class Client {
 
     }
 
+    if (cliParser.hasOption("keep_containers_across_application_attempts")) {
+      LOG.info("keep_containers_across_application_attempts");
+      keepContainers = true;
+    }
+
     appName = cliParser.getOptionValue("appname", "DistributedShell");
     amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
     amQueue = cliParser.getOptionValue("queue", "default");
     amMemory = Integer.parseInt(cliParser.getOptionValue("master_memory", "10"));		
     amVCores = Integer.parseInt(cliParser.getOptionValue("master_vcores", "1"));
-    
+
     if (amMemory < 0) {
       throw new IllegalArgumentException("Invalid memory specified for application master, exiting."
           + " Specified memory=" + amMemory);
@@ -442,6 +455,8 @@ public class Client {
     // set the application name
     ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
     ApplicationId appId = appContext.getApplicationId();
+
+    appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
     appContext.setApplicationName(appName);
 
     // Set up the container launch context for the application master

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java?rev=1557323&r1=1557322&r2=1557323&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/ContainerLaunchFailAppMaster.java Sat Jan 11 07:42:47 2014
@@ -67,6 +67,7 @@ public class ContainerLaunchFailAppMaste
         System.exit(0);
       }
       result = appMaster.run();
+      appMaster.finish();
     } catch (Throwable t) {
       LOG.fatal("Error running ApplicationMaster", t);
       System.exit(1);

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1557323&r1=1557322&r2=1557323&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Sat Jan 11 07:42:47 2014
@@ -175,6 +175,35 @@ public class TestDistributedShell {
   }
 
   @Test(timeout=90000)
+  public void testDSRestartWithPreviousRunningContainers() throws Exception {
+    String[] args = {
+        "--jar",
+        APPMASTER_JAR,
+        "--num_containers",
+        "1",
+        "--shell_command",
+        Shell.WINDOWS ? "timeout 8" : "sleep 8",
+        "--master_memory",
+        "512",
+        "--container_memory",
+        "128",
+        "--keep_containers_across_application_attempts"
+      };
+
+      LOG.info("Initializing DS Client");
+      Client client = new Client(TestDSFailedAppMaster.class.getName(),
+        new Configuration(yarnCluster.getConfig()));
+
+      client.init(args);
+      LOG.info("Running DS Client");
+      boolean result = client.run();
+
+      LOG.info("Client run completed. Result=" + result);
+      // application should succeed
+      Assert.assertTrue(result);
+    }
+
+  @Test(timeout=90000)
   public void testDSShellWithCustomLogPropertyFile() throws Exception {
     final File basedir =
         new File("target", TestDistributedShell.class.getName());