You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2020/12/28 19:09:38 UTC

[hadoop] branch trunk updated: YARN-10334. Close clients in TestDistributedShell (#2571)

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 513f199  YARN-10334. Close clients in TestDistributedShell (#2571)
513f199 is described below

commit 513f1995adc9b73f9c7f4c7beb89725b51b313ac
Author: Ahmed Hussein <50...@users.noreply.github.com>
AuthorDate: Mon Dec 28 14:09:10 2020 -0500

    YARN-10334. Close clients in TestDistributedShell (#2571)
---
 .../yarn/applications/distributedshell/Client.java |  49 +-
 .../distributedshell/TestDistributedShell.java     | 784 ++++++++++-----------
 2 files changed, 434 insertions(+), 399 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index d7114d0..5da4384 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -32,6 +32,7 @@ import java.util.Set;
 import java.util.Vector;
 import java.util.Arrays;
 import java.util.Base64;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
 
@@ -253,6 +254,10 @@ public class Client {
   // Command line options
   private Options opts;
 
+  private final AtomicBoolean stopSignalReceived;
+  private final AtomicBoolean isRunning;
+  private final Object objectLock = new Object();
+
   private static final String shellCommandPath = "shellCommands";
   private static final String shellArgsPath = "shellArgs";
   private static final String appMasterJarPath = "AppMaster.jar";
@@ -413,6 +418,8 @@ public class Client {
     opts.addOption("application_tags", true, "Application tags.");
     opts.addOption("localize_files", true, "List of files, separated by comma"
         + " to be localized for the command");
+    stopSignalReceived = new AtomicBoolean(false);
+    isRunning = new AtomicBoolean(false);
   }
 
   /**
@@ -670,8 +677,8 @@ public class Client {
    * @throws YarnException
    */
   public boolean run() throws IOException, YarnException {
-
     LOG.info("Running Client");
+    isRunning.set(true);
     yarnClient.start();
     // set the client start time.
     clientStartTime = System.currentTimeMillis();
@@ -1116,15 +1123,22 @@ public class Client {
 
     boolean res = false;
     boolean needForceKill = false;
-    while (true) {
+    while (isRunning.get()) {
       // Check app status every 1 second.
       try {
-        Thread.sleep(APP_MONITOR_INTERVAL);
+        synchronized (objectLock) {
+          objectLock.wait(APP_MONITOR_INTERVAL);
+        }
+        needForceKill = stopSignalReceived.get();
       } catch (InterruptedException e) {
         LOG.warn("Thread sleep in monitoring loop interrupted");
         // if the application is to be killed when client times out;
         // then set needForceKill to true
         break;
+      } finally {
+        if (needForceKill) {
+          break;
+        }
       }
 
       // Get application report for the appId we are interested in 
@@ -1177,6 +1191,8 @@ public class Client {
       forceKillApplication(appId);
     }
 
+    isRunning.set(false);
+
     return res;
   }
 
@@ -1388,4 +1404,31 @@ public class Client {
     }
     return resources;
   }
+
+  @VisibleForTesting
+  protected void sendStopSignal() {
+    LOG.info("Sending stop Signal to Client");
+    stopSignalReceived.set(true);
+    synchronized (objectLock) {
+      objectLock.notifyAll();
+    }
+    int waitCount = 0;
+    LOG.info("Waiting for Client to exit loop");
+    while (!isRunning.get()) {
+      try {
+        Thread.sleep(50);
+      } catch (InterruptedException ie) {
+        // do nothing
+      } finally {
+        waitCount++;
+        if (isRunning.get() || waitCount > 2000) {
+          break;
+        }
+      }
+    }
+    LOG.info("Stopping yarnClient within the Client");
+    yarnClient.stop();
+    yarnClient.waitForServiceToStop(clientTimeout);
+    LOG.info("done stopping Client");
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 438b12b..009ef3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
 import java.io.BufferedReader;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FileReader;
 import java.io.IOException;
@@ -38,6 +39,7 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import java.util.function.Supplier;
@@ -88,6 +90,7 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.timeline.NameValuePair;
 import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
 import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
@@ -128,6 +131,9 @@ public class TestDistributedShell {
   private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
   private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
   private static final int MIN_ALLOCATION_MB = 128;
+  private static final int TEST_TIME_OUT = 150000;
+  // set the timeout of the yarnClient to be 95% of the globalTimeout.
+  private static final int TEST_TIME_WINDOW_EXPIRE = (TEST_TIME_OUT * 90) / 100;
 
   protected final static String APPMASTER_JAR =
       JarFinder.getJar(ApplicationMaster.class);
@@ -135,17 +141,29 @@ public class TestDistributedShell {
   @Rule
   public TimelineVersionWatcher timelineVersionWatcher
       = new TimelineVersionWatcher();
+
   @Rule
-  public Timeout globalTimeout = new Timeout(90000);
+  public Timeout globalTimeout = new Timeout(TEST_TIME_OUT,
+      TimeUnit.MILLISECONDS);
+
   @Rule
   public TemporaryFolder tmpFolder = new TemporaryFolder();
 
   @Rule
   public TestName name = new TestName();
 
-  private String generateAppName() {
-    return name.getMethodName().replaceFirst("test", "");
-  }
+  // set the timeout of the yarnClient to be 95% of the globalTimeout.
+  private final String yarnClientTimeout =
+      String.valueOf(TEST_TIME_WINDOW_EXPIRE);
+
+  private final String[] commonArgs = {
+      "--jar",
+      APPMASTER_JAR,
+      "--timeout",
+      yarnClientTimeout,
+      "--appname",
+      ""
+  };
 
   @Before
   public void setup() throws Exception {
@@ -168,6 +186,7 @@ public class TestDistributedShell {
         MIN_ALLOCATION_MB);
     // reduce the teardown waiting time
     conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000);
+    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 500);
     conf.set("yarn.log.dir", "target");
     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
     // mark if we need to launch the v1 timeline server
@@ -201,11 +220,10 @@ public class TestDistributedShell {
       conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
           CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT);
     } else if (timelineVersion == 1.5f) {
-      if (hdfsCluster == null) {
-        HdfsConfiguration hdfsConfig = new HdfsConfiguration();
-        hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
-            .numDataNodes(1).build();
-      }
+      HdfsConfiguration hdfsConfig = new HdfsConfiguration();
+      hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
+          .numDataNodes(1).build();
+      hdfsCluster.waitActive();
       fs = hdfsCluster.getFileSystem();
       PluginStoreTestUtils.prepareFileSystemForPluginStore(fs);
       PluginStoreTestUtils.prepareConfiguration(conf, hdfsCluster);
@@ -231,39 +249,39 @@ public class TestDistributedShell {
     } else {
       Assert.fail("Wrong timeline version number: " + timelineVersion);
     }
-    
-    if (yarnCluster == null) {
-      yarnCluster =
-          new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
-              numNodeManager, 1, 1);
-      yarnCluster.init(conf);
-      
-      yarnCluster.start();
-
-      conf.set(
-          YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
-          MiniYARNCluster.getHostname() + ":"
-              + yarnCluster.getApplicationHistoryServer().getPort());
-
-      waitForNMsToRegister();
-
-      URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
-      if (url == null) {
-        throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
-      }
-      Configuration yarnClusterConfig = yarnCluster.getConfig();
-      yarnClusterConfig.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
-          new File(url.getPath()).getParent());
-      //write the document to a buffer (not directly to the file, as that
-      //can cause the file being written to get read -which will then fail.
-      ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-      yarnClusterConfig.writeXml(bytesOut);
-      bytesOut.close();
-      //write the bytes to the file in the classpath
-      OutputStream os = new FileOutputStream(new File(url.getPath()));
-      os.write(bytesOut.toByteArray());
-      os.close();
+
+    yarnCluster =
+        new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
+            numNodeManager, 1, 1);
+    yarnCluster.init(conf);
+    yarnCluster.start();
+
+    conf.set(
+        YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+        MiniYARNCluster.getHostname() + ":"
+            + yarnCluster.getApplicationHistoryServer().getPort());
+
+    waitForNMsToRegister();
+
+    URL url = Thread.currentThread().getContextClassLoader().getResource(
+        "yarn-site.xml");
+    if (url == null) {
+      throw new RuntimeException(
+          "Could not find 'yarn-site.xml' dummy file in classpath");
     }
+    Configuration yarnClusterConfig = yarnCluster.getConfig();
+    yarnClusterConfig.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        new File(url.getPath()).getParent());
+    //write the document to a buffer (not directly to the file, as that
+    //can cause the file being written to get read -which will then fail.
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    yarnClusterConfig.writeXml(bytesOut);
+    bytesOut.close();
+    //write the bytes to the file in the classpath
+    OutputStream os = new FileOutputStream(url.getPath());
+    os.write(bytesOut.toByteArray());
+    os.close();
+
     FileContext fsContext = FileContext.getLocalFSFileContext();
     fsContext
         .delete(
@@ -278,6 +296,11 @@ public class TestDistributedShell {
 
   @After
   public void tearDown() throws IOException {
+    FileContext fsContext = FileContext.getLocalFSFileContext();
+    fsContext
+        .delete(
+            new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)),
+            true);
     if (yarnCluster != null) {
       try {
         yarnCluster.stop();
@@ -292,11 +315,6 @@ public class TestDistributedShell {
         hdfsCluster = null;
       }
     }
-    FileContext fsContext = FileContext.getLocalFSFileContext();
-    fsContext
-        .delete(
-            new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)),
-            true);
   }
 
   @Test
@@ -345,9 +363,7 @@ public class TestDistributedShell {
 
   public void testDSShell(boolean haveDomain, boolean defaultFlow)
       throws Exception {
-    String[] args = {
-        "--jar",
-        APPMASTER_JAR,
+    String[] args = createArguments(
         "--num_containers",
         "2",
         "--shell_command",
@@ -359,8 +375,8 @@ public class TestDistributedShell {
         "--container_memory",
         "128",
         "--container_vcores",
-        "1"
-    };
+        "1");
+
     if (haveDomain) {
       String[] domainArgs = {
           "--domain",
@@ -391,6 +407,7 @@ public class TestDistributedShell {
     }
 
     LOG.info("Initializing DS Client");
+    YarnClient yarnClient;
     final Client client = new Client(new Configuration(yarnCluster.getConfig()));
     boolean initSuccess = client.init(args);
     Assert.assertTrue(initSuccess);
@@ -407,7 +424,7 @@ public class TestDistributedShell {
     };
     t.start();
 
-    YarnClient yarnClient = YarnClient.createYarnClient();
+    yarnClient = YarnClient.createYarnClient();
     yarnClient.init(new Configuration(yarnCluster.getConfig()));
     yarnClient.start();
 
@@ -415,15 +432,15 @@ public class TestDistributedShell {
     String errorMessage = "";
     ApplicationId appId = null;
     ApplicationReport appReport = null;
-    while(!verified) {
+    while (!verified) {
       List<ApplicationReport> apps = yarnClient.getApplications();
-      if (apps.size() == 0 ) {
+      if (apps.size() == 0) {
         Thread.sleep(10);
         continue;
       }
       appReport = apps.get(0);
       appId = appReport.getApplicationId();
-      if(appReport.getHost().equals("N/A")) {
+      if (appReport.getHost().equals("N/A")) {
         Thread.sleep(10);
         continue;
       }
@@ -436,7 +453,7 @@ public class TestDistributedShell {
 
       if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED
           && appReport.getFinalApplicationStatus() !=
-              FinalApplicationStatus.UNDEFINED) {
+          FinalApplicationStatus.UNDEFINED) {
         break;
       }
     }
@@ -463,11 +480,10 @@ public class TestDistributedShell {
       }
     }
 
-    TimelineDomain domain = null;
     if (!isTestingTimelineV2) {
       checkTimelineV1(haveDomain);
     } else {
-      checkTimelineV2(haveDomain, appId, defaultFlow, appReport);
+      checkTimelineV2(appId, defaultFlow, appReport);
     }
   }
 
@@ -489,8 +505,8 @@ public class TestDistributedShell {
     Assert.assertEquals(1, entitiesAttempts.getEntities().size());
     Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents()
         .size());
-    Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType()
-        .toString(), ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString());
+    Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType(),
+        ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString());
     if (haveDomain) {
       Assert.assertEquals(domain.getId(),
           entitiesAttempts.getEntities().get(0).getDomainId());
@@ -512,8 +528,8 @@ public class TestDistributedShell {
             null, null, null, null, primaryFilter, null, null, null);
     Assert.assertNotNull(entities);
     Assert.assertEquals(2, entities.getEntities().size());
-    Assert.assertEquals(entities.getEntities().get(0).getEntityType()
-        .toString(), ApplicationMaster.DSEntity.DS_CONTAINER.toString());
+    Assert.assertEquals(entities.getEntities().get(0).getEntityType(),
+        ApplicationMaster.DSEntity.DS_CONTAINER.toString());
 
     String entityId = entities.getEntities().get(0).getEntityId();
     org.apache.hadoop.yarn.api.records.timeline.TimelineEntity entity =
@@ -532,7 +548,7 @@ public class TestDistributedShell {
     }
   }
 
-  private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
+  private void checkTimelineV2(ApplicationId appId,
       boolean defaultFlow, ApplicationReport appReport) throws Exception {
     LOG.info("Started checkTimelineV2 ");
     // For PoC check using the file-based timeline writer (YARN-3264)
@@ -635,7 +651,13 @@ public class TestDistributedShell {
       verifyEntityForTimelineV2(appAttemptEntityFile,
           AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true);
     } finally {
-      FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
+      try {
+        FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
+      } catch (FileNotFoundException ex) {
+        // the recursive delete can throw an exception when one of the file
+        // does not exist.
+        LOG.warn("Exception deleting a file/subDirectory: {}", ex.getMessage());
+      }
     }
   }
 
@@ -673,7 +695,7 @@ public class TestDistributedShell {
     long actualCount = 0;
     for (int i = 0; i < checkTimes; i++) {
       BufferedReader reader = null;
-      String strLine = null;
+      String strLine;
       actualCount = 0;
       try {
         reader = new BufferedReader(new FileReader(entityFile));
@@ -710,7 +732,9 @@ public class TestDistributedShell {
           }
         }
       } finally {
-        reader.close();
+        if (reader != null) {
+          reader.close();
+        }
       }
       if (numOfExpectedEvent == actualCount) {
         break;
@@ -727,14 +751,35 @@ public class TestDistributedShell {
    * Utility function to merge two String arrays to form a new String array for
    * our argumemts.
    *
-   * @param args
-   * @param newArgs
+   * @param args the first set of the arguments.
+   * @param newArgs the second set of the arguments.
    * @return a String array consists of {args, newArgs}
    */
   private String[] mergeArgs(String[] args, String[] newArgs) {
-    List<String> argsList = new ArrayList<String>(Arrays.asList(args));
-    argsList.addAll(Arrays.asList(newArgs));
-    return argsList.toArray(new String[argsList.size()]);
+    int length = args.length + newArgs.length;
+    String[] result = new String[length];
+    System.arraycopy(args, 0, result, 0, args.length);
+    System.arraycopy(newArgs, 0, result, args.length, newArgs.length);
+    return result;
+  }
+
+  private String generateAppName(String postFix) {
+    return name.getMethodName().replaceFirst("test", "")
+        .concat(postFix == null? "" : "-" + postFix);
+  }
+
+  private String[] createArguments(String... args) {
+    String[] res = mergeArgs(commonArgs, args);
+    // set the application name so we can track down which command is running.
+    res[commonArgs.length - 1] = generateAppName(null);
+    return res;
+  }
+
+  private String[] createArgsWithPostFix(int index, String... args) {
+    String[] res = mergeArgs(commonArgs, args);
+    // set the application name so we can track down which command is running.
+    res[commonArgs.length - 1] = generateAppName(String.valueOf(index));
+    return res;
   }
 
   protected String getSleepCommand(int sec) {
@@ -745,11 +790,7 @@ public class TestDistributedShell {
 
   @Test
   public void testDSRestartWithPreviousRunningContainers() throws Exception {
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
+    String[] args = createArguments(
         "--num_containers",
         "1",
         "--shell_command",
@@ -759,20 +800,20 @@ public class TestDistributedShell {
         "--container_memory",
         "128",
         "--keep_containers_across_application_attempts"
-      };
+    );
 
-      LOG.info("Initializing DS Client");
-      Client client = new Client(TestDSFailedAppMaster.class.getName(),
+    LOG.info("Initializing DS Client");
+    Client client = new Client(TestDSFailedAppMaster.class.getName(),
         new Configuration(yarnCluster.getConfig()));
 
-      client.init(args);
-      LOG.info("Running DS Client");
-      boolean result = client.run();
+    client.init(args);
 
-      LOG.info("Client run completed. Result=" + result);
-      // application should succeed
-      Assert.assertTrue(result);
-    }
+    LOG.info("Running DS Client");
+    boolean result = client.run();
+    LOG.info("Client run completed. Result=" + result);
+    // application should succeed
+    Assert.assertTrue(result);
+  }
 
   /*
    * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
@@ -782,11 +823,7 @@ public class TestDistributedShell {
    */
   @Test
   public void testDSAttemptFailuresValidityIntervalSucess() throws Exception {
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
+    String[] args = createArguments(
         "--num_containers",
         "1",
         "--shell_command",
@@ -797,22 +834,23 @@ public class TestDistributedShell {
         "128",
         "--attempt_failures_validity_interval",
         "2500"
-      };
+    );
 
-      LOG.info("Initializing DS Client");
-      Configuration conf = yarnCluster.getConfig();
-      conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
-      Client client = new Client(TestDSSleepingAppMaster.class.getName(),
-        new Configuration(conf));
+    LOG.info("Initializing DS Client");
+    Configuration config = yarnCluster.getConfig();
+    config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+    Client client = new Client(TestDSSleepingAppMaster.class.getName(),
+        new Configuration(config));
 
-      client.init(args);
-      LOG.info("Running DS Client");
-      boolean result = client.run();
+    client.init(args);
 
-      LOG.info("Client run completed. Result=" + result);
-      // application should succeed
-      Assert.assertTrue(result);
-    }
+    LOG.info("Running DS Client");
+    boolean result = client.run();
+
+    LOG.info("Client run completed. Result=" + result);
+    // application should succeed
+    Assert.assertTrue(result);
+  }
 
   /*
    * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
@@ -822,11 +860,7 @@ public class TestDistributedShell {
    */
   @Test
   public void testDSAttemptFailuresValidityIntervalFailed() throws Exception {
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
+    String[] args = createArguments(
         "--num_containers",
         "1",
         "--shell_command",
@@ -837,22 +871,23 @@ public class TestDistributedShell {
         "128",
         "--attempt_failures_validity_interval",
         "15000"
-      };
+    );
 
-      LOG.info("Initializing DS Client");
-      Configuration conf = yarnCluster.getConfig();
-      conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
-      Client client = new Client(TestDSSleepingAppMaster.class.getName(),
-        new Configuration(conf));
+    LOG.info("Initializing DS Client");
+    Configuration config = yarnCluster.getConfig();
+    config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+    Client client = new Client(TestDSSleepingAppMaster.class.getName(),
+        new Configuration(config));
 
-      client.init(args);
-      LOG.info("Running DS Client");
-      boolean result = client.run();
+    client.init(args);
 
-      LOG.info("Client run completed. Result=" + result);
-      // application should be failed
-      Assert.assertFalse(result);
-    }
+    LOG.info("Running DS Client");
+    boolean result = client.run();
+
+    LOG.info("Client run completed. Result=" + result);
+    // application should be failed
+    Assert.assertFalse(result);
+  }
 
   @Test
   public void testDSShellWithCustomLogPropertyFile() throws Exception {
@@ -871,11 +906,7 @@ public class TestDistributedShell {
     // set the output to DEBUG level
     fileWriter.write("log4j.rootLogger=debug,stdout");
     fileWriter.close();
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
+    String[] args = createArguments(
         "--num_containers",
         "3",
         "--shell_command",
@@ -892,7 +923,7 @@ public class TestDistributedShell {
         "128",
         "--container_vcores",
         "1"
-    };
+    );
 
     //Before run the DS, the default the log level is INFO
     final Logger LOG_Client =
@@ -908,6 +939,7 @@ public class TestDistributedShell {
         new Client(new Configuration(yarnCluster.getConfig()));
     boolean initSuccess = client.init(args);
     Assert.assertTrue(initSuccess);
+
     LOG.info("Running DS Client");
     boolean result = client.run();
     LOG.info("Client run completed. Result=" + result);
@@ -922,16 +954,12 @@ public class TestDistributedShell {
   @Test
   public void testSpecifyingLogAggregationContext() throws Exception {
     String regex = ".*(foo|bar)\\d";
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
+    String[] args = createArguments(
         "--shell_command",
         "echo",
         "--rolling_log_pattern",
         regex
-    };
+    );
     final Client client =
         new Client(new Configuration(yarnCluster.getConfig()));
     Assert.assertTrue(client.init(args));
@@ -946,11 +974,7 @@ public class TestDistributedShell {
 
   public void testDSShellWithCommands() throws Exception {
 
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
+    String[] args = createArguments(
         "--num_containers",
         "2",
         "--shell_command",
@@ -963,7 +987,7 @@ public class TestDistributedShell {
         "128",
         "--container_vcores",
         "1"
-    };
+    );
 
     LOG.info("Initializing DS Client");
     final Client client =
@@ -971,20 +995,20 @@ public class TestDistributedShell {
     boolean initSuccess = client.init(args);
     Assert.assertTrue(initSuccess);
     LOG.info("Running DS Client");
-    boolean result = client.run();
-    LOG.info("Client run completed. Result=" + result);
-    List<String> expectedContent = new ArrayList<String>();
-    expectedContent.add("output_expected");
-    verifyContainerLog(2, expectedContent, false, "");
+    try {
+      boolean result = client.run();
+      LOG.info("Client run completed. Result=" + result);
+      List<String> expectedContent = new ArrayList<>();
+      expectedContent.add("output_expected");
+      verifyContainerLog(2, expectedContent, false, "");
+    } finally {
+      client.sendStopSignal();
+    }
   }
 
   @Test
   public void testDSShellWithMultipleArgs() throws Exception {
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
+    String[] args = createArguments(
         "--num_containers",
         "4",
         "--shell_command",
@@ -999,7 +1023,7 @@ public class TestDistributedShell {
         "128",
         "--container_vcores",
         "1"
-    };
+    );
 
     LOG.info("Initializing DS Client");
     final Client client =
@@ -1007,9 +1031,10 @@ public class TestDistributedShell {
     boolean initSuccess = client.init(args);
     Assert.assertTrue(initSuccess);
     LOG.info("Running DS Client");
+
     boolean result = client.run();
     LOG.info("Client run completed. Result=" + result);
-    List<String> expectedContent = new ArrayList<String>();
+    List<String> expectedContent = new ArrayList<>();
     expectedContent.add("HADOOP YARN MAPREDUCE HDFS");
     verifyContainerLog(4, expectedContent, false, "");
   }
@@ -1031,12 +1056,8 @@ public class TestDistributedShell {
     // set the output to DEBUG level
     fileWriter.write("echo testDSShellWithShellScript");
     fileWriter.close();
-    System.out.println(customShellScript.getAbsolutePath());
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
+    LOG.info(customShellScript.getAbsolutePath());
+    String[] args = createArguments(
         "--num_containers",
         "1",
         "--shell_script",
@@ -1049,7 +1070,7 @@ public class TestDistributedShell {
         "128",
         "--container_vcores",
         "1"
-    };
+    );
 
     LOG.info("Initializing DS Client");
     final Client client =
@@ -1059,7 +1080,7 @@ public class TestDistributedShell {
     LOG.info("Running DS Client");
     boolean result = client.run();
     LOG.info("Client run completed. Result=" + result);
-    List<String> expectedContent = new ArrayList<String>();
+    List<String> expectedContent = new ArrayList<>();
     expectedContent.add("testDSShellWithShellScript");
     verifyContainerLog(1, expectedContent, false, "");
   }
@@ -1067,7 +1088,7 @@ public class TestDistributedShell {
   @Test
   public void testDSShellWithInvalidArgs() throws Exception {
     Client client = new Client(new Configuration(yarnCluster.getConfig()));
-
+    int appNameCounter = 0;
     LOG.info("Initializing DS Client with no args");
     try {
       client.init(new String[]{});
@@ -1079,9 +1100,7 @@ public class TestDistributedShell {
 
     LOG.info("Initializing DS Client with no jar file");
     try {
-      String[] args = {
-          "--appname",
-          generateAppName(),
+      String[] args = createArgsWithPostFix(appNameCounter++,
           "--num_containers",
           "2",
           "--shell_command",
@@ -1090,8 +1109,9 @@ public class TestDistributedShell {
           "512",
           "--container_memory",
           "128"
-      };
-      client.init(args);
+      );
+      String[] argsNoJar = Arrays.copyOfRange(args, 2, args.length);
+      client.init(argsNoJar);
       Assert.fail("Exception is expected");
     } catch (IllegalArgumentException e) {
       Assert.assertTrue("The throw exception is not expected",
@@ -1100,16 +1120,14 @@ public class TestDistributedShell {
 
     LOG.info("Initializing DS Client with no shell command");
     try {
-      String[] args = {
-          "--jar",
-          APPMASTER_JAR,
+      String[] args = createArgsWithPostFix(appNameCounter++,
           "--num_containers",
           "2",
           "--master_memory",
           "512",
           "--container_memory",
           "128"
-      };
+      );
       client.init(args);
       Assert.fail("Exception is expected");
     } catch (IllegalArgumentException e) {
@@ -1119,9 +1137,7 @@ public class TestDistributedShell {
 
     LOG.info("Initializing DS Client with invalid no. of containers");
     try {
-      String[] args = {
-          "--jar",
-          APPMASTER_JAR,
+      String[] args = createArgsWithPostFix(appNameCounter++,
           "--num_containers",
           "-1",
           "--shell_command",
@@ -1130,7 +1146,7 @@ public class TestDistributedShell {
           "512",
           "--container_memory",
           "128"
-      };
+      );
       client.init(args);
       Assert.fail("Exception is expected");
     } catch (IllegalArgumentException e) {
@@ -1140,9 +1156,7 @@ public class TestDistributedShell {
     
     LOG.info("Initializing DS Client with invalid no. of vcores");
     try {
-      String[] args = {
-          "--jar",
-          APPMASTER_JAR,
+      String[] args = createArgsWithPostFix(appNameCounter++,
           "--num_containers",
           "2",
           "--shell_command",
@@ -1155,7 +1169,7 @@ public class TestDistributedShell {
           "128",
           "--container_vcores",
           "1"
-      };
+      );
       client.init(args);
       client.run();
       Assert.fail("Exception is expected");
@@ -1166,9 +1180,7 @@ public class TestDistributedShell {
 
     LOG.info("Initializing DS Client with --shell_command and --shell_script");
     try {
-      String[] args = {
-          "--jar",
-          APPMASTER_JAR,
+      String[] args = createArgsWithPostFix(appNameCounter++,
           "--num_containers",
           "2",
           "--shell_command",
@@ -1183,7 +1195,7 @@ public class TestDistributedShell {
           "1",
           "--shell_script",
           "test.sh"
-      };
+      );
       client.init(args);
       Assert.fail("Exception is expected");
     } catch (IllegalArgumentException e) {
@@ -1194,9 +1206,7 @@ public class TestDistributedShell {
 
     LOG.info("Initializing DS Client without --shell_command and --shell_script");
     try {
-      String[] args = {
-          "--jar",
-          APPMASTER_JAR,
+      String[] args = createArgsWithPostFix(appNameCounter++,
           "--num_containers",
           "2",
           "--master_memory",
@@ -1207,7 +1217,7 @@ public class TestDistributedShell {
           "128",
           "--container_vcores",
           "1"
-      };
+      );
       client.init(args);
       Assert.fail("Exception is expected");
     } catch (IllegalArgumentException e) {
@@ -1218,9 +1228,7 @@ public class TestDistributedShell {
 
     LOG.info("Initializing DS Client with invalid container_type argument");
     try {
-      String[] args = {
-          "--jar",
-          APPMASTER_JAR,
+      String[] args = createArgsWithPostFix(appNameCounter++,
           "--num_containers",
           "2",
           "--master_memory",
@@ -1235,13 +1243,46 @@ public class TestDistributedShell {
           "date",
           "--container_type",
           "UNSUPPORTED_TYPE"
-      };
+      );
       client.init(args);
       Assert.fail("Exception is expected");
     } catch (IllegalArgumentException e) {
       Assert.assertTrue("The throw exception is not expected",
           e.getMessage().contains("Invalid container_type: UNSUPPORTED_TYPE"));
     }
+
+    try {
+      String[] args = createArgsWithPostFix(appNameCounter++,
+          "--num_containers",
+          "1",
+          "--shell_command",
+          Shell.WINDOWS ? "dir" : "ls",
+          "--master_resources",
+          "memory-mb=invalid"
+      );
+      client.init(args);
+      Assert.fail("Exception is expected");
+    } catch (IllegalArgumentException e) {
+      // do nothing
+      LOG.info("IllegalArgumentException exception is expected: {}",
+          e.getMessage());
+    }
+
+    try {
+      String[] args = createArgsWithPostFix(appNameCounter++,
+          "--num_containers",
+          "1",
+          "--shell_command",
+          Shell.WINDOWS ? "dir" : "ls",
+          "--master_resources"
+      );
+      client.init(args);
+      Assert.fail("Exception is expected");
+    } catch (MissingArgumentException e) {
+      // do nothing
+      LOG.info("MissingArgumentException exception is expected: {}",
+          e.getMessage());
+    }
   }
 
   @Test
@@ -1276,54 +1317,45 @@ public class TestDistributedShell {
   }
 
   protected void waitForNMsToRegister() throws Exception {
-    int sec = 60;
-    while (sec >= 0) {
-      if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size() 
-          >= NUM_NMS) {
-        break;
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
+        return (rmContext.getRMNodes().size() >= NUM_NMS);
       }
-      Thread.sleep(1000);
-      sec--;
-    }
+    }, 100, 60000);
   }
 
   @Test
   public void testContainerLaunchFailureHandling() throws Exception {
-    String[] args = {
-        "--appname",
-        generateAppName(),
-      "--jar",
-      APPMASTER_JAR,
-      "--num_containers",
-      "2",
-      "--shell_command",
-      Shell.WINDOWS ? "dir" : "ls",
-      "--master_memory",
-      "512",
-      "--container_memory",
-      "128"
-    };
+    String[] args = createArguments(
+        "--num_containers",
+        "2",
+        "--shell_command",
+        Shell.WINDOWS ? "dir" : "ls",
+        "--master_memory",
+        "512",
+        "--container_memory",
+        "128"
+    );
 
     LOG.info("Initializing DS Client");
     Client client = new Client(ContainerLaunchFailAppMaster.class.getName(),
-      new Configuration(yarnCluster.getConfig()));
+        new Configuration(yarnCluster.getConfig()));
     boolean initSuccess = client.init(args);
     Assert.assertTrue(initSuccess);
     LOG.info("Running DS Client");
-    boolean result = client.run();
-
-    LOG.info("Client run completed. Result=" + result);
-    Assert.assertFalse(result);
-
+    try {
+      boolean result = client.run();
+      Assert.assertFalse(result);
+    } finally {
+      client.sendStopSignal();
+    }
   }
 
   @Test
   public void testDebugFlag() throws Exception {
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
+    String[] args = createArguments(
         "--num_containers",
         "2",
         "--shell_command",
@@ -1337,7 +1369,7 @@ public class TestDistributedShell {
         "--container_vcores",
         "1",
         "--debug"
-    };
+    );
 
     LOG.info("Initializing DS Client");
     Client client = new Client(new Configuration(yarnCluster.getConfig()));
@@ -1370,7 +1402,7 @@ public class TestDistributedShell {
       for (File output : containerFiles[i].listFiles()) {
         if (output.getName().trim().contains("stdout")) {
           BufferedReader br = null;
-          List<String> stdOutContent = new ArrayList<String>();
+          List<String> stdOutContent = new ArrayList<>();
           try {
 
             String sCurrentLine;
@@ -1402,13 +1434,13 @@ public class TestDistributedShell {
               Assert.assertTrue(stdOutContent.containsAll(expectedContent));
             }
           } catch (IOException e) {
-            e.printStackTrace();
+            LOG.error("Exception reading the buffer", e);
           } finally {
             try {
               if (br != null)
                 br.close();
             } catch (IOException ex) {
-              ex.printStackTrace();
+              LOG.error("Exception closing the bufferReader", ex);
             }
           }
         }
@@ -1419,21 +1451,21 @@ public class TestDistributedShell {
 
   @Test
   public void testDistributedShellResourceProfiles() throws Exception {
-    String appName = generateAppName();
+    int appNameCounter = 0;
     String[][] args = {
-        {"--appname", appName + "-0", "--jar", APPMASTER_JAR,
+        createArgsWithPostFix(appNameCounter++,
             "--num_containers", "1", "--shell_command",
             Shell.WINDOWS ? "dir" : "ls", "--container_resource_profile",
-            "maximum" },
-        {"--appname", appName + "-1", "--jar", APPMASTER_JAR,
+            "maximum"),
+        createArgsWithPostFix(appNameCounter++,
             "--num_containers", "1", "--shell_command",
             Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile",
-            "default" },
-        {"--appname", appName + "-2", "--jar", APPMASTER_JAR,
+            "default"),
+        createArgsWithPostFix(appNameCounter++,
             "--num_containers", "1", "--shell_command",
             Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile",
-            "default", "--container_resource_profile", "maximum" }
-        };
+            "default", "--container_resource_profile", "maximum"),
+    };
 
     for (int i = 0; i < args.length; ++i) {
       LOG.info("Initializing DS Client");
@@ -1453,11 +1485,7 @@ public class TestDistributedShell {
   public void testDSShellWithOpportunisticContainers() throws Exception {
     Client client = new Client(new Configuration(yarnCluster.getConfig()));
     try {
-      String[] args = {
-          "--appname",
-          generateAppName(),
-          "--jar",
-          APPMASTER_JAR,
+      String[] args = createArguments(
           "--num_containers",
           "2",
           "--master_memory",
@@ -1472,24 +1500,24 @@ public class TestDistributedShell {
           "date",
           "--container_type",
           "OPPORTUNISTIC"
-      };
+      );
       client.init(args);
-      client.run();
+      assertTrue(client.run());
     } catch (Exception e) {
-      Assert.fail("Job execution with opportunistic containers failed.");
+      LOG.error("Job execution with opportunistic containers failed.", e);
+      Assert.fail("Exception. " + e.getMessage());
+    } finally {
+      client.sendStopSignal();
     }
   }
 
   @Test
   @TimelineVersion(2.0f)
   public void testDSShellWithEnforceExecutionType() throws Exception {
+    YarnClient yarnClient = null;
     Client client = new Client(new Configuration(yarnCluster.getConfig()));
     try {
-      String[] args = {
-          "--appname",
-          generateAppName(),
-          "--jar",
-          APPMASTER_JAR,
+      String[] args = createArguments(
           "--num_containers",
           "2",
           "--master_memory",
@@ -1505,7 +1533,7 @@ public class TestDistributedShell {
           "--container_type",
           "OPPORTUNISTIC",
           "--enforce_execution_type"
-      };
+      );
       client.init(args);
       final AtomicBoolean result = new AtomicBoolean(false);
       Thread t = new Thread() {
@@ -1519,7 +1547,7 @@ public class TestDistributedShell {
       };
       t.start();
 
-      YarnClient yarnClient = YarnClient.createYarnClient();
+      yarnClient = YarnClient.createYarnClient();
       yarnClient.init(new Configuration(yarnCluster.getConfig()));
       yarnClient.start();
       waitForContainersLaunch(yarnClient, 2);
@@ -1543,7 +1571,13 @@ public class TestDistributedShell {
         }
       }
     } catch (Exception e) {
-      Assert.fail("Job execution with enforce execution type failed.");
+      LOG.error("Job execution with enforce execution type failed.", e);
+      Assert.fail("Exception. " + e.getMessage());
+    } finally {
+      client.sendStopSignal();
+      if (yarnClient != null) {
+        yarnClient.stop();
+      }
     }
   }
 
@@ -1592,26 +1626,22 @@ public class TestDistributedShell {
         .getResourceScheduler().getClusterResource();
     String masterMemoryString = "1 Gi";
     String containerMemoryString = "512 Mi";
-    long masterMemory = 1024;
-    long containerMemory = 512;
+    long[] memVars = {1024, 512};
+
     Assume.assumeTrue("The cluster doesn't have enough memory for this test",
-        clusterResource.getMemorySize() >= masterMemory + containerMemory);
+        clusterResource.getMemorySize() >= memVars[0] + memVars[1]);
     Assume.assumeTrue("The cluster doesn't have enough cores for this test",
         clusterResource.getVirtualCores() >= 2);
     if (largeContainers) {
-      masterMemory = clusterResource.getMemorySize() * 2 / 3;
-      masterMemory = masterMemory - masterMemory % MIN_ALLOCATION_MB;
-      masterMemoryString = masterMemory + "Mi";
-      containerMemory = clusterResource.getMemorySize() / 3;
-      containerMemory = containerMemory - containerMemory % MIN_ALLOCATION_MB;
-      containerMemoryString = String.valueOf(containerMemory);
+      memVars[0] = clusterResource.getMemorySize() * 2 / 3;
+      memVars[0] = memVars[0] - memVars[0] % MIN_ALLOCATION_MB;
+      masterMemoryString = memVars[0] + "Mi";
+      memVars[1] = clusterResource.getMemorySize() / 3;
+      memVars[1] = memVars[1] - memVars[1] % MIN_ALLOCATION_MB;
+      containerMemoryString = String.valueOf(memVars[1]);
     }
 
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
+    String[] args = createArguments(
         "--num_containers",
         "2",
         "--shell_command",
@@ -1619,8 +1649,8 @@ public class TestDistributedShell {
         "--master_resources",
         "memory=" + masterMemoryString + ",vcores=1",
         "--container_resources",
-        "memory=" + containerMemoryString + ",vcores=1",
-    };
+        "memory=" + containerMemoryString + ",vcores=1"
+    );
 
     LOG.info("Initializing DS Client");
     Client client = new Client(new Configuration(yarnCluster.getConfig()));
@@ -1642,103 +1672,81 @@ public class TestDistributedShell {
     yarnClient.init(new Configuration(yarnCluster.getConfig()));
     yarnClient.start();
 
-    while (true) {
-      List<ApplicationReport> apps = yarnClient.getApplications();
-      if (apps.isEmpty()) {
-        Thread.sleep(10);
-        continue;
-      }
-      ApplicationReport appReport = apps.get(0);
-      ApplicationId appId = appReport.getApplicationId();
-      List<ApplicationAttemptReport> appAttempts =
-          yarnClient.getApplicationAttempts(appId);
-      if (appAttempts.isEmpty()) {
-        Thread.sleep(10);
-        continue;
-      }
-      ApplicationAttemptReport appAttemptReport = appAttempts.get(0);
-      ContainerId amContainerId = appAttemptReport.getAMContainerId();
-
-      if (amContainerId == null) {
-        Thread.sleep(10);
-        continue;
-      }
-      ContainerReport report = yarnClient.getContainerReport(amContainerId);
-      Resource masterResource = report.getAllocatedResource();
-      Assert.assertEquals(masterMemory, masterResource.getMemorySize());
-      Assert.assertEquals(1, masterResource.getVirtualCores());
-
-      List<ContainerReport> containers =
-          yarnClient.getContainers(appAttemptReport.getApplicationAttemptId());
-      if (containers.size() < 2) {
-        Thread.sleep(10);
-        continue;
-      }
-      for (ContainerReport container : containers) {
-        if (!container.getContainerId().equals(amContainerId)) {
-          Resource containerResource = container.getAllocatedResource();
-          Assert.assertEquals(containerMemory,
-              containerResource.getMemorySize());
-          Assert.assertEquals(1, containerResource.getVirtualCores());
+    final AtomicBoolean testFailed = new AtomicBoolean(false);
+    try {
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          if (testFailed.get()) {
+            return true;
+          }
+          List<ContainerReport> containers;
+          try {
+            List<ApplicationReport> apps = yarnClient.getApplications();
+            if (apps.isEmpty()) {
+              return false;
+            }
+            ApplicationReport appReport = apps.get(0);
+            ApplicationId appId = appReport.getApplicationId();
+            List<ApplicationAttemptReport> appAttempts =
+                yarnClient.getApplicationAttempts(appId);
+            if (appAttempts.isEmpty()) {
+              return false;
+            }
+            ApplicationAttemptReport appAttemptReport = appAttempts.get(0);
+            ContainerId amContainerId = appAttemptReport.getAMContainerId();
+            if (amContainerId == null) {
+              return false;
+            }
+            ContainerReport report = yarnClient.getContainerReport(
+                amContainerId);
+            Resource masterResource = report.getAllocatedResource();
+            Assert.assertEquals(memVars[0],
+                masterResource.getMemorySize());
+            Assert.assertEquals(1, masterResource.getVirtualCores());
+            containers = yarnClient.getContainers(
+                appAttemptReport.getApplicationAttemptId());
+            if (containers.size() < 2) {
+              return false;
+            }
+            for (ContainerReport container : containers) {
+              if (!container.getContainerId().equals(amContainerId)) {
+                Resource containerResource = container.getAllocatedResource();
+                Assert.assertEquals(memVars[1],
+                    containerResource.getMemorySize());
+                Assert.assertEquals(1, containerResource.getVirtualCores());
+              }
+            }
+            return true;
+          } catch (Exception ex) {
+            LOG.error("Error waiting for expected results", ex);
+            testFailed.set(true);
+          }
+          return false;
         }
+      }, 10, TEST_TIME_WINDOW_EXPIRE);
+      assertFalse(testFailed.get());
+    } finally {
+      LOG.info("Signaling Client to Stop");
+      client.sendStopSignal();
+      if (yarnClient != null) {
+        LOG.info("Stopping yarnClient service");
+        yarnClient.stop();
       }
-
-      return;
     }
   }
 
-  @Test(expected=IllegalArgumentException.class)
-  public void testDistributedShellAMResourcesWithIllegalArguments()
-      throws Exception {
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
-        "--num_containers",
-        "1",
-        "--shell_command",
-        Shell.WINDOWS ? "dir" : "ls",
-        "--master_resources",
-        "memory-mb=invalid"
-    };
-    Client client = new Client(new Configuration(yarnCluster.getConfig()));
-    client.init(args);
-  }
-
-  @Test(expected=MissingArgumentException.class)
-  public void testDistributedShellAMResourcesWithMissingArgumentValue()
-      throws Exception {
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
-        "--num_containers",
-        "1",
-        "--shell_command",
-        Shell.WINDOWS ? "dir" : "ls",
-        "--master_resources"
-    };
-    Client client = new Client(new Configuration(yarnCluster.getConfig()));
-    client.init(args);
-  }
-
   @Test(expected=ResourceNotFoundException.class)
   public void testDistributedShellAMResourcesWithUnknownResource()
       throws Exception {
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
+    String[] args = createArguments(
         "--num_containers",
         "1",
         "--shell_command",
         Shell.WINDOWS ? "dir" : "ls",
         "--master_resources",
         "unknown-resource=5"
-    };
+    );
     Client client = new Client(new Configuration(yarnCluster.getConfig()));
     client.init(args);
     client.run();
@@ -1747,18 +1755,14 @@ public class TestDistributedShell {
   @Test(expected=IllegalArgumentException.class)
   public void testDistributedShellNonExistentQueue()
       throws Exception {
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
+    String[] args = createArguments(
         "--num_containers",
         "1",
         "--shell_command",
         Shell.WINDOWS ? "dir" : "ls",
         "--queue",
         "non-existent-queue"
-    };
+    );
     Client client = new Client(new Configuration(yarnCluster.getConfig()));
     client.init(args);
     client.run();
@@ -1767,11 +1771,7 @@ public class TestDistributedShell {
   @Test
   public void testDistributedShellWithSingleFileLocalization()
       throws Exception {
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
+    String[] args = createArguments(
         "--num_containers",
         "1",
         "--shell_command",
@@ -1780,7 +1780,7 @@ public class TestDistributedShell {
         "./src/test/resources/a.txt",
         "--shell_args",
         "a.txt"
-    };
+    );
 
     Client client = new Client(new Configuration(yarnCluster.getConfig()));
     client.init(args);
@@ -1790,11 +1790,7 @@ public class TestDistributedShell {
   @Test
   public void testDistributedShellWithMultiFileLocalization()
       throws Exception {
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
+    String[] args = createArguments(
         "--num_containers",
         "1",
         "--shell_command",
@@ -1803,7 +1799,7 @@ public class TestDistributedShell {
         "./src/test/resources/a.txt,./src/test/resources/b.txt",
         "--shell_args",
         "a.txt b.txt"
-    };
+    );
 
     Client client = new Client(new Configuration(yarnCluster.getConfig()));
     client.init(args);
@@ -1813,11 +1809,7 @@ public class TestDistributedShell {
   @Test(expected=UncheckedIOException.class)
   public void testDistributedShellWithNonExistentFileLocalization()
       throws Exception {
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
+    String[] args = createArguments(
         "--num_containers",
         "1",
         "--shell_command",
@@ -1826,11 +1818,11 @@ public class TestDistributedShell {
         "/non/existing/path/file.txt",
         "--shell_args",
         "file.txt"
-    };
+    );
 
     Client client = new Client(new Configuration(yarnCluster.getConfig()));
     client.init(args);
-    client.run();
+    assertTrue(client.run());
   }
 
 
@@ -1838,34 +1830,34 @@ public class TestDistributedShell {
   public void testDistributedShellCleanup()
       throws Exception {
     String appName = "DistributedShellCleanup";
-    String[] args = {
-        "--appname",
-        generateAppName(),
-        "--jar",
-        APPMASTER_JAR,
+    String[] args = createArguments(
         "--num_containers",
         "1",
         "--shell_command",
         Shell.WINDOWS ? "dir" : "ls"
-    };
+    );
     Configuration config = new Configuration(yarnCluster.getConfig());
     Client client = new Client(config);
-    client.init(args);
-    client.run();
-    ApplicationId appId = client.getAppId();
-    String relativePath =
-        ApplicationMaster.getRelativePath(appName, appId.toString(), "");
-    FileSystem fs1 = FileSystem.get(config);
-    Path path = new Path(fs1.getHomeDirectory(), relativePath);
+    try {
+      client.init(args);
+      client.run();
+      ApplicationId appId = client.getAppId();
+      String relativePath =
+          ApplicationMaster.getRelativePath(appName, appId.toString(), "");
+      FileSystem fs1 = FileSystem.get(config);
+      Path path = new Path(fs1.getHomeDirectory(), relativePath);
 
-    GenericTestUtils.waitFor(() -> {
-      try {
-        return !fs1.exists(path);
-      } catch (IOException e) {
-        return false;
-      }
-    }, 10, 60000);
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return !fs1.exists(path);
+        } catch (IOException e) {
+          return false;
+        }
+      }, 10, 60000);
 
-    assertFalse("Distributed Shell Cleanup failed", fs1.exists(path));
+      assertFalse("Distributed Shell Cleanup failed", fs1.exists(path));
+    } finally {
+      client.sendStopSignal();
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org