You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by iw...@apache.org on 2021/01/08 23:39:37 UTC

[hadoop] branch trunk updated: YARN-10553. Refactor TestDistributedShell (#2581)

This is an automated email from the ASF dual-hosted git repository.

iwasakims pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 890f2da  YARN-10553. Refactor TestDistributedShell (#2581)
890f2da is described below

commit 890f2da624465473a5f401a3bcfc4bbd068289a1
Author: Ahmed Hussein <50...@users.noreply.github.com>
AuthorDate: Fri Jan 8 17:39:21 2021 -0600

    YARN-10553. Refactor TestDistributedShell (#2581)
---
 .../distributedshell/ApplicationMaster.java        |    2 +-
 .../yarn/applications/distributedshell/Client.java |    8 +-
 .../distributedshell/DistributedShellBaseTest.java |  607 +++++++
 .../distributedshell/TestDSSleepingAppMaster.java  |    9 +-
 .../distributedshell/TestDSTimelineV10.java        |  843 +++++++++
 .../distributedshell/TestDSTimelineV15.java        |  100 ++
 .../distributedshell/TestDSTimelineV20.java        |  484 +++++
 .../TestDSWithMultipleNodeManager.java             |  599 ++++---
 .../distributedshell/TestDistributedShell.java     | 1865 --------------------
 9 files changed, 2388 insertions(+), 2129 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index ae14d09..765ca82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -781,7 +781,7 @@ public class ApplicationMaster {
     new HelpFormatter().printHelp("ApplicationMaster", opts);
   }
 
-  private void cleanup() {
+  protected void cleanup() {
     try {
       appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
         @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 5da4384..b271486 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -1414,21 +1414,19 @@ public class Client {
     }
     int waitCount = 0;
     LOG.info("Waiting for Client to exit loop");
-    while (!isRunning.get()) {
+    while (isRunning.get()) {
       try {
         Thread.sleep(50);
       } catch (InterruptedException ie) {
         // do nothing
       } finally {
-        waitCount++;
-        if (isRunning.get() || waitCount > 2000) {
+        if (++waitCount > 2000) {
           break;
         }
       }
     }
-    LOG.info("Stopping yarnClient within the Client");
+    LOG.info("Stopping yarnClient within the DS Client");
     yarnClient.stop();
-    yarnClient.waitForServiceToStop(clientTimeout);
     LOG.info("done stopping Client");
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellBaseTest.java
new file mode 100644
index 0000000..28cdf8f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellBaseTest.java
@@ -0,0 +1,607 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.net.ServerSocketUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
+
+/**
+ * Base class for testing DistributedShell features.
+ */
+public abstract class DistributedShellBaseTest {
+  protected static final int MIN_ALLOCATION_MB = 128;
+  protected static final int NUM_DATA_NODES = 1;
+  protected static final int TEST_TIME_OUT = 160000;
+  // the wait window used by the tests expires at 90% of the globalTimeout.
+  protected static final int TEST_TIME_WINDOW_EXPIRE =
+      (TEST_TIME_OUT * 90) / 100;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DistributedShellBaseTest.class);
+  private static final String APP_MASTER_JAR =
+      JarFinder.getJar(ApplicationMaster.class);
+  private static final int NUM_NMS = 1;
+  // set the timeout of the yarnClient to be 90% of the globalTimeout.
+  private static final String YARN_CLIENT_TIMEOUT =
+      String.valueOf(TEST_TIME_WINDOW_EXPIRE);
+  private static final String[] COMMON_ARGS = {
+      "--jar",
+      APP_MASTER_JAR,
+      "--timeout",
+      YARN_CLIENT_TIMEOUT,
+      "--appname",
+      ""
+  };
+  private static MiniDFSCluster hdfsCluster = null;
+  private static MiniYARNCluster yarnCluster = null;
+  private static String yarnSiteBackupPath = null;
+  private static String yarnSitePath = null;
+  @Rule
+  public Timeout globalTimeout = new Timeout(TEST_TIME_OUT,
+      TimeUnit.MILLISECONDS);
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule
+  public TestName name = new TestName();
+  private Client dsClient;
+  private YarnConfiguration conf = null;
+  // location of the filesystem timeline writer for timeline service v.2
+  private String timelineV2StorageDir = null;
+
+  @BeforeClass
+  public static void setupUnitTests() throws Exception {
+    URL url = Thread.currentThread().getContextClassLoader().getResource(
+        "yarn-site.xml");
+    if (url == null) {
+      throw new RuntimeException(
+          "Could not find 'yarn-site.xml' dummy file in classpath");
+    }
+    // backup the original yarn-site file.
+    yarnSitePath = url.getPath();
+    yarnSiteBackupPath = url.getPath() + "-backup";
+    Files.copy(Paths.get(yarnSitePath),
+        Paths.get(yarnSiteBackupPath),
+        StandardCopyOption.COPY_ATTRIBUTES,
+        StandardCopyOption.REPLACE_EXISTING);
+  }
+
+  @AfterClass
+  public static void tearDownUnitTests() throws Exception {
+    // shutdown the clusters.
+    shutdownYarnCluster();
+    shutdownHdfsCluster();
+    if (yarnSitePath == null || yarnSiteBackupPath == null) {
+      return;
+    }
+    // restore the original yarn-site file.
+    if (Files.exists(Paths.get(yarnSiteBackupPath))) {
+      Files.move(Paths.get(yarnSiteBackupPath), Paths.get(yarnSitePath),
+          StandardCopyOption.REPLACE_EXISTING);
+    }
+  }
+
+  /**
+   * Utility function to merge two String arrays to form a new String array for
+   * our arguments.
+   *
+   * @param args the first set of the arguments.
+   * @param newArgs the second set of the arguments.
+   * @return a String array consisting of args followed by newArgs.
+   */
+  protected static String[] mergeArgs(String[] args, String[] newArgs) {
+    int length = args.length + newArgs.length;
+    String[] result = new String[length];
+    System.arraycopy(args, 0, result, 0, args.length);
+    System.arraycopy(newArgs, 0, result, args.length, newArgs.length);
+    return result;
+  }
+
+  protected static String[] createArguments(Supplier<String> testNameProvider,
+      String... args) {
+    String[] res = mergeArgs(COMMON_ARGS, args);
+    // set the application name so we can track down which command is running.
+    res[COMMON_ARGS.length - 1] = testNameProvider.get();
+    return res;
+  }
+
+  protected static String getSleepCommand(int sec) {
+    // Windows doesn't have a sleep command, ping -n does the trick
+    return Shell.WINDOWS ? "ping -n " + (sec + 1) + " 127.0.0.1 >nul"
+        : "sleep " + sec;
+  }
+
+  protected static String getListCommand() {
+    return Shell.WINDOWS ? "dir" : "ls";
+  }
+
+  protected static String getCatCommand() {
+    return Shell.WINDOWS ? "type" : "cat";
+  }
+
+  protected static void shutdownYarnCluster() {
+    if (yarnCluster != null) {
+      try {
+        yarnCluster.stop();
+      } finally {
+        yarnCluster = null;
+      }
+    }
+  }
+
+  protected static void shutdownHdfsCluster() {
+    if (hdfsCluster != null) {
+      try {
+        hdfsCluster.shutdown();
+      } finally {
+        hdfsCluster = null;
+      }
+    }
+  }
+
+  public String getTimelineV2StorageDir() {
+    return timelineV2StorageDir;
+  }
+
+  public void setTimelineV2StorageDir() throws Exception {
+    timelineV2StorageDir = tmpFolder.newFolder().getAbsolutePath();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupInternal(NUM_NMS, new YarnConfiguration());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    cleanUpDFSClient();
+    FileContext fsContext = FileContext.getLocalFSFileContext();
+    fsContext
+        .delete(
+            new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)),
+            true);
+    shutdownYarnCluster();
+    shutdownHdfsCluster();
+  }
+
+  protected String[] createArgumentsWithAppName(String... args) {
+    return createArguments(() -> generateAppName(), args);
+  }
+
+  protected void waitForContainersLaunch(YarnClient client, int nContainers,
+      AtomicReference<ApplicationAttemptReport> appAttemptReportRef,
+      AtomicReference<List<ContainerReport>> containersListRef,
+      AtomicReference<ApplicationAttemptId> appAttemptIdRef,
+      AtomicReference<Throwable> thrownErrorRef) throws Exception {
+    GenericTestUtils.waitFor(() -> {
+      try {
+        List<ApplicationReport> apps = client.getApplications();
+        if (apps == null || apps.isEmpty()) {
+          return false;
+        }
+        ApplicationId appId = apps.get(0).getApplicationId();
+        List<ApplicationAttemptReport> appAttempts =
+            client.getApplicationAttempts(appId);
+        if (appAttempts == null || appAttempts.isEmpty()) {
+          return false;
+        }
+        ApplicationAttemptId attemptId =
+            appAttempts.get(0).getApplicationAttemptId();
+        List<ContainerReport> containers = client.getContainers(attemptId);
+        if (containers == null || containers.size() < nContainers) {
+          return false;
+        }
+        containersListRef.set(containers);
+        appAttemptIdRef.set(attemptId);
+        appAttemptReportRef.set(appAttempts.get(0));
+      } catch (Exception e) {
+        LOG.error("Exception waiting for Containers Launch", e);
+        thrownErrorRef.set(e);
+      }
+      return true;
+    }, 10, TEST_TIME_WINDOW_EXPIRE);
+  }
+
+  protected abstract void customizeConfiguration(YarnConfiguration config)
+      throws Exception;
+
+  protected String[] appendFlowArgsForTestDSShell(String[] args,
+      boolean defaultFlow) {
+    return args;
+  }
+
+  protected String[] appendDomainArgsForTestDSShell(String[] args,
+      boolean haveDomain) {
+    String[] result = args;
+    if (haveDomain) {
+      String[] domainArgs = {
+          "--domain",
+          "TEST_DOMAIN",
+          "--view_acls",
+          "reader_user reader_group",
+          "--modify_acls",
+          "writer_user writer_group",
+          "--create"
+      };
+      result = mergeArgs(args, domainArgs);
+    }
+    return result;
+  }
+
+  protected Client setAndGetDSClient(Configuration config) throws Exception {
+    dsClient = new Client(config);
+    return dsClient;
+  }
+
+  protected Client setAndGetDSClient(String appMasterMainClass,
+      Configuration config) throws Exception {
+    dsClient = new Client(appMasterMainClass, config);
+    return dsClient;
+  }
+
+  protected void baseTestDSShell(boolean haveDomain, boolean defaultFlow)
+      throws Exception {
+    String[] baseArgs = createArgumentsWithAppName(
+        "--num_containers",
+        "2",
+        "--shell_command",
+        getListCommand(),
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1");
+    String[] domainArgs = appendDomainArgsForTestDSShell(baseArgs, haveDomain);
+    String[] args = appendFlowArgsForTestDSShell(domainArgs, defaultFlow);
+
+    LOG.info("Initializing DS Client");
+    YarnClient yarnClient;
+    dsClient = setAndGetDSClient(new Configuration(yarnCluster.getConfig()));
+    boolean initSuccess = dsClient.init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running DS Client");
+    final AtomicBoolean result = new AtomicBoolean(false);
+    Thread t = new Thread(() -> {
+      try {
+        result.set(dsClient.run());
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    t.start();
+
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(new Configuration(yarnCluster.getConfig()));
+    yarnClient.start();
+
+    AtomicInteger waitResult = new AtomicInteger(0);
+    AtomicReference<ApplicationId> appIdRef =
+        new AtomicReference<>(null);
+    AtomicReference<ApplicationReport> appReportRef =
+        new AtomicReference<>(null);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        List<ApplicationReport> apps = yarnClient.getApplications();
+        if (apps.size() == 0) {
+          return false;
+        }
+        ApplicationReport appReport = apps.get(0);
+        appReportRef.set(appReport);
+        appIdRef.set(appReport.getApplicationId());
+        if (appReport.getHost().equals("N/A")) {
+          return false;
+        }
+        if (appReport.getRpcPort() == -1) {
+          waitResult.set(1);
+        }
+        if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED
+            && appReport.getFinalApplicationStatus() !=
+            FinalApplicationStatus.UNDEFINED) {
+          return true;
+        }
+      } catch (Exception e) {
+        LOG.error("Exception get application from Yarn Client", e);
+        waitResult.set(2);
+      }
+      return waitResult.get() != 0;
+    }, 10, TEST_TIME_WINDOW_EXPIRE);
+    t.join();
+    if (waitResult.get() == 2) {
+      // Exception was raised
+      Assert.fail("Exception in getting application report. Failed");
+    }
+    if (waitResult.get() == 1) {
+      Assert.assertEquals("Failed waiting for expected rpc port to be -1.",
+          -1, appReportRef.get().getRpcPort());
+    }
+    checkTimeline(appIdRef.get(), defaultFlow, haveDomain, appReportRef.get());
+  }
+
+  protected void baseTestDSShell(boolean haveDomain) throws Exception {
+    baseTestDSShell(haveDomain, true);
+  }
+
+  protected void checkTimeline(ApplicationId appId,
+      boolean defaultFlow, boolean haveDomain,
+      ApplicationReport appReport) throws Exception {
+    TimelineDomain domain = null;
+    if (haveDomain) {
+      domain = yarnCluster.getApplicationHistoryServer()
+          .getTimelineStore().getDomain("TEST_DOMAIN");
+      Assert.assertNotNull(domain);
+      Assert.assertEquals("reader_user reader_group", domain.getReaders());
+      Assert.assertEquals("writer_user writer_group", domain.getWriters());
+    }
+    TimelineEntities entitiesAttempts = yarnCluster
+        .getApplicationHistoryServer()
+        .getTimelineStore()
+        .getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(),
+            null, null, null, null, null, null, null, null, null);
+    Assert.assertNotNull(entitiesAttempts);
+    Assert.assertEquals(1, entitiesAttempts.getEntities().size());
+    Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents()
+        .size());
+    Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType(),
+        ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString());
+    Assert.assertEquals(haveDomain ? domain.getId() : "DEFAULT",
+        entitiesAttempts.getEntities().get(0).getDomainId());
+    String currAttemptEntityId =
+        entitiesAttempts.getEntities().get(0).getEntityId();
+    ApplicationAttemptId attemptId = ApplicationAttemptId.fromString(
+        currAttemptEntityId);
+    NameValuePair primaryFilter = new NameValuePair(
+        ApplicationMaster.APPID_TIMELINE_FILTER_NAME,
+        attemptId.getApplicationId().toString());
+    TimelineEntities entities = yarnCluster
+        .getApplicationHistoryServer()
+        .getTimelineStore()
+        .getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null,
+            null, null, null, null, primaryFilter, null, null, null);
+    Assert.assertNotNull(entities);
+    Assert.assertEquals(2, entities.getEntities().size());
+    Assert.assertEquals(entities.getEntities().get(0).getEntityType(),
+        ApplicationMaster.DSEntity.DS_CONTAINER.toString());
+
+    String entityId = entities.getEntities().get(0).getEntityId();
+    TimelineEntity entity =
+        yarnCluster.getApplicationHistoryServer().getTimelineStore()
+            .getEntity(entityId,
+                ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null);
+    Assert.assertNotNull(entity);
+    Assert.assertEquals(entityId, entity.getEntityId());
+    Assert.assertEquals(haveDomain ? domain.getId() : "DEFAULT",
+        entities.getEntities().get(0).getDomainId());
+  }
+
+  protected String[] createArgsWithPostFix(int index, String... args) {
+    String[] res = mergeArgs(COMMON_ARGS, args);
+    // set the application name so we can track down which command is running.
+    res[COMMON_ARGS.length - 1] = generateAppName(String.format("%03d",
+        index));
+    return res;
+  }
+
+  protected String generateAppName() {
+    return generateAppName(null);
+  }
+
+  protected String generateAppName(String postFix) {
+    return name.getMethodName().replaceFirst("test", "")
+        .concat(postFix == null ? "" : "-" + postFix);
+  }
+
+  protected void setUpHDFSCluster() throws IOException {
+    if (hdfsCluster == null) {
+      HdfsConfiguration hdfsConfig = new HdfsConfiguration();
+      hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
+          .numDataNodes(NUM_DATA_NODES).build();
+      hdfsCluster.waitActive();
+    }
+  }
+
+  protected void setUpYarnCluster(int numNodeManagers,
+      YarnConfiguration yarnConfig) throws Exception {
+    if (yarnCluster != null) {
+      return;
+    }
+    yarnCluster =
+        new MiniYARNCluster(getClass().getSimpleName(), 1, numNodeManagers,
+            1, 1);
+    yarnCluster.init(yarnConfig);
+    yarnCluster.start();
+    // wait for the node managers to register.
+    waitForNMsToRegister();
+    conf.set(
+        YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+        MiniYARNCluster.getHostname() + ":"
+            + yarnCluster.getApplicationHistoryServer().getPort());
+    Configuration yarnClusterConfig = yarnCluster.getConfig();
+    yarnClusterConfig.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        new File(yarnSitePath).getParent());
+    // write the document to a buffer (not directly to the file, as that
+    // can cause the file being written to get read, which will then fail).
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    yarnClusterConfig.writeXml(bytesOut);
+    bytesOut.close();
+    // write the bytes to the file in the classpath
+    OutputStream os = new FileOutputStream(yarnSitePath);
+    os.write(bytesOut.toByteArray());
+    os.close();
+  }
+
+  protected void setupInternal(int numNodeManagers,
+      YarnConfiguration yarnConfig) throws Exception {
+    LOG.info("========== Setting UP UnitTest {}#{} ==========",
+        getClass().getCanonicalName(), name.getMethodName());
+    LOG.info("Starting up YARN cluster. Timeline version {}",
+        getTimelineVersion());
+    conf = yarnConfig;
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        MIN_ALLOCATION_MB);
+    // reduce the tearDown waiting time
+    conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000);
+    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 500);
+    conf.set("yarn.log.dir", "target");
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    // mark if we need to launch the v1 timeline server
+    // disable aux-service based timeline aggregators
+    conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
+    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
+
+    conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    conf.set("mapreduce.jobhistory.address",
+        "0.0.0.0:" + ServerSocketUtil.getPort(10021, 10));
+    // Enable ContainersMonitorImpl
+    conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
+        LinuxResourceCalculatorPlugin.class.getName());
+    conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
+        ProcfsBasedProcessTree.class.getName());
+    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
+    conf.setBoolean(
+        YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING, true);
+    conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
+        true);
+    conf.setBoolean(
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+    conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
+        10);
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    // ATS version specific settings
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+        getTimelineVersion());
+    // set up the configuration relevant to each TimelineService version.
+    customizeConfiguration(conf);
+    // setup the yarn cluster.
+    setUpYarnCluster(numNodeManagers, conf);
+  }
+
+  protected NodeManager getNodeManager(int index) {
+    return yarnCluster.getNodeManager(index);
+  }
+
+  protected MiniYARNCluster getYarnCluster() {
+    return yarnCluster;
+  }
+
+  protected void setConfiguration(String key, String value) {
+    conf.set(key, value);
+  }
+
+  protected Configuration getYarnClusterConfiguration() {
+    return yarnCluster.getConfig();
+  }
+
+  protected Configuration getConfiguration() {
+    return conf;
+  }
+
+  protected ResourceManager getResourceManager() {
+    return yarnCluster.getResourceManager();
+  }
+
+  protected ResourceManager getResourceManager(int index) {
+    return yarnCluster.getResourceManager(index);
+  }
+
+  protected Client getDSClient() {
+    return dsClient;
+  }
+
+  protected void resetDSClient() {
+    dsClient = null;
+  }
+
+  protected abstract float getTimelineVersion();
+
+  protected void cleanUpDFSClient() {
+    if (getDSClient() != null) {
+      getDSClient().sendStopSignal();
+      resetDSClient();
+    }
+  }
+
+  private void waitForNMsToRegister() throws Exception {
+    GenericTestUtils.waitFor(() -> {
+      RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
+      return (rmContext.getRMNodes().size() >= NUM_NMS);
+    }, 100, 60000);
+  }
+
+  protected MiniDFSCluster getHDFSCluster() {
+    return hdfsCluster;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSSleepingAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSSleepingAppMaster.java
index 25975bf..ae25ece 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSSleepingAppMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSSleepingAppMaster.java
@@ -18,11 +18,10 @@
 
 package org.apache.hadoop.yarn.applications.distributedshell;
 
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TestDSSleepingAppMaster extends ApplicationMaster{
+public class TestDSSleepingAppMaster extends ApplicationMaster {
 
   private static final Logger LOG = LoggerFactory
       .getLogger(TestDSSleepingAppMaster.class);
@@ -30,8 +29,8 @@ public class TestDSSleepingAppMaster extends ApplicationMaster{
 
   public static void main(String[] args) {
     boolean result = false;
+    TestDSSleepingAppMaster appMaster = new TestDSSleepingAppMaster();
     try {
-      TestDSSleepingAppMaster appMaster = new TestDSSleepingAppMaster();
       boolean doRun = appMaster.init(args);
       if (!doRun) {
         System.exit(0);
@@ -48,6 +47,10 @@ public class TestDSSleepingAppMaster extends ApplicationMaster{
       result = appMaster.finish();
     } catch (Throwable t) {
       System.exit(1);
+    } finally {
+      if (appMaster != null) {
+        appMaster.cleanup();
+      }
     }
     if (result) {
       LOG.info("Application Master completed successfully. exiting");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV10.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV10.java
new file mode 100644
index 0000000..15dc1cb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV10.java
@@ -0,0 +1,843 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.cli.MissingArgumentException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
+import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
+import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test implementations for distributed shell on Timeline Service v1.0.
+ */
+public class TestDSTimelineV10 extends DistributedShellBaseTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDSTimelineV10.class);
+
+  @Override
+  protected float getTimelineVersion() {
+    return 1.0f;
+  }
+
+  @Override
+  protected void cleanUpDFSClient() {
+
+  }
+
+  @Test
+  public void testDSShellWithDomain() throws Exception {
+    baseTestDSShell(true);
+  }
+
+  @Test
+  public void testDSShellWithoutDomain() throws Exception {
+    baseTestDSShell(false);
+  }
+
+  @Test
+  public void testDSRestartWithPreviousRunningContainers() throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getSleepCommand(8),
+        "--master_memory",
+        "512",
+        "--container_memory",
+        "128",
+        "--keep_containers_across_application_attempts"
+    );
+
+    LOG.info("Initializing DS Client");
+    setAndGetDSClient(TestDSFailedAppMaster.class.getName(),
+        new Configuration(getYarnClusterConfiguration()));
+
+    getDSClient().init(args);
+
+    LOG.info("Running DS Client");
+    boolean result = getDSClient().run();
+    LOG.info("Client run completed. Result={}", result);
+    // application should succeed
+    Assert.assertTrue(result);
+  }
+
+  /*
+   * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
+   * Set attempt_failures_validity_interval as 2.5 seconds, so that only
+   * attempt failures within the previous 2.5 seconds are counted.
+   * The application is expected to be successful.
+   */
+  @Test
+  public void testDSAttemptFailuresValidityIntervalSuccess() throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getSleepCommand(8),
+        "--master_memory",
+        "512",
+        "--container_memory",
+        "128",
+        "--attempt_failures_validity_interval",
+        "2500"
+    );
+
+    LOG.info("Initializing DS Client");
+    Configuration config = getYarnClusterConfiguration();
+    config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+    setAndGetDSClient(TestDSSleepingAppMaster.class.getName(),
+        new Configuration(config));
+
+    getDSClient().init(args);
+
+    LOG.info("Running DS Client");
+    boolean result = getDSClient().run();
+
+    LOG.info("Client run completed. Result={}", result);
+    // application should succeed
+    Assert.assertTrue(result);
+  }
+
+  /*
+   * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
+   * Set attempt_failures_validity_interval as 15 seconds, so that all
+   * attempt failures within the previous 15 seconds are counted.
+   * The application is expected to fail.
+   */
+  @Test
+  public void testDSAttemptFailuresValidityIntervalFailed() throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getSleepCommand(8),
+        "--master_memory",
+        "512",
+        "--container_memory",
+        "128",
+        "--attempt_failures_validity_interval",
+        "15000"
+    );
+
+    LOG.info("Initializing DS Client");
+    Configuration config = getYarnClusterConfiguration();
+    config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+    setAndGetDSClient(TestDSSleepingAppMaster.class.getName(),
+        new Configuration(config));
+
+    getDSClient().init(args);
+
+    LOG.info("Running DS Client");
+    boolean result = getDSClient().run();
+
+    LOG.info("Client run completed. Result={}", result);
+    // application should fail
+    Assert.assertFalse(result);
+  }
+
+  @Test
+  public void testDSShellWithCustomLogPropertyFile() throws Exception {
+    final File basedir = getBaseDirForTest();
+    final File tmpDir = new File(basedir, "tmpDir");
+    tmpDir.mkdirs();
+    final File customLogProperty = new File(tmpDir, "custom_log4j.properties");
+    if (customLogProperty.exists()) {
+      customLogProperty.delete();
+    }
+    if (!customLogProperty.createNewFile()) {
+      Assert.fail("Can not create custom log4j property file.");
+    }
+    // set the output to DEBUG level; the writer is closed even on failure
+    try (PrintWriter fileWriter = new PrintWriter(customLogProperty)) {
+      fileWriter.write("log4j.rootLogger=debug,stdout");
+    }
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "3",
+        "--shell_command",
+        "echo",
+        "--shell_args",
+        "HADOOP",
+        "--log_properties",
+        customLogProperty.getAbsolutePath(),
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1"
+    );
+
+    // Before running DS, the default log level is INFO
+    final Logger clientLog =
+        LoggerFactory.getLogger(Client.class);
+    Assert.assertTrue(clientLog.isInfoEnabled());
+    Assert.assertFalse(clientLog.isDebugEnabled());
+    final Logger amLog = LoggerFactory.getLogger(ApplicationMaster.class);
+    Assert.assertTrue(amLog.isInfoEnabled());
+    Assert.assertFalse(amLog.isDebugEnabled());
+
+    LOG.info("Initializing DS Client");
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    boolean initSuccess = getDSClient().init(args);
+    Assert.assertTrue(initSuccess);
+
+    LOG.info("Running DS Client");
+    boolean result = getDSClient().run();
+    LOG.info("Client run completed. Result={}", result);
+    Assert.assertTrue(verifyContainerLog(3, null, true, "DEBUG") > 10);
+    // After DS is finished, the log level should be DEBUG
+    Assert.assertTrue(clientLog.isInfoEnabled());
+    Assert.assertTrue(clientLog.isDebugEnabled());
+    Assert.assertTrue(amLog.isInfoEnabled());
+    Assert.assertTrue(amLog.isDebugEnabled());
+  }
+
+  @Test
+  public void testSpecifyingLogAggregationContext() throws Exception {
+    String regex = ".*(foo|bar)\\d";
+    String[] args = createArgumentsWithAppName(
+        "--shell_command",
+        "echo",
+        "--rolling_log_pattern",
+        regex
+    );
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    Assert.assertTrue(getDSClient().init(args));
+
+    ApplicationSubmissionContext context =
+        Records.newRecord(ApplicationSubmissionContext.class);
+    getDSClient().specifyLogAggregationContext(context);
+    LogAggregationContext logContext = context.getLogAggregationContext();
+    assertEquals(regex, logContext.getRolledLogsIncludePattern());
+    assertTrue(logContext.getRolledLogsExcludePattern().isEmpty());
+  }
+
+  @Test
+  public void testDSShellWithMultipleArgs() throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "4",
+        "--shell_command",
+        "echo",
+        "--shell_args",
+        "HADOOP YARN MAPREDUCE HDFS",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1"
+    );
+
+    LOG.info("Initializing DS Client");
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    boolean initSuccess = getDSClient().init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running DS Client");
+
+    boolean result = getDSClient().run();
+    LOG.info("Client run completed. Result={}", result);
+    List<String> expectedContent = new ArrayList<>();
+    expectedContent.add("HADOOP YARN MAPREDUCE HDFS");
+    verifyContainerLog(4, expectedContent, false, "");
+  }
+
+  @Test
+  public void testDSShellWithShellScript() throws Exception {
+    final File basedir = getBaseDirForTest();
+    final File tmpDir = new File(basedir, "tmpDir");
+    tmpDir.mkdirs();
+    final File customShellScript = new File(tmpDir, "custom_script.sh");
+    if (customShellScript.exists()) {
+      customShellScript.delete();
+    }
+    if (!customShellScript.createNewFile()) {
+      Assert.fail("Can not create custom shell script file.");
+    }
+    // the script echoes the test name, which is verified in the logs below
+    try (PrintWriter fileWriter = new PrintWriter(customShellScript)) {
+      fileWriter.write("echo testDSShellWithShellScript");
+    }
+    LOG.info(customShellScript.getAbsolutePath());
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_script",
+        customShellScript.getAbsolutePath(),
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1"
+    );
+
+    LOG.info("Initializing DS Client");
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    Assert.assertTrue(getDSClient().init(args));
+    LOG.info("Running DS Client");
+    assertTrue(getDSClient().run());
+    List<String> expectedContent = new ArrayList<>();
+    expectedContent.add("testDSShellWithShellScript");
+    verifyContainerLog(1, expectedContent, false, "");
+  }
+
+  @Test
+  public void testDSShellWithInvalidArgs() throws Exception {
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    int appNameCounter = 0;
+    LOG.info("Initializing DS Client with no args");
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "No args",
+        () -> getDSClient().init(new String[]{}));
+
+    LOG.info("Initializing DS Client with no jar file");
+    String[] noJarArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "2",
+        "--shell_command",
+        getListCommand(),
+        "--master_memory",
+        "512",
+        "--container_memory",
+        "128"
+    );
+    String[] argsNoJar = Arrays.copyOfRange(noJarArgs, 2, noJarArgs.length);
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "No jar",
+        () -> getDSClient().init(argsNoJar));
+
+    LOG.info("Initializing DS Client with no shell command");
+    String[] noShellCmdArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "2",
+        "--master_memory",
+        "512",
+        "--container_memory",
+        "128"
+    );
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "No shell command",
+        () -> getDSClient().init(noShellCmdArgs));
+
+    LOG.info("Initializing DS Client with invalid no. of containers");
+
+    String[] numContainersArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "-1",
+        "--shell_command",
+        getListCommand(),
+        "--master_memory",
+        "512",
+        "--container_memory",
+        "128"
+    );
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "Invalid no. of containers",
+        () -> getDSClient().init(numContainersArgs));
+
+    LOG.info("Initializing DS Client with invalid no. of vcores");
+
+    String[] vCoresArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "2",
+        "--shell_command",
+        getListCommand(),
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "-2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1"
+    );
+    getDSClient().init(vCoresArgs);
+
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "Invalid virtual cores specified",
+        () -> {
+          getDSClient().init(vCoresArgs);
+          getDSClient().run();
+        });
+
+    LOG.info("Initializing DS Client with --shell_command and --shell_script");
+
+    String[] scriptAndCmdArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "2",
+        "--shell_command",
+        getListCommand(),
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1",
+        "--shell_script",
+        "test.sh"
+    );
+
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "Can not specify shell_command option and shell_script option at "
+            + "the same time",
+        () -> getDSClient().init(scriptAndCmdArgs));
+
+    LOG.info(
+        "Initializing DS Client without --shell_command and --shell_script");
+
+    String[] noShellCmdNoScriptArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "2",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1"
+    );
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "No shell command or shell script specified "
+            + "to be executed by application master",
+        () -> getDSClient().init(noShellCmdNoScriptArgs));
+
+    LOG.info("Initializing DS Client with invalid container_type argument");
+    String[] invalidTypeArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "2",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1",
+        "--shell_command",
+        "date",
+        "--container_type",
+        "UNSUPPORTED_TYPE"
+    );
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "Invalid container_type: UNSUPPORTED_TYPE",
+        () -> getDSClient().init(invalidTypeArgs));
+
+    String[] invalidMemArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getListCommand(),
+        "--master_resources",
+        "memory-mb=invalid"
+    );
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        () -> getDSClient().init(invalidMemArgs));
+
+    String[] invalidMasterResArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getListCommand(),
+        "--master_resources"
+    );
+    LambdaTestUtils.intercept(MissingArgumentException.class,
+        () -> getDSClient().init(invalidMasterResArgs));
+  }
+
+  @Test
+  public void testDSTimelineClientWithConnectionRefuse() throws Exception {
+    ApplicationMaster am = new ApplicationMaster();
+    final AtomicReference<TimelineWriter> spyTimelineWriterRef =
+        new AtomicReference<>(null);
+    TimelineClientImpl client = new TimelineClientImpl() {
+      @Override
+      protected TimelineWriter createTimelineWriter(Configuration conf,
+          UserGroupInformation authUgi, com.sun.jersey.api.client.Client client,
+          URI resURI) throws IOException {
+        TimelineWriter timelineWriter =
+            new DirectTimelineWriter(authUgi, client, resURI);
+        spyTimelineWriterRef.set(spy(timelineWriter));
+        return spyTimelineWriterRef.get();
+      }
+    };
+    client.init(getConfiguration());
+    client.start();
+    TestTimelineClient.mockEntityClientResponse(spyTimelineWriterRef.get(),
+        null, false, true);
+    try {
+      UserGroupInformation ugi = mock(UserGroupInformation.class);
+      when(ugi.getShortUserName()).thenReturn("user1");
+      // verify no ClientHandlerException get thrown out.
+      am.publishContainerEndEvent(client, ContainerStatus.newInstance(
+          BuilderUtils.newContainerId(1, 1, 1, 1), ContainerState.COMPLETE, "",
+          1), "domainId", ugi);
+    } finally {
+      client.stop();
+    }
+  }
+
+  @Test
+  public void testContainerLaunchFailureHandling() throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "2",
+        "--shell_command",
+        getListCommand(),
+        "--master_memory",
+        "512",
+        "--container_memory",
+        "128"
+    );
+
+    LOG.info("Initializing DS Client");
+    setAndGetDSClient(ContainerLaunchFailAppMaster.class.getName(),
+        new Configuration(getYarnClusterConfiguration()));
+    Assert.assertTrue(getDSClient().init(args));
+    LOG.info("Running DS Client");
+    Assert.assertFalse(getDSClient().run());
+  }
+
+  @Test
+  public void testDebugFlag() throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "2",
+        "--shell_command",
+        getListCommand(),
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1",
+        "--debug"
+    );
+
+    LOG.info("Initializing DS Client");
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    Assert.assertTrue(getDSClient().init(args));
+    LOG.info("Running DS Client");
+    Assert.assertTrue(getDSClient().run());
+  }
+
+  private int verifyContainerLog(int containerNum,
+      List<String> expectedContent, boolean count, String expectedWord) {
+    File logFolder =
+        new File(getNodeManager(0).getConfig()
+            .get(YarnConfiguration.NM_LOG_DIRS,
+                YarnConfiguration.DEFAULT_NM_LOG_DIRS));
+
+    File[] listOfFiles = logFolder.listFiles();
+    Assert.assertNotNull(listOfFiles);
+    // listFiles() returns null for non-directories, so skip those entries
+    File[] containerFiles = null;
+    for (int i = listOfFiles.length - 1; i >= 0; i--) {
+      File[] candidate = listOfFiles[i].listFiles();
+      if (candidate != null && candidate.length == containerNum + 1) {
+        containerFiles = candidate;
+        break;
+      }
+    }
+    Assert.assertNotNull(containerFiles);
+
+    int numOfWords = 0;
+    for (File containerFile : containerFiles) {
+      if (containerFile == null) {
+        continue;
+      }
+      for (File output : containerFile.listFiles()) {
+        if (output.getName().trim().contains("stdout")) {
+          List<String> stdOutContent = new ArrayList<>();
+          try (BufferedReader br = new BufferedReader(new FileReader(output))) {
+            String sCurrentLine;
+
+            int numOfline = 0;
+            while ((sCurrentLine = br.readLine()) != null) {
+              if (count) {
+                if (sCurrentLine.contains(expectedWord)) {
+                  numOfWords++;
+                }
+              } else if (output.getName().trim().equals("stdout")) {
+                if (!Shell.WINDOWS) {
+                  Assert.assertEquals("The current is" + sCurrentLine,
+                      expectedContent.get(numOfline), sCurrentLine.trim());
+                  numOfline++;
+                } else {
+                  stdOutContent.add(sCurrentLine.trim());
+                }
+              }
+            }
+            /* By executing bat script using cmd /c,
+             * it will output all contents from bat script first
+             * It is hard for us to do check line by line
+             * Simply check whether output from bat file contains
+             * all the expected messages
+             */
+            if (Shell.WINDOWS && !count
+                && output.getName().trim().equals("stdout")) {
+              Assert.assertTrue(stdOutContent.containsAll(expectedContent));
+            }
+          } catch (IOException e) {
+            throw new UncheckedIOException("Exception reading the buffer", e);
+          }
+        }
+      }
+    }
+    return numOfWords;
+  }
+
+  @Test
+  public void testDistributedShellResourceProfiles() throws Exception {
+    int appNameCounter = 0;
+    String[][] args = {
+        createArgsWithPostFix(appNameCounter++,
+            "--num_containers", "1", "--shell_command",
+            getListCommand(), "--container_resource_profile",
+            "maximum"),
+        createArgsWithPostFix(appNameCounter++,
+            "--num_containers", "1", "--shell_command",
+            getListCommand(), "--master_resource_profile",
+            "default"),
+        createArgsWithPostFix(appNameCounter++,
+            "--num_containers", "1", "--shell_command",
+            getListCommand(), "--master_resource_profile",
+            "default", "--container_resource_profile", "maximum"),
+    };
+
+    for (int i = 0; i < args.length; ++i) {
+      LOG.info("Initializing DS Client[{}]", i);
+      setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+      Assert.assertTrue(getDSClient().init(args[i]));
+      LOG.info("Running DS Client[{}]", i);
+      LambdaTestUtils.intercept(Exception.class,
+          () -> getDSClient().run());
+    }
+  }
+
+  @Test
+  public void testDSShellWithOpportunisticContainers() throws Exception {
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "2",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1",
+        "--shell_command",
+        "date",
+        "--container_type",
+        "OPPORTUNISTIC"
+    );
+    assertTrue(getDSClient().init(args));
+    assertTrue(getDSClient().run());
+  }
+
+  @Test(expected = ResourceNotFoundException.class)
+  public void testDistributedShellAMResourcesWithUnknownResource()
+      throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getListCommand(),
+        "--master_resources",
+        "unknown-resource=5"
+    );
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    assertTrue(getDSClient().init(args));
+    getDSClient().run();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDistributedShellNonExistentQueue()
+      throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getListCommand(),
+        "--queue",
+        "non-existent-queue"
+    );
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    assertTrue(getDSClient().init(args));
+    getDSClient().run();
+  }
+
+  @Test
+  public void testDistributedShellWithSingleFileLocalization()
+      throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getCatCommand(),
+        "--localize_files",
+        "./src/test/resources/a.txt",
+        "--shell_args",
+        "a.txt"
+    );
+
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    assertTrue(getDSClient().init(args));
+    assertTrue("Client exited with an error", getDSClient().run());
+  }
+
+  @Test
+  public void testDistributedShellWithMultiFileLocalization()
+      throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getCatCommand(),
+        "--localize_files",
+        "./src/test/resources/a.txt,./src/test/resources/b.txt",
+        "--shell_args",
+        "a.txt b.txt"
+    );
+
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    assertTrue(getDSClient().init(args));
+    assertTrue("Client exited with an error", getDSClient().run());
+  }
+
+  @Test(expected = UncheckedIOException.class)
+  public void testDistributedShellWithNonExistentFileLocalization()
+      throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getCatCommand(),
+        "--localize_files",
+        "/non/existing/path/file.txt",
+        "--shell_args",
+        "file.txt"
+    );
+
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    assertTrue(getDSClient().init(args));
+    assertTrue(getDSClient().run());
+  }
+
+  @Test
+  public void testDistributedShellCleanup()
+      throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getListCommand()
+    );
+    Configuration config = new Configuration(getYarnClusterConfiguration());
+    setAndGetDSClient(config);
+
+    assertTrue(getDSClient().init(args));
+    assertTrue(getDSClient().run());
+    ApplicationId appId = getDSClient().getAppId();
+    String relativePath =
+        ApplicationMaster.getRelativePath(generateAppName(),
+            appId.toString(), "");
+    FileSystem fs1 = FileSystem.get(config);
+    Path path = new Path(fs1.getHomeDirectory(), relativePath);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return !fs1.exists(path);
+      } catch (IOException e) {
+        return false;
+      }
+    }, 10, 60000);
+
+    assertFalse("Distributed Shell Cleanup failed", fs1.exists(path));
+  }
+
+  @Override
+  protected void customizeConfiguration(
+      YarnConfiguration config) throws Exception {
+    config.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+        CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT);
+  }
+
+  private static File getBaseDirForTest() {
+    return new File("target", TestDSTimelineV10.class.getName());
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV15.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV15.java
new file mode 100644
index 0000000..634bac4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV15.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
+
+/**
+ * Unit test implementations for distributed shell on Timeline Service v1.5.
+ */
+public class TestDSTimelineV15 extends DistributedShellBaseTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDSTimelineV15.class);
+
+  @Override
+  protected float getTimelineVersion() {
+    return 1.5f;
+  }
+
+  @Override
+  protected void customizeConfiguration(
+      YarnConfiguration config) throws Exception {
+    setUpHDFSCluster();
+    PluginStoreTestUtils.prepareFileSystemForPluginStore(
+        getHDFSCluster().getFileSystem());
+    PluginStoreTestUtils.prepareConfiguration(config, getHDFSCluster());
+    config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
+        DistributedShellTimelinePlugin.class.getName());
+  }
+
+  @Override
+  protected void checkTimeline(ApplicationId appId,
+      boolean defaultFlow, boolean haveDomain,
+      ApplicationReport appReport) throws Exception {
+    long scanInterval = getConfiguration().getLong(
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT
+    );
+    Path doneDir = new Path(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT
+    );
+    // Wait till the data is moved to done dir, or timeout and fail
+    AtomicReference<Exception> exceptionRef = new AtomicReference<>(null);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        RemoteIterator<FileStatus> iterApps =
+            getHDFSCluster().getFileSystem().listStatusIterator(doneDir);
+        return (iterApps.hasNext());
+      } catch (Exception e) {
+        exceptionRef.set(e);
+        LOG.error("Exception listing Done Dir", e);
+        return true;
+      }
+    }, scanInterval * 2, TEST_TIME_WINDOW_EXPIRE);
+    Assert.assertNull("Exception in getting listing status",
+        exceptionRef.get());
+    super.checkTimeline(appId, defaultFlow, haveDomain, appReport);
+  }
+
+  @Test
+  public void testDSShellWithDomain() throws Exception {
+    baseTestDSShell(true);
+  }
+
+  @Test
+  public void testDSShellWithoutDomain() throws Exception {
+    baseTestDSShell(false);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV20.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV20.java
new file mode 100644
index 0000000..caf9d3b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV20.java
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+/**
+ * Unit test implementations for distributed shell on Timeline Service v2.
+ */
+public class TestDSTimelineV20 extends DistributedShellBaseTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDSTimelineV20.class);
+  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
+
+  /**
+   * Returns the timeline version used by this test class.
+   *
+   * @return the constant {@code 2.0f}.
+   */
+  @Override
+  protected float getTimelineVersion() {
+    final float timelineVersion = 2.0f;
+    return timelineVersion;
+  }
+
+  /**
+   * Adds the timeline v2 settings used by these tests to the given
+   * configuration: the per-node collector aux-service, the file-system
+   * timeline writer, and the writer's storage root directory.
+   *
+   * @param config configuration to update in place.
+   * @throws Exception propagated from the helper calls, if any.
+   */
+  @Override
+  protected void customizeConfiguration(
+      YarnConfiguration config) throws Exception {
+    // register the per-node timeline collector as the NM aux-service
+    final String auxServiceClassKey = YarnConfiguration.NM_AUX_SERVICES + "."
+        + TIMELINE_AUX_SERVICE_NAME + ".class";
+    config.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
+    config.set(auxServiceClassKey,
+        PerNodeTimelineCollectorsAuxService.class.getName());
+
+    // write timeline entities through the file-system based writer
+    config.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class,
+        org.apache.hadoop.yarn.server.timelineservice.storage.
+            TimelineWriter.class);
+
+    // set up the storage dir, then use it as the writer's root directory
+    setTimelineV2StorageDir();
+    final String storageRoot = getTimelineV2StorageDir();
+    config.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+        storageRoot);
+  }
+
+  /**
+   * Submits a distributed shell application that requests OPPORTUNISTIC
+   * containers together with {@code --enforce_execution_type}, waits for
+   * the containers to be launched, and verifies that every container other
+   * than the AM container reports {@link ExecutionType#OPPORTUNISTIC}.
+   *
+   * @throws Exception declared for the test signature; exceptions raised
+   *                   inside the test body are logged and converted to an
+   *                   {@link AssertionError} that keeps the original cause.
+   */
+  @Test
+  public void testDSShellWithEnforceExecutionType() throws Exception {
+    YarnClient yarnClient = null;
+    AtomicReference<Throwable> thrownError = new AtomicReference<>(null);
+    AtomicReference<List<ContainerReport>> containersListRef =
+        new AtomicReference<>(null);
+    AtomicReference<ApplicationAttemptId> appAttemptIdRef =
+        new AtomicReference<>(null);
+    AtomicReference<ApplicationAttemptReport> appAttemptReportRef =
+        new AtomicReference<>(null);
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "2",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1",
+        "--shell_command",
+        getListCommand(),
+        "--container_type",
+        "OPPORTUNISTIC",
+        "--enforce_execution_type"
+    );
+    try {
+      setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+      getDSClient().init(args);
+      // run the client on its own thread so this thread can poll the cluster
+      Thread dsClientRunner = new Thread(() -> {
+        try {
+          getDSClient().run();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      });
+      dsClientRunner.start();
+
+      yarnClient = YarnClient.createYarnClient();
+      yarnClient.init(new Configuration(getYarnClusterConfiguration()));
+      yarnClient.start();
+
+      // expecting three containers including the AM container.
+      waitForContainersLaunch(yarnClient, 3, appAttemptReportRef,
+          containersListRef, appAttemptIdRef, thrownError);
+      Throwable launchError = thrownError.get();
+      if (launchError != null) {
+        // keep the original throwable as the cause so its stack trace
+        // shows up in the test report
+        throw new AssertionError(launchError.getMessage(), launchError);
+      }
+      ContainerId amContainerId = appAttemptReportRef.get().getAMContainerId();
+      for (ContainerReport container : containersListRef.get()) {
+        if (!container.getContainerId().equals(amContainerId)) {
+          // JUnit order is (expected, actual)
+          Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+              container.getExecutionType());
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Job execution with enforce execution type failed.", e);
+      throw new AssertionError("Exception. " + e.getMessage(), e);
+    } finally {
+      if (yarnClient != null) {
+        yarnClient.stop();
+      }
+    }
+  }
+
+  /**
+   * Runs {@code doTestDistributedShellWithResources} with the default
+   * (non-large) container sizes.
+   *
+   * @throws Exception if the shared scenario fails.
+   */
+  @Test
+  public void testDistributedShellWithResources() throws Exception {
+    final boolean largeContainers = false;
+    doTestDistributedShellWithResources(largeContainers);
+  }
+
+  /**
+   * Runs {@code doTestDistributedShellWithResources} with container sizes
+   * derived from the cluster's total memory.
+   *
+   * @throws Exception if the shared scenario fails.
+   */
+  @Test
+  public void testDistributedShellWithResourcesWithLargeContainers()
+      throws Exception {
+    final boolean largeContainers = true;
+    doTestDistributedShellWithResources(largeContainers);
+  }
+
+  /**
+   * Submits a distributed shell application using the
+   * {@code --master_resources} and {@code --container_resources} options
+   * and verifies the memory and vcores allocated to the AM container and
+   * to every other container.
+   * <p>
+   * The test is skipped (JUnit assumption) when the cluster reports less
+   * than 1536 MB of memory or fewer than 2 vcores.
+   *
+   * @param largeContainers when {@code true}, the AM requests about 2/3 and
+   *                        each container about 1/3 of the cluster memory,
+   *                        both rounded down to a multiple of
+   *                        {@code MIN_ALLOCATION_MB}; otherwise 1 Gi and
+   *                        512 Mi are requested.
+   * @throws Exception if the client or the YARN client fails.
+   */
+  private void doTestDistributedShellWithResources(boolean largeContainers)
+      throws Exception {
+    AtomicReference<Throwable> thrownExceptionRef =
+        new AtomicReference<>(null);
+    AtomicReference<List<ContainerReport>> containersListRef =
+        new AtomicReference<>(null);
+    AtomicReference<ApplicationAttemptId> appAttemptIdRef =
+        new AtomicReference<>(null);
+    AtomicReference<ApplicationAttemptReport> appAttemptReportRef =
+        new AtomicReference<>(null);
+    Resource clusterResource = getYarnCluster().getResourceManager()
+        .getResourceScheduler().getClusterResource();
+    String masterMemoryString = "1 Gi";
+    String containerMemoryString = "512 Mi";
+    // expected allocations in MB: [0] = AM container, [1] = other containers
+    long[] memVars = {1024, 512};
+    YarnClient yarnClient = null;
+    Assume.assumeTrue("The cluster doesn't have enough memory for this test",
+        clusterResource.getMemorySize() >= memVars[0] + memVars[1]);
+    Assume.assumeTrue("The cluster doesn't have enough cores for this test",
+        clusterResource.getVirtualCores() >= 2);
+    if (largeContainers) {
+      // round both sizes down to a multiple of MIN_ALLOCATION_MB
+      memVars[0] = clusterResource.getMemorySize() * 2 / 3;
+      memVars[0] = memVars[0] - memVars[0] % MIN_ALLOCATION_MB;
+      masterMemoryString = memVars[0] + "Mi";
+      memVars[1] = clusterResource.getMemorySize() / 3;
+      memVars[1] = memVars[1] - memVars[1] % MIN_ALLOCATION_MB;
+      // the container value is passed without a unit suffix
+      containerMemoryString = String.valueOf(memVars[1]);
+    }
+
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "2",
+        "--shell_command",
+        getListCommand(),
+        "--master_resources",
+        "memory=" + masterMemoryString + ",vcores=1",
+        "--container_resources",
+        "memory=" + containerMemoryString + ",vcores=1"
+    );
+
+    LOG.info("Initializing DS Client");
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    Assert.assertTrue(getDSClient().init(args));
+    LOG.info("Running DS Client");
+    // records the client's return value; it is not asserted by this test
+    final AtomicBoolean result = new AtomicBoolean(false);
+    // run the client on its own thread so this thread can poll the cluster
+    Thread dsClientRunner = new Thread(() -> {
+      try {
+        result.set(getDSClient().run());
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    dsClientRunner.start();
+    try {
+      yarnClient = YarnClient.createYarnClient();
+      yarnClient.init(new Configuration(getYarnClusterConfiguration()));
+      yarnClient.start();
+      // expecting two containers.
+      waitForContainersLaunch(yarnClient, 2, appAttemptReportRef,
+          containersListRef, appAttemptIdRef, thrownExceptionRef);
+      Throwable launchError = thrownExceptionRef.get();
+      if (launchError != null) {
+        // keep the original throwable as the cause so its stack trace
+        // shows up in the test report
+        throw new AssertionError(launchError.getMessage(), launchError);
+      }
+      ContainerId amContainerId = appAttemptReportRef.get().getAMContainerId();
+      ContainerReport report = yarnClient.getContainerReport(amContainerId);
+      Resource masterResource = report.getAllocatedResource();
+      Assert.assertEquals(memVars[0], masterResource.getMemorySize());
+      Assert.assertEquals(1, masterResource.getVirtualCores());
+      for (ContainerReport container : containersListRef.get()) {
+        if (!container.getContainerId().equals(amContainerId)) {
+          Resource containerResource = container.getAllocatedResource();
+          Assert.assertEquals(memVars[1],
+              containerResource.getMemorySize());
+          Assert.assertEquals(1, containerResource.getVirtualCores());
+        }
+      }
+    } finally {
+      LOG.info("Signaling Client to Stop");
+      if (yarnClient != null) {
+        LOG.info("Stopping yarnClient service");
+        yarnClient.stop();
+      }
+    }
+  }
+
+  /**
+   * Runs the shared distributed shell scenario through
+   * {@code baseTestDSShell} with its flag set to {@code false}.
+   * NOTE(review): going by the test name the flag presumably means
+   * "use a timeline domain"; confirm against the base class.
+   *
+   * @throws Exception if the shared scenario fails.
+   */
+  @Test
+  public void testDSShellWithoutDomain() throws Exception {
+    final boolean haveDomain = false;
+    baseTestDSShell(haveDomain);
+  }
+
+  /**
+   * Runs the shared distributed shell scenario through
+   * {@code baseTestDSShell(false, true)}.
+   * NOTE(review): going by the test names the arguments presumably are
+   * (haveDomain, defaultFlow); confirm against the base class.
+   *
+   * @throws Exception if the shared scenario fails.
+   */
+  @Test
+  public void testDSShellWithoutDomainDefaultFlow() throws Exception {
+    final boolean haveDomain = false;
+    final boolean defaultFlow = true;
+    baseTestDSShell(haveDomain, defaultFlow);
+  }
+
+  /**
+   * Runs the shared distributed shell scenario through
+   * {@code baseTestDSShell(false, false)}.
+   * NOTE(review): going by the test names the arguments presumably are
+   * (haveDomain, defaultFlow); confirm against the base class.
+   *
+   * @throws Exception if the shared scenario fails.
+   */
+  @Test
+  public void testDSShellWithoutDomainCustomizedFlow() throws Exception {
+    final boolean haveDomain = false;
+    final boolean defaultFlow = false;
+    baseTestDSShell(haveDomain, defaultFlow);
+  }
+
+  /**
+   * Appends the customized flow options (name, version and run id) to the
+   * client arguments unless the default flow is requested.
+   *
+   * @param args client arguments built so far.
+   * @param defaultFlow when {@code true}, {@code args} is returned as is.
+   * @return {@code args} itself for the default flow, otherwise the result
+   *         of merging {@code args} with the flow options.
+   */
+  @Override
+  protected String[] appendFlowArgsForTestDSShell(String[] args,
+      boolean defaultFlow) {
+    if (defaultFlow) {
+      return args;
+    }
+    String[] customizedFlowArgs = {
+        "--flow_name",
+        "test_flow_name",
+        "--flow_version",
+        "test_flow_version",
+        "--flow_run_id",
+        "12345678"
+    };
+    return mergeArgs(args, customizedFlowArgs);
+  }
+
+  /**
+   * Verifies the timeline v2 entity files written for the application
+   * under the file-system timeline storage directory.
+   * <p>
+   * Checks, in order, the DS_APP_ATTEMPT and DS_CONTAINER entity files, and
+   * the YARN_CONTAINER, YARN_APPLICATION and YARN_APPLICATION_ATTEMPT entity
+   * files. The parent of the "entities" directory is deleted afterwards,
+   * whether or not the checks pass.
+   *
+   * @param appId id of the application whose entities are checked.
+   * @param defaultFlow whether the expected path uses the default flow
+   *                    (application name, default flow version, start time)
+   *                    or the customized test flow values.
+   * @param haveDomain not used by this implementation.
+   * @param appReport report providing the application name and start time.
+   * @throws Exception if any verification step fails.
+   */
+  @Override
+  protected void checkTimeline(ApplicationId appId, boolean defaultFlow,
+      boolean haveDomain, ApplicationReport appReport) throws Exception {
+    LOG.info("Started {}#checkTimeline()", getClass().getCanonicalName());
+    // For PoC check using the file-based timeline writer (YARN-3264)
+    String tmpRoot = getTimelineV2StorageDir() + File.separator + "entities"
+        + File.separator;
+
+    File tmpRootFolder = new File(tmpRoot);
+    try {
+      Assert.assertTrue(tmpRootFolder.isDirectory());
+      String basePath = tmpRoot +
+          YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator +
+          UserGroupInformation.getCurrentUser().getShortUserName() +
+          (defaultFlow ?
+              File.separator + appReport.getName() + File.separator +
+                  TimelineUtils.DEFAULT_FLOW_VERSION + File.separator +
+                  appReport.getStartTime() + File.separator :
+              File.separator + "test_flow_name" + File.separator +
+                  "test_flow_version" + File.separator + "12345678" +
+                  File.separator) +
+          appId.toString();
+      LOG.info("basePath for appId {}: {}", appId, basePath);
+      // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
+
+      // Verify DS_APP_ATTEMPT entities posted by the client
+      // there will be at least one attempt, look for that file
+      String appTimestampFileName =
+          String.format("appattempt_%d_000%d_000001%s",
+              appId.getClusterTimestamp(), appId.getId(),
+              FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION);
+      File dsAppAttemptEntityFile = verifyEntityTypeFileExists(basePath,
+          "DS_APP_ATTEMPT", appTimestampFileName);
+      // Check that the required events are published and that the same
+      // idprefix is sent on each publish.
+      verifyEntityForTimeline(dsAppAttemptEntityFile,
+          DSEvent.DS_APP_ATTEMPT_START.toString(), 1, 1, 0, true);
+      // to avoid race condition of testcase, at least check 40 times with
+      // sleep of 50ms
+      verifyEntityForTimeline(dsAppAttemptEntityFile,
+          DSEvent.DS_APP_ATTEMPT_END.toString(), 1, 40, 50, true);
+
+      // Verify DS_CONTAINER entities posted by the client.
+      // Use the writer's extension constant, like the other file names here.
+      String containerTimestampFileName =
+          String.format("container_%d_000%d_01_000002%s",
+              appId.getClusterTimestamp(), appId.getId(),
+              FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION);
+      File dsContainerEntityFile = verifyEntityTypeFileExists(basePath,
+          "DS_CONTAINER", containerTimestampFileName);
+      // Check that the required events are published and that the same
+      // idprefix is sent on each publish.
+      verifyEntityForTimeline(dsContainerEntityFile,
+          DSEvent.DS_CONTAINER_START.toString(), 1, 1, 0, true);
+      // to avoid race condition of testcase, at least check 40 times with
+      // sleep of 50ms.
+      verifyEntityForTimeline(dsContainerEntityFile,
+          DSEvent.DS_CONTAINER_END.toString(), 1, 40, 50, true);
+
+      // Verify NM posting container metrics info.
+      String containerMetricsTimestampFileName =
+          String.format("container_%d_000%d_01_000001%s",
+              appId.getClusterTimestamp(), appId.getId(),
+              FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION);
+      File containerEntityFile = verifyEntityTypeFileExists(basePath,
+          TimelineEntityType.YARN_CONTAINER.toString(),
+          containerMetricsTimestampFileName);
+      verifyEntityForTimeline(containerEntityFile,
+          ContainerMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, true);
+
+      // to avoid race condition of testcase, at least check 40 times with
+      // sleep of 50ms
+      verifyEntityForTimeline(containerEntityFile,
+          ContainerMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, true);
+
+      // Verify RM posting Application life cycle Events are getting published
+      String appMetricsTimestampFileName =
+          String.format("application_%d_000%d%s",
+              appId.getClusterTimestamp(), appId.getId(),
+              FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION);
+      File appEntityFile =
+          verifyEntityTypeFileExists(basePath,
+              TimelineEntityType.YARN_APPLICATION.toString(),
+              appMetricsTimestampFileName);
+      // No need to check idprefix for app.
+      verifyEntityForTimeline(appEntityFile,
+          ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, false);
+
+      // to avoid race condition of testcase, at least check 40 times with
+      // sleep of 50ms
+      verifyEntityForTimeline(appEntityFile,
+          ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, false);
+
+      // Verify RM posting AppAttempt life cycle Events are getting published
+      String appAttemptMetricsTimestampFileName =
+          String.format("appattempt_%d_000%d_000001%s",
+              appId.getClusterTimestamp(), appId.getId(),
+              FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION);
+
+      File appAttemptEntityFile =
+          verifyEntityTypeFileExists(basePath,
+              TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
+              appAttemptMetricsTimestampFileName);
+      verifyEntityForTimeline(appAttemptEntityFile,
+          AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, 1, 0, true);
+      verifyEntityForTimeline(appAttemptEntityFile,
+          AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true);
+    } finally {
+      try {
+        FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
+      } catch (Exception ex) {
+        // the recursive delete can throw an exception when one of the files
+        // does not exist; cleanup is best effort, so only warn.
+        LOG.warn("Exception deleting a file/subDirectory: {}", ex.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Checks the events and idprefix published for an entity.
+   * <p>
+   * The entity file is re-read on every poll. Each non-empty line that
+   * contains {@code expectedEvent} is counted; for the DS_CONTAINER_END
+   * event the first event of that line's entity must also carry the
+   * diagnostics info key. Any throwable raised while reading or asserting
+   * ends the polling and fails the test with that throwable as the cause.
+   * <p>
+   * The wait timeout is {@code (checkTimes + 1) * sleepTime}, so a
+   * {@code sleepTime} of 0 yields a timeout of 0 as well.
+   *
+   * @param entityFile Entity file.
+   * @param expectedEvent Expected event Id.
+   * @param numOfExpectedEvent Number of expected occurrences of expected event
+   *                           id.
+   * @param checkTimes Number of times to check.
+   * @param sleepTime Sleep time for each iteration.
+   * @param checkIdPrefix Whether to check idprefix.
+   * @throws Exception if waiting for the expected event count times out or
+   *                   is interrupted.
+   */
+  private void verifyEntityForTimeline(File entityFile, String expectedEvent,
+      long numOfExpectedEvent, int checkTimes, long sleepTime,
+      boolean checkIdPrefix) throws Exception  {
+    AtomicReference<Throwable> thrownExceptionRef = new AtomicReference<>(null);
+    GenericTestUtils.waitFor(() -> {
+      String strLine;
+      long actualCount = 0;
+      long idPrefix = -1;
+      try (BufferedReader reader =
+               new BufferedReader(new FileReader(entityFile))) {
+        while ((strLine = reader.readLine()) != null) {
+          String entityLine = strLine.trim();
+          if (entityLine.isEmpty()) {
+            continue;
+          }
+          if (entityLine.contains(expectedEvent)) {
+            actualCount++;
+          }
+          if (expectedEvent.equals(DSEvent.DS_CONTAINER_END.toString())
+              && entityLine.contains(expectedEvent)) {
+            TimelineEntity entity = FileSystemTimelineReaderImpl.
+                getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
+            TimelineEvent event = entity.getEvents().pollFirst();
+            Assert.assertNotNull(event);
+            Assert.assertTrue("diagnostics",
+                event.getInfo().containsKey(ApplicationMaster.DIAGNOSTICS));
+          }
+          if (checkIdPrefix) {
+            TimelineEntity entity = FileSystemTimelineReaderImpl.
+                getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
+            Assert.assertTrue("Entity ID prefix expected to be > 0",
+                entity.getIdPrefix() > 0);
+            if (idPrefix == -1) {
+              idPrefix = entity.getIdPrefix();
+            } else {
+              Assert.assertEquals(
+                  "Entity ID prefix should be same across each publish of "
+                      + "same entity", idPrefix, entity.getIdPrefix());
+            }
+          }
+        }
+      } catch (Throwable e) {
+        // assertion errors are caught here too; stop polling and report the
+        // failure once waitFor returns
+        LOG.error("Exception verifying timeline entity file {}",
+            entityFile, e);
+        thrownExceptionRef.set(e);
+        return true;
+      }
+      return (numOfExpectedEvent == actualCount);
+    }, sleepTime, (checkTimes + 1) * sleepTime);
+
+    Throwable verificationError = thrownExceptionRef.get();
+    if (verificationError != null) {
+      // keep the original throwable as the cause so its stack trace
+      // shows up in the test report
+      throw new AssertionError("verifyEntityForTimeline failed "
+          + verificationError.getMessage(), verificationError);
+    }
+  }
+
+  /**
+   * Asserts that the directory for the given entity type exists under the
+   * base path and that it contains the named entity file.
+   *
+   * @param basePath path of the application's entity root.
+   * @param entityType name of the entity type sub-directory.
+   * @param entityFileName name of the entity file inside that directory.
+   * @return the entity file, once both assertions have passed.
+   */
+  private File verifyEntityTypeFileExists(String basePath, String entityType,
+      String entityFileName) {
+    final String entityDirPath =
+        basePath + File.separator + entityType + File.separator;
+    LOG.info("verifyEntityTypeFileExists output path for entityType {}: {}",
+        entityType, entityDirPath);
+
+    // the entity type directory has to be there first
+    final File entityDir = new File(entityDirPath);
+    Assert.assertTrue(entityDir.isDirectory());
+
+    // then the entity file inside it
+    final File entityFile = new File(entityDirPath + entityFileName);
+    Assert.assertTrue(entityFile.exists());
+    return entityFile;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java
index 39c774c..19f0423 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java
@@ -17,42 +17,50 @@
  */
 
 package org.apache.hadoop.yarn.applications.distributedshell;
+
 import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.Set;
 import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
-
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
-import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
 
 /**
  * Test for Distributed Shell With Multiple Node Managers.
@@ -64,23 +72,28 @@ public class TestDSWithMultipleNodeManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestDSWithMultipleNodeManager.class);
 
-  static final int NUM_NMS = 2;
-  TestDistributedShell distShellTest;
-  private final Boolean multiNodePlacementEnabled;
+  private static final int NUM_NMS = 2;
   private static final String POLICY_CLASS_NAME =
-      "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement."
-      + "ResourceUsageMultiNodeLookupPolicy";
+      ResourceUsageMultiNodeLookupPolicy.class.getName();
+  private final Boolean multiNodePlacementEnabled;
+  @Rule
+  public TestName name = new TestName();
+  @Rule
+  public Timeout globalTimeout =
+      new Timeout(DistributedShellBaseTest.TEST_TIME_OUT,
+          TimeUnit.MILLISECONDS);
+  private DistributedShellBaseTest distShellTest;
+  private Client dsClient;
 
+  public TestDSWithMultipleNodeManager(Boolean multiNodePlacementEnabled) {
+    this.multiNodePlacementEnabled = multiNodePlacementEnabled;
+  }
 
   @Parameterized.Parameters
   public static Collection<Boolean> getParams() {
     return Arrays.asList(false, true);
   }
 
-  public TestDSWithMultipleNodeManager(Boolean multiNodePlacementEnabled) {
-    this.multiNodePlacementEnabled = multiNodePlacementEnabled;
-  }
-
   private YarnConfiguration getConfiguration(
       boolean multiNodePlacementConfigs) {
     YarnConfiguration conf = new YarnConfiguration();
@@ -103,41 +116,59 @@ public class TestDSWithMultipleNodeManager {
     return conf;
   }
 
+  @BeforeClass
+  public static void setupUnitTests() throws Exception {
+    TestDSTimelineV10.setupUnitTests();
+  }
+
+  @AfterClass
+  public static void tearDownUnitTests() throws Exception {
+    TestDSTimelineV10.tearDownUnitTests();
+  }
+
   @Before
   public void setup() throws Exception {
-    distShellTest = new TestDistributedShell();
+    distShellTest = new TestDSTimelineV10();
     distShellTest.setupInternal(NUM_NMS,
         getConfiguration(multiNodePlacementEnabled));
   }
 
   @After
   public void tearDown() throws Exception {
-    distShellTest.tearDown();
+    if (dsClient != null) {
+      dsClient.sendStopSignal();
+      dsClient = null;
+    }
+    if (distShellTest != null) {
+      distShellTest.tearDown();
+      distShellTest = null;
+    }
   }
 
   private void initializeNodeLabels() throws IOException {
-    RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext();
-
+    RMContext rmContext = distShellTest.getResourceManager(0).getRMContext();
     // Setup node labels
     RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
-    Set<String> labels = new HashSet<String>();
+    Set<String> labels = new HashSet<>();
     labels.add("x");
     labelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(labels);
 
     // Setup queue access to node labels
-    distShellTest.conf.set(PREFIX + "root.accessible-node-labels", "x");
-    distShellTest.conf.set(PREFIX + "root.accessible-node-labels.x.capacity",
-        "100");
-    distShellTest.conf.set(PREFIX + "root.default.accessible-node-labels", "x");
-    distShellTest.conf.set(PREFIX
+    distShellTest.setConfiguration(PREFIX + "root.accessible-node-labels", "x");
+    distShellTest.setConfiguration(
+        PREFIX + "root.accessible-node-labels.x.capacity", "100");
+    distShellTest.setConfiguration(
+        PREFIX + "root.default.accessible-node-labels", "x");
+    distShellTest.setConfiguration(PREFIX
         + "root.default.accessible-node-labels.x.capacity", "100");
 
-    rmContext.getScheduler().reinitialize(distShellTest.conf, rmContext);
+    rmContext.getScheduler().reinitialize(distShellTest.getConfiguration(),
+        rmContext);
 
     // Fetch node-ids from yarn cluster
     NodeId[] nodeIds = new NodeId[NUM_NMS];
     for (int i = 0; i < NUM_NMS; i++) {
-      NodeManager mgr = distShellTest.yarnCluster.getNodeManager(i);
+      NodeManager mgr = distShellTest.getNodeManager(i);
       nodeIds[i] = mgr.getNMContext().getNodeId();
     }
 
@@ -145,264 +176,312 @@ public class TestDSWithMultipleNodeManager {
     labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
   }
 
-  @Test(timeout=90000)
+  @Test
   public void testDSShellWithNodeLabelExpression() throws Exception {
+    NMContainerMonitor containerMonitorRunner = null;
     initializeNodeLabels();
 
-    // Start NMContainerMonitor
-    NMContainerMonitor mon = new NMContainerMonitor();
-    Thread t = new Thread(mon);
-    t.start();
-
-    // Submit a job which will sleep for 60 sec
-    String[] args = {
-        "--jar",
-        TestDistributedShell.APPMASTER_JAR,
-        "--num_containers",
-        "4",
-        "--shell_command",
-        "sleep",
-        "--shell_args",
-        "15",
-        "--master_memory",
-        "512",
-        "--master_vcores",
-        "2",
-        "--container_memory",
-        "128",
-        "--container_vcores",
-        "1",
-        "--node_label_expression",
-        "x"
-    };
-
-    LOG.info("Initializing DS Client");
-    final Client client =
-        new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
-    boolean initSuccess = client.init(args);
-    Assert.assertTrue(initSuccess);
-    LOG.info("Running DS Client");
-    boolean result = client.run();
-    LOG.info("Client run completed. Result=" + result);
-
-    t.interrupt();
-
-    // Check maximum number of containers on each NMs
-    int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
-    // Check no container allocated on NM[0]
-    Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
-    // Check there're some containers allocated on NM[1]
-    Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
+    try {
+      // Start NMContainerMonitor
+      containerMonitorRunner = new NMContainerMonitor();
+      containerMonitorRunner.start();
+
+      // Submit a job which will sleep for 15 sec
+      String[] args =
+          DistributedShellBaseTest.createArguments(() -> generateAppName(),
+              "--num_containers",
+              "4",
+              "--shell_command",
+              "sleep",
+              "--shell_args",
+              "15",
+              "--master_memory",
+              "512",
+              "--master_vcores",
+              "2",
+              "--container_memory",
+              "128",
+              "--container_vcores",
+              "1",
+              "--node_label_expression",
+              "x"
+          );
+
+      LOG.info("Initializing DS Client");
+      dsClient =
+          new Client(
+              new Configuration(distShellTest.getYarnClusterConfiguration()));
+      Assert.assertTrue(dsClient.init(args));
+      LOG.info("Running DS Client");
+      boolean result = dsClient.run();
+      LOG.info("Client run completed. Result={}", result);
+
+      containerMonitorRunner.stopMonitoring();
+
+      // Check maximum number of containers on each NM
+      int[] maxRunningContainersOnNMs =
+          containerMonitorRunner.getMaxRunningContainersReport();
+      // Check no container allocated on NM[0]
+      Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
+      // Check there are some containers allocated on NM[1]
+      Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
+    } finally {
+      if (containerMonitorRunner != null) {
+        containerMonitorRunner.stopMonitoring();
+        containerMonitorRunner.join();
+      }
+    }
   }
 
-  @Test(timeout = 90000)
+  @Test
   public void testDistributedShellWithPlacementConstraint()
       throws Exception {
-    NMContainerMonitor mon = new NMContainerMonitor();
-    Thread t = new Thread(mon);
-    t.start();
-
-    String[] args = {
-        "--jar",
-        distShellTest.APPMASTER_JAR,
-        "1",
-        "--shell_command",
-        distShellTest.getSleepCommand(15),
-        "--placement_spec",
-        "zk(1),NOTIN,NODE,zk:spark(1),NOTIN,NODE,zk"
-    };
-    LOG.info("Initializing DS Client");
-    final Client client =
-        new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
-    boolean initSuccess = client.init(args);
-    Assert.assertTrue(initSuccess);
-    LOG.info("Running DS Client");
-    boolean result = client.run();
-    LOG.info("Client run completed. Result=" + result);
-
-    t.interrupt();
-
-    ConcurrentMap<ApplicationId, RMApp> apps = distShellTest.yarnCluster.
-        getResourceManager().getRMContext().getRMApps();
-    RMApp app = apps.values().iterator().next();
-    RMAppAttempt appAttempt = app.getAppAttempts().values().iterator().next();
-    NodeId masterNodeId = appAttempt.getMasterContainer().getNodeId();
-    NodeManager nm1 = distShellTest.yarnCluster.getNodeManager(0);
-
-    int expectedNM1Count = 1;
-    int expectedNM2Count = 1;
-    if (nm1.getNMContext().getNodeId().equals(masterNodeId)) {
-      expectedNM1Count++;
-    } else {
-      expectedNM2Count++;
-    }
+    NMContainerMonitor containerMonitorRunner = null;
+    String[] args =
+        DistributedShellBaseTest.createArguments(() -> generateAppName(),
+            "1",
+            "--shell_command",
+            DistributedShellBaseTest.getSleepCommand(15),
+            "--placement_spec",
+            "zk(1),NOTIN,NODE,zk:spark(1),NOTIN,NODE,zk"
+        );
+    try {
+      containerMonitorRunner = new NMContainerMonitor();
+      containerMonitorRunner.start();
+
+      LOG.info("Initializing DS Client with args {}", Arrays.toString(args));
+      dsClient =
+          new Client(
+              new Configuration(distShellTest.getYarnClusterConfiguration()));
+      Assert.assertTrue(dsClient.init(args));
+      LOG.info("Running DS Client");
+      boolean result = dsClient.run();
+      LOG.info("Client run completed. Result={}", result);
+
+      containerMonitorRunner.stopMonitoring();
+
+      ConcurrentMap<ApplicationId, RMApp> apps =
+          distShellTest.getResourceManager().getRMContext().getRMApps();
+      RMApp app = apps.values().iterator().next();
+      RMAppAttempt appAttempt = app.getAppAttempts().values().iterator().next();
+      NodeId masterNodeId = appAttempt.getMasterContainer().getNodeId();
+      NodeManager nm1 = distShellTest.getNodeManager(0);
+
+      int[] expectedNMsCount = new int[]{1, 1};
+      if (nm1.getNMContext().getNodeId().equals(masterNodeId)) {
+        expectedNMsCount[0]++;
+      } else {
+        expectedNMsCount[1]++;
+      }
 
-    int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
-    Assert.assertEquals(expectedNM1Count, maxRunningContainersOnNMs[0]);
-    Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]);
+      int[] maxRunningContainersOnNMs =
+          containerMonitorRunner.getMaxRunningContainersReport();
+      Assert.assertEquals(expectedNMsCount[0], maxRunningContainersOnNMs[0]);
+      Assert.assertEquals(expectedNMsCount[1], maxRunningContainersOnNMs[1]);
+    } finally {
+      if (containerMonitorRunner != null) {
+        containerMonitorRunner.stopMonitoring();
+        containerMonitorRunner.join();
+      }
+    }
   }
 
-  @Test(timeout = 90000)
+  @Test
   public void testDistributedShellWithAllocationTagNamespace()
       throws Exception {
-    NMContainerMonitor mon = new NMContainerMonitor();
-    Thread monitorThread = new Thread(mon);
-    monitorThread.start();
-
-    String[] argsA = {
-        "--jar",
-        distShellTest.APPMASTER_JAR,
-        "--shell_command",
-        distShellTest.getSleepCommand(30),
-        "--placement_spec",
-        "bar(1),notin,node,bar"
-    };
-    final Client clientA =
-        new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
-    clientA.init(argsA);
-    final AtomicBoolean resultA = new AtomicBoolean(false);
-    Thread t = new Thread() {
-      public void run() {
+    NMContainerMonitor containerMonitorRunner = null;
+    Client clientB = null;
+    YarnClient yarnClient = null;
+
+    String[] argsA =
+        DistributedShellBaseTest.createArguments(() -> generateAppName("001"),
+            "--shell_command",
+            DistributedShellBaseTest.getSleepCommand(30),
+            "--placement_spec",
+            "bar(1),notin,node,bar"
+        );
+    String[] argsB =
+        DistributedShellBaseTest.createArguments(() -> generateAppName("002"),
+            "1",
+            "--shell_command",
+            DistributedShellBaseTest.getListCommand(),
+            "--placement_spec",
+            "foo(3),notin,node,all/bar"
+        );
+
+    try {
+      containerMonitorRunner = new NMContainerMonitor();
+      containerMonitorRunner.start();
+      dsClient =
+          new Client(
+              new Configuration(distShellTest.getYarnClusterConfiguration()));
+      dsClient.init(argsA);
+      Thread dsClientRunner = new Thread(() -> {
         try {
-          resultA.set(clientA.run());
+          dsClient.run();
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
+      });
+      dsClientRunner.start();
+
+      NodeId taskContainerNodeIdA;
+      ConcurrentMap<ApplicationId, RMApp> apps;
+      AtomicReference<RMApp> appARef = new AtomicReference<>(null);
+      AtomicReference<NodeId> masterContainerNodeIdARef =
+          new AtomicReference<>(null);
+      int[] expectedNMCounts = new int[]{0, 0};
+
+      waitForExpectedNMsCount(expectedNMCounts, appARef,
+          masterContainerNodeIdARef);
+
+      NodeId nodeA = distShellTest.getNodeManager(0).getNMContext().
+          getNodeId();
+      NodeId nodeB = distShellTest.getNodeManager(1).getNMContext().
+          getNodeId();
+      Assert.assertEquals(2, (expectedNMCounts[0] + expectedNMCounts[1]));
+      if (expectedNMCounts[0] != expectedNMCounts[1]) {
+        taskContainerNodeIdA = masterContainerNodeIdARef.get();
+      } else {
+        taskContainerNodeIdA =
+            masterContainerNodeIdARef.get().equals(nodeA) ? nodeB : nodeA;
       }
-    };
-    t.start();
-
-    NodeId masterContainerNodeIdA;
-    NodeId taskContainerNodeIdA;
-    ConcurrentMap<ApplicationId, RMApp> apps;
-    RMApp appA;
-
-    int expectedNM1Count = 0;
-    int expectedNM2Count = 0;
-    while (true) {
-      if ((expectedNM1Count + expectedNM2Count) < 2) {
-        expectedNM1Count = distShellTest.yarnCluster.getNodeManager(0).
-            getNMContext().getContainers().size();
-        expectedNM2Count = distShellTest.yarnCluster.getNodeManager(1).
-            getNMContext().getContainers().size();
-        continue;
+
+      clientB =
+          new Client(
+              new Configuration(distShellTest.getYarnClusterConfiguration()));
+      clientB.init(argsB);
+      Assert.assertTrue(clientB.run());
+      containerMonitorRunner.stopMonitoring();
+      apps = distShellTest.getResourceManager().getRMContext().getRMApps();
+      Iterator<RMApp> it = apps.values().iterator();
+      RMApp appB = it.next();
+      if (appARef.get().equals(appB)) {
+        appB = it.next();
       }
-      apps = distShellTest.yarnCluster.getResourceManager().getRMContext().
-          getRMApps();
-      if (apps.isEmpty()) {
-        Thread.sleep(10);
-        continue;
+      LOG.info("Allocation Tag NameSpace Applications are={} and {}",
+          appARef.get().getApplicationId(), appB.getApplicationId());
+
+      RMAppAttempt appAttemptB =
+          appB.getAppAttempts().values().iterator().next();
+      NodeId masterContainerNodeIdB =
+          appAttemptB.getMasterContainer().getNodeId();
+
+      if (nodeA.equals(masterContainerNodeIdB)) {
+        expectedNMCounts[0]++;
+      } else {
+        expectedNMCounts[1]++;
       }
-      appA = apps.values().iterator().next();
-      if (appA.getAppAttempts().isEmpty()) {
-        Thread.sleep(10);
-        continue;
+      if (nodeA.equals(taskContainerNodeIdA)) {
+        expectedNMCounts[1] += 3;
+      } else {
+        expectedNMCounts[0] += 3;
       }
-      RMAppAttempt appAttemptA = appA.getAppAttempts().values().iterator().
-          next();
-      if (appAttemptA.getMasterContainer() == null) {
-        Thread.sleep(10);
-        continue;
+      int[] maxRunningContainersOnNMs =
+          containerMonitorRunner.getMaxRunningContainersReport();
+      Assert.assertEquals(expectedNMCounts[0], maxRunningContainersOnNMs[0]);
+      Assert.assertEquals(expectedNMCounts[1], maxRunningContainersOnNMs[1]);
+
+      try {
+        yarnClient = YarnClient.createYarnClient();
+        yarnClient.init(
+            new Configuration(distShellTest.getYarnClusterConfiguration()));
+        yarnClient.start();
+        yarnClient.killApplication(appARef.get().getApplicationId());
+      } catch (Exception e) {
+        // Ignore Exception while killing a job
+        LOG.warn("Exception killing the job: {}", e.getMessage());
+      }
+    } finally {
+      if (yarnClient != null) {
+        yarnClient.stop();
+      }
+      if (clientB != null) {
+        clientB.sendStopSignal();
+      }
+      if (containerMonitorRunner != null) {
+        containerMonitorRunner.stopMonitoring();
+        containerMonitorRunner.join();
       }
-      masterContainerNodeIdA = appAttemptA.getMasterContainer().getNodeId();
-      break;
     }
+  }
 
-    NodeId nodeA = distShellTest.yarnCluster.getNodeManager(0).getNMContext().
-        getNodeId();
-    NodeId nodeB = distShellTest.yarnCluster.getNodeManager(1).getNMContext().
-        getNodeId();
-    Assert.assertEquals(2, (expectedNM1Count + expectedNM2Count));
-
-    if (expectedNM1Count != expectedNM2Count) {
-      taskContainerNodeIdA = masterContainerNodeIdA;
-    } else {
-      taskContainerNodeIdA = masterContainerNodeIdA.equals(nodeA) ? nodeB :
-          nodeA;
-    }
+  protected String generateAppName() {
+    return generateAppName(null);
+  }
 
-    String[] argsB = {
-        "--jar",
-        distShellTest.APPMASTER_JAR,
-        "1",
-        "--shell_command",
-        Shell.WINDOWS ? "dir" : "ls",
-        "--placement_spec",
-        "foo(3),notin,node,all/bar"
-    };
-    final Client clientB = new Client(new Configuration(distShellTest.
-        yarnCluster.getConfig()));
-    clientB.init(argsB);
-    boolean resultB = clientB.run();
-    Assert.assertTrue(resultB);
-
-    monitorThread.interrupt();
-    apps = distShellTest.yarnCluster.getResourceManager().getRMContext().
-        getRMApps();
-    Iterator<RMApp> it = apps.values().iterator();
-    RMApp appB = it.next();
-    if (appA.equals(appB)) {
-      appB = it.next();
-    }
-    LOG.info("Allocation Tag NameSpace Applications are=" + appA.
-        getApplicationId() + " and " + appB.getApplicationId());
-
-    RMAppAttempt appAttemptB = appB.getAppAttempts().values().iterator().
-        next();
-    NodeId masterContainerNodeIdB = appAttemptB.getMasterContainer().
-        getNodeId();
-
-    if (nodeA.equals(masterContainerNodeIdB)) {
-      expectedNM1Count += 1;
-    } else {
-      expectedNM2Count += 1;
-    }
-    if (nodeA.equals(taskContainerNodeIdA)) {
-      expectedNM2Count += 3;
-    } else {
-      expectedNM1Count += 3;
-    }
-    int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
-    Assert.assertEquals(expectedNM1Count, maxRunningContainersOnNMs[0]);
-    Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]);
+  protected String generateAppName(String postFix) {
+    return name.getMethodName().replaceFirst("test", "")
+        .concat(postFix == null ? "" : "-" + postFix);
+  }
 
-    try {
-      YarnClient yarnClient = YarnClient.createYarnClient();
-      yarnClient.init(new Configuration(distShellTest.yarnCluster.
-          getConfig()));
-      yarnClient.start();
-      yarnClient.killApplication(appA.getApplicationId());
-    } catch (Exception e) {
-     // Ignore Exception while killing a job
-    }
+  private void waitForExpectedNMsCount(int[] expectedNMCounts,
+      AtomicReference<RMApp> appARef,
+      AtomicReference<NodeId> masterContainerNodeIdARef) throws Exception {
+    GenericTestUtils.waitFor(() -> {
+      if ((expectedNMCounts[0] + expectedNMCounts[1]) < 2) {
+        expectedNMCounts[0] =
+            distShellTest.getNodeManager(0).getNMContext()
+                .getContainers().size();
+        expectedNMCounts[1] =
+            distShellTest.getNodeManager(1).getNMContext()
+                .getContainers().size();
+        return false;
+      }
+      ConcurrentMap<ApplicationId, RMApp> appIDsMap =
+          distShellTest.getResourceManager().getRMContext().getRMApps();
+      if (appIDsMap.isEmpty()) {
+        return false;
+      }
+      appARef.set(appIDsMap.values().iterator().next());
+      if (appARef.get().getAppAttempts().isEmpty()) {
+        return false;
+      }
+      RMAppAttempt appAttemptA =
+          appARef.get().getAppAttempts().values().iterator().next();
+      if (appAttemptA.getMasterContainer() == null) {
+        return false;
+      }
+      masterContainerNodeIdARef.set(
+          appAttemptA.getMasterContainer().getNodeId());
+      return true;
+    }, 10, 60000);
   }
 
   /**
-   * Monitor containers running on NMs
+   * Monitor containers running on NMs.
    */
-  class NMContainerMonitor implements Runnable {
+  class NMContainerMonitor extends Thread {
     // The interval of milliseconds of sampling (500ms)
-    final static int SAMPLING_INTERVAL_MS = 500;
+    private final static int SAMPLING_INTERVAL_MS = 500;
 
     // The maximum number of containers running on each NMs
-    int[] maxRunningContainersOnNMs = new int[NUM_NMS];
+    private final int[] maxRunningContainersOnNMs = new int[NUM_NMS];
+    private final Object quitSignal = new Object();
+    private volatile boolean isRunning = true;
 
     @Override
     public void run() {
-      while (true) {
+      while (isRunning) {
         for (int i = 0; i < NUM_NMS; i++) {
           int nContainers =
-              distShellTest.yarnCluster.getNodeManager(i).getNMContext()
+              distShellTest.getNodeManager(i).getNMContext()
                   .getContainers().size();
           if (nContainers > maxRunningContainersOnNMs[i]) {
             maxRunningContainersOnNMs[i] = nContainers;
           }
         }
-        try {
-          Thread.sleep(SAMPLING_INTERVAL_MS);
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-          break;
+        synchronized (quitSignal) {
+          try {
+            if (!isRunning) {
+              break;
+            }
+            quitSignal.wait(SAMPLING_INTERVAL_MS);
+          } catch (InterruptedException e) {
+            LOG.warn("NMContainerMonitor interrupted");
+            isRunning = false;
+            break;
+          }
         }
       }
     }
@@ -410,5 +489,15 @@ public class TestDSWithMultipleNodeManager {
     public int[] getMaxRunningContainersReport() {
       return maxRunningContainersOnNMs;
     }
+
+    public void stopMonitoring() {
+      if (!isRunning) {
+        return;
+      }
+      synchronized (quitSignal) {
+        isRunning = false;
+        quitSignal.notifyAll();
+      }
+    }
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
deleted file mode 100644
index 87479d6..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ /dev/null
@@ -1,1865 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.applications.distributedshell;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.io.UncheckedIOException;
-import java.net.URI;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import java.util.function.Supplier;
-import org.apache.commons.cli.MissingArgumentException;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.net.ServerSocketUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.JarFinder;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerReport;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LogAggregationContext;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
-import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
-import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
-import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
-import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.timeline.NameValuePair;
-import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
-import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
-import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
-import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
-import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestDistributedShell {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestDistributedShell.class);
-
-  protected MiniYARNCluster yarnCluster = null;
-  protected MiniDFSCluster hdfsCluster = null;
-  private FileSystem fs = null;
-  private TimelineWriter spyTimelineWriter;
-  protected YarnConfiguration conf = null;
-  // location of the filesystem timeline writer for timeline service v.2
-  private String timelineV2StorageDir = null;
-  private static final int NUM_NMS = 1;
-  private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
-  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
-  private static final int MIN_ALLOCATION_MB = 128;
-  private static final int TEST_TIME_OUT = 150000;
-  // set the timeout of the yarnClient to be 95% of the globalTimeout.
-  private static final int TEST_TIME_WINDOW_EXPIRE = (TEST_TIME_OUT * 90) / 100;
-
-  protected final static String APPMASTER_JAR =
-      JarFinder.getJar(ApplicationMaster.class);
-
-  @Rule
-  public TimelineVersionWatcher timelineVersionWatcher
-      = new TimelineVersionWatcher();
-
-  @Rule
-  public Timeout globalTimeout = new Timeout(TEST_TIME_OUT,
-      TimeUnit.MILLISECONDS);
-
-  @Rule
-  public TemporaryFolder tmpFolder = new TemporaryFolder();
-
-  @Rule
-  public TestName name = new TestName();
-
-  // set the timeout of the yarnClient to be 95% of the globalTimeout.
-  private final String yarnClientTimeout =
-      String.valueOf(TEST_TIME_WINDOW_EXPIRE);
-
-  private final String[] commonArgs = {
-      "--jar",
-      APPMASTER_JAR,
-      "--timeout",
-      yarnClientTimeout,
-      "--appname",
-      ""
-  };
-
-  @Before
-  public void setup() throws Exception {
-    setupInternal(NUM_NMS, timelineVersionWatcher.getTimelineVersion(),
-        new YarnConfiguration());
-  }
-
-  protected void setupInternal(int numNodeManager,
-      YarnConfiguration yarnConfig) throws Exception {
-    setupInternal(numNodeManager, DEFAULT_TIMELINE_VERSION, yarnConfig);
-  }
-
-  private void setupInternal(int numNodeManager, float timelineVersion,
-      YarnConfiguration yarnConfig)
-      throws Exception {
-    LOG.info("Starting up YARN cluster");
-
-    this.conf = yarnConfig;
-    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-        MIN_ALLOCATION_MB);
-    // reduce the teardown waiting time
-    conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000);
-    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 500);
-    conf.set("yarn.log.dir", "target");
-    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
-    // mark if we need to launch the v1 timeline server
-    // disable aux-service based timeline aggregators
-    conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
-    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
-
-    conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
-    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
-    conf.set("mapreduce.jobhistory.address",
-        "0.0.0.0:" + ServerSocketUtil.getPort(10021, 10));
-    // Enable ContainersMonitorImpl
-    conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
-        LinuxResourceCalculatorPlugin.class.getName());
-    conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
-        ProcfsBasedProcessTree.class.getName());
-    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
-    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
-    conf.setBoolean(
-        YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
-        true);
-    conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
-          true);
-    conf.setBoolean(
-        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
-    conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
-        10);
-    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
-        YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
-    // ATS version specific settings
-    if (timelineVersion == 1.0f) {
-      conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
-      conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
-          CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT);
-    } else if (timelineVersion == 1.5f) {
-      HdfsConfiguration hdfsConfig = new HdfsConfiguration();
-      hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
-          .numDataNodes(1).build();
-      hdfsCluster.waitActive();
-      fs = hdfsCluster.getFileSystem();
-      PluginStoreTestUtils.prepareFileSystemForPluginStore(fs);
-      PluginStoreTestUtils.prepareConfiguration(conf, hdfsCluster);
-      conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
-          DistributedShellTimelinePlugin.class.getName());
-    } else if (timelineVersion == 2.0f) {
-      // set version to 2
-      conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
-      // disable v1 timeline server since we no longer have a server here
-      // enable aux-service based timeline aggregators
-      conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
-      conf.set(YarnConfiguration.NM_AUX_SERVICES + "." +
-          TIMELINE_AUX_SERVICE_NAME + ".class",
-          PerNodeTimelineCollectorsAuxService.class.getName());
-      conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
-          FileSystemTimelineWriterImpl.class,
-          org.apache.hadoop.yarn.server.timelineservice.storage.
-              TimelineWriter.class);
-      timelineV2StorageDir = tmpFolder.newFolder().getAbsolutePath();
-      // set the file system timeline writer storage directory
-      conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
-          timelineV2StorageDir);
-    } else {
-      Assert.fail("Wrong timeline version number: " + timelineVersion);
-    }
-
-    yarnCluster =
-        new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
-            numNodeManager, 1, 1);
-    yarnCluster.init(conf);
-    yarnCluster.start();
-
-    conf.set(
-        YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
-        MiniYARNCluster.getHostname() + ":"
-            + yarnCluster.getApplicationHistoryServer().getPort());
-
-    waitForNMsToRegister();
-
-    URL url = Thread.currentThread().getContextClassLoader().getResource(
-        "yarn-site.xml");
-    if (url == null) {
-      throw new RuntimeException(
-          "Could not find 'yarn-site.xml' dummy file in classpath");
-    }
-    Configuration yarnClusterConfig = yarnCluster.getConfig();
-    yarnClusterConfig.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
-        new File(url.getPath()).getParent());
-    //write the document to a buffer (not directly to the file, as that
-    //can cause the file being written to get read -which will then fail.
-    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
-    yarnClusterConfig.writeXml(bytesOut);
-    bytesOut.close();
-    //write the bytes to the file in the classpath
-    OutputStream os = new FileOutputStream(url.getPath());
-    os.write(bytesOut.toByteArray());
-    os.close();
-
-    FileContext fsContext = FileContext.getLocalFSFileContext();
-    fsContext
-        .delete(
-            new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)),
-            true);
-    try {
-      Thread.sleep(2000);
-    } catch (InterruptedException e) {
-      LOG.info("setup thread sleep interrupted. message=" + e.getMessage());
-    }
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    FileContext fsContext = FileContext.getLocalFSFileContext();
-    fsContext
-        .delete(
-            new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)),
-            true);
-    if (yarnCluster != null) {
-      try {
-        yarnCluster.stop();
-      } finally {
-        yarnCluster = null;
-      }
-    }
-    if (hdfsCluster != null) {
-      try {
-        hdfsCluster.shutdown();
-      } finally {
-        hdfsCluster = null;
-      }
-    }
-  }
-
-  @Test
-  public void testDSShellWithDomain() throws Exception {
-    testDSShell(true);
-  }
-
-  @Test
-  public void testDSShellWithoutDomain() throws Exception {
-    testDSShell(false);
-  }
-
-  @Test
-  @TimelineVersion(1.5f)
-  public void testDSShellWithoutDomainV1_5() throws Exception {
-    testDSShell(false);
-  }
-
-  @Test
-  @TimelineVersion(1.5f)
-  public void testDSShellWithDomainV1_5() throws Exception {
-    testDSShell(true);
-  }
-
-  @Test
-  @TimelineVersion(2.0f)
-  public void testDSShellWithoutDomainV2() throws Exception {
-    testDSShell(false);
-  }
-
-  public void testDSShell(boolean haveDomain) throws Exception {
-    testDSShell(haveDomain, true);
-  }
-
-  @Test
-  @TimelineVersion(2.0f)
-  public void testDSShellWithoutDomainV2DefaultFlow() throws Exception {
-    testDSShell(false, true);
-  }
-
-  @Test
-  @TimelineVersion(2.0f)
-  public void testDSShellWithoutDomainV2CustomizedFlow() throws Exception {
-    testDSShell(false, false);
-  }
-
-  public void testDSShell(boolean haveDomain, boolean defaultFlow)
-      throws Exception {
-    String[] args = createArguments(
-        "--num_containers",
-        "2",
-        "--shell_command",
-        Shell.WINDOWS ? "dir" : "ls",
-        "--master_memory",
-        "512",
-        "--master_vcores",
-        "2",
-        "--container_memory",
-        "128",
-        "--container_vcores",
-        "1");
-
-    if (haveDomain) {
-      String[] domainArgs = {
-          "--domain",
-          "TEST_DOMAIN",
-          "--view_acls",
-          "reader_user reader_group",
-          "--modify_acls",
-          "writer_user writer_group",
-          "--create"
-      };
-      args = mergeArgs(args, domainArgs);
-    }
-    boolean isTestingTimelineV2 = false;
-    if (timelineVersionWatcher.getTimelineVersion() == 2.0f) {
-      isTestingTimelineV2 = true;
-      if (!defaultFlow) {
-        String[] flowArgs = {
-            "--flow_name",
-            "test_flow_name",
-            "--flow_version",
-            "test_flow_version",
-            "--flow_run_id",
-            "12345678"
-        };
-        args = mergeArgs(args, flowArgs);
-      }
-      LOG.info("Setup: Using timeline v2!");
-    }
-
-    LOG.info("Initializing DS Client");
-    YarnClient yarnClient;
-    final Client client = new Client(new Configuration(yarnCluster.getConfig()));
-    boolean initSuccess = client.init(args);
-    Assert.assertTrue(initSuccess);
-    LOG.info("Running DS Client");
-    final AtomicBoolean result = new AtomicBoolean(false);
-    Thread t = new Thread() {
-      public void run() {
-        try {
-          result.set(client.run());
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-    t.start();
-
-    yarnClient = YarnClient.createYarnClient();
-    yarnClient.init(new Configuration(yarnCluster.getConfig()));
-    yarnClient.start();
-
-    boolean verified = false;
-    String errorMessage = "";
-    ApplicationId appId = null;
-    ApplicationReport appReport = null;
-    while (!verified) {
-      List<ApplicationReport> apps = yarnClient.getApplications();
-      if (apps.size() == 0) {
-        Thread.sleep(10);
-        continue;
-      }
-      appReport = apps.get(0);
-      appId = appReport.getApplicationId();
-      if (appReport.getHost().equals("N/A")) {
-        Thread.sleep(10);
-        continue;
-      }
-      errorMessage =
-          "'. Expected rpc port to be '-1', was '"
-              + appReport.getRpcPort() + "'.";
-      if (appReport.getRpcPort() == -1) {
-        verified = true;
-      }
-
-      if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED
-          && appReport.getFinalApplicationStatus() !=
-          FinalApplicationStatus.UNDEFINED) {
-        break;
-      }
-    }
-    Assert.assertTrue(errorMessage, verified);
-    t.join();
-    LOG.info("Client run completed for testDSShell. Result=" + result);
-    Assert.assertTrue(result.get());
-
-    if (timelineVersionWatcher.getTimelineVersion() == 1.5f) {
-      long scanInterval = conf.getLong(
-          YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
-          YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT
-      );
-      Path doneDir = new Path(
-          YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT
-      );
-      // Wait till the data is moved to done dir, or timeout and fail
-      while (true) {
-        RemoteIterator<FileStatus> iterApps = fs.listStatusIterator(doneDir);
-        if (iterApps.hasNext()) {
-          break;
-        }
-        Thread.sleep(scanInterval * 2);
-      }
-    }
-
-    if (!isTestingTimelineV2) {
-      checkTimelineV1(haveDomain);
-    } else {
-      checkTimelineV2(appId, defaultFlow, appReport);
-    }
-  }
-
-  private void checkTimelineV1(boolean haveDomain) throws Exception {
-    TimelineDomain domain = null;
-    if (haveDomain) {
-      domain = yarnCluster.getApplicationHistoryServer()
-          .getTimelineStore().getDomain("TEST_DOMAIN");
-      Assert.assertNotNull(domain);
-      Assert.assertEquals("reader_user reader_group", domain.getReaders());
-      Assert.assertEquals("writer_user writer_group", domain.getWriters());
-    }
-    TimelineEntities entitiesAttempts = yarnCluster
-        .getApplicationHistoryServer()
-        .getTimelineStore()
-        .getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(),
-            null, null, null, null, null, null, null, null, null);
-    Assert.assertNotNull(entitiesAttempts);
-    Assert.assertEquals(1, entitiesAttempts.getEntities().size());
-    Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents()
-        .size());
-    Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType(),
-        ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString());
-    if (haveDomain) {
-      Assert.assertEquals(domain.getId(),
-          entitiesAttempts.getEntities().get(0).getDomainId());
-    } else {
-      Assert.assertEquals("DEFAULT",
-          entitiesAttempts.getEntities().get(0).getDomainId());
-    }
-    String currAttemptEntityId
-        = entitiesAttempts.getEntities().get(0).getEntityId();
-    ApplicationAttemptId attemptId = ApplicationAttemptId.fromString(
-        currAttemptEntityId);
-    NameValuePair primaryFilter = new NameValuePair(
-        ApplicationMaster.APPID_TIMELINE_FILTER_NAME,
-        attemptId.getApplicationId().toString());
-    TimelineEntities entities = yarnCluster
-        .getApplicationHistoryServer()
-        .getTimelineStore()
-        .getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null,
-            null, null, null, null, primaryFilter, null, null, null);
-    Assert.assertNotNull(entities);
-    Assert.assertEquals(2, entities.getEntities().size());
-    Assert.assertEquals(entities.getEntities().get(0).getEntityType(),
-        ApplicationMaster.DSEntity.DS_CONTAINER.toString());
-
-    String entityId = entities.getEntities().get(0).getEntityId();
-    org.apache.hadoop.yarn.api.records.timeline.TimelineEntity entity =
-        yarnCluster.getApplicationHistoryServer().getTimelineStore()
-            .getEntity(entityId,
-                ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null);
-    Assert.assertNotNull(entity);
-    Assert.assertEquals(entityId, entity.getEntityId());
-
-    if (haveDomain) {
-      Assert.assertEquals(domain.getId(),
-          entities.getEntities().get(0).getDomainId());
-    } else {
-      Assert.assertEquals("DEFAULT",
-          entities.getEntities().get(0).getDomainId());
-    }
-  }
-
-  private void checkTimelineV2(ApplicationId appId,
-      boolean defaultFlow, ApplicationReport appReport) throws Exception {
-    LOG.info("Started checkTimelineV2 ");
-    // For PoC check using the file-based timeline writer (YARN-3264)
-    String tmpRoot = timelineV2StorageDir + File.separator + "entities" +
-        File.separator;
-
-    File tmpRootFolder = new File(tmpRoot);
-    try {
-      Assert.assertTrue(tmpRootFolder.isDirectory());
-      String basePath = tmpRoot +
-          YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator +
-          UserGroupInformation.getCurrentUser().getShortUserName() +
-          (defaultFlow ?
-              File.separator + appReport.getName() + File.separator +
-                  TimelineUtils.DEFAULT_FLOW_VERSION + File.separator +
-                  appReport.getStartTime() + File.separator :
-              File.separator + "test_flow_name" + File.separator +
-                  "test_flow_version" + File.separator + "12345678" +
-                  File.separator) +
-          appId.toString();
-      LOG.info("basePath: " + basePath);
-      // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
-
-      // Verify DS_APP_ATTEMPT entities posted by the client
-      // there will be at least one attempt, look for that file
-      String appTimestampFileName =
-          "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
-              + "_000001"
-              + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-      File dsAppAttemptEntityFile = verifyEntityTypeFileExists(basePath,
-          "DS_APP_ATTEMPT", appTimestampFileName);
-      // Check if required events are published and same idprefix is sent for
-      // on each publish.
-      verifyEntityForTimelineV2(dsAppAttemptEntityFile,
-          DSEvent.DS_APP_ATTEMPT_START.toString(), 1, 1, 0, true);
-      // to avoid race condition of testcase, atleast check 40 times with sleep
-      // of 50ms
-      verifyEntityForTimelineV2(dsAppAttemptEntityFile,
-          DSEvent.DS_APP_ATTEMPT_END.toString(), 1, 40, 50, true);
-
-      // Verify DS_CONTAINER entities posted by the client.
-      String containerTimestampFileName =
-          "container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
-              + "_01_000002.thist";
-      File dsContainerEntityFile = verifyEntityTypeFileExists(basePath,
-          "DS_CONTAINER", containerTimestampFileName);
-      // Check if required events are published and same idprefix is sent for
-      // on each publish.
-      verifyEntityForTimelineV2(dsContainerEntityFile,
-          DSEvent.DS_CONTAINER_START.toString(), 1, 1, 0, true);
-      // to avoid race condition of testcase, atleast check 40 times with sleep
-      // of 50ms
-      verifyEntityForTimelineV2(dsContainerEntityFile,
-          DSEvent.DS_CONTAINER_END.toString(), 1, 40, 50, true);
-
-      // Verify NM posting container metrics info.
-      String containerMetricsTimestampFileName =
-          "container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
-              + "_01_000001"
-              + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-      File containerEntityFile = verifyEntityTypeFileExists(basePath,
-          TimelineEntityType.YARN_CONTAINER.toString(),
-          containerMetricsTimestampFileName);
-      verifyEntityForTimelineV2(containerEntityFile,
-          ContainerMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, true);
-
-      // to avoid race condition of testcase, atleast check 40 times with sleep
-      // of 50ms
-      verifyEntityForTimelineV2(containerEntityFile,
-          ContainerMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, true);
-
-      // Verify RM posting Application life cycle Events are getting published
-      String appMetricsTimestampFileName =
-          "application_" + appId.getClusterTimestamp() + "_000" + appId.getId()
-              + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-      File appEntityFile =
-          verifyEntityTypeFileExists(basePath,
-              TimelineEntityType.YARN_APPLICATION.toString(),
-              appMetricsTimestampFileName);
-      // No need to check idprefix for app.
-      verifyEntityForTimelineV2(appEntityFile,
-          ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, false);
-
-      // to avoid race condition of testcase, atleast check 40 times with sleep
-      // of 50ms
-      verifyEntityForTimelineV2(appEntityFile,
-          ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, false);
-
-      // Verify RM posting AppAttempt life cycle Events are getting published
-      String appAttemptMetricsTimestampFileName =
-          "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
-              + "_000001"
-              + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-      File appAttemptEntityFile =
-          verifyEntityTypeFileExists(basePath,
-              TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
-              appAttemptMetricsTimestampFileName);
-      verifyEntityForTimelineV2(appAttemptEntityFile,
-          AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, 1, 0, true);
-      verifyEntityForTimelineV2(appAttemptEntityFile,
-          AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true);
-    } finally {
-      try {
-        FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
-      } catch (FileNotFoundException ex) {
-        // the recursive delete can throw an exception when one of the file
-        // does not exist.
-        LOG.warn("Exception deleting a file/subDirectory: {}", ex.getMessage());
-      }
-    }
-  }
-
-  private File verifyEntityTypeFileExists(String basePath, String entityType,
-      String entityfileName) {
-    String outputDirPathForEntity =
-        basePath + File.separator + entityType + File.separator;
-    LOG.info(outputDirPathForEntity);
-    File outputDirForEntity = new File(outputDirPathForEntity);
-    Assert.assertTrue(outputDirForEntity.isDirectory());
-
-    String entityFilePath = outputDirPathForEntity + entityfileName;
-
-    File entityFile = new File(entityFilePath);
-    Assert.assertTrue(entityFile.exists());
-    return entityFile;
-  }
-
-  /**
-   * Checks the events and idprefix published for an entity.
-   *
-   * @param entityFile Entity file.
-   * @param expectedEvent Expected event Id.
-   * @param numOfExpectedEvent Number of expected occurences of expected event
-   *     id.
-   * @param checkTimes Number of times to check.
-   * @param sleepTime Sleep time for each iteration.
-   * @param checkIdPrefix Whether to check idprefix.
-   * @throws IOException if entity file reading fails.
-   * @throws InterruptedException if sleep is interrupted.
-   */
-  private void verifyEntityForTimelineV2(File entityFile, String expectedEvent,
-      long numOfExpectedEvent, int checkTimes, long sleepTime,
-      boolean checkIdPrefix) throws IOException, InterruptedException {
-    long actualCount = 0;
-    for (int i = 0; i < checkTimes; i++) {
-      BufferedReader reader = null;
-      String strLine;
-      actualCount = 0;
-      try {
-        reader = new BufferedReader(new FileReader(entityFile));
-        long idPrefix = -1;
-        while ((strLine = reader.readLine()) != null) {
-          String entityLine = strLine.trim();
-          if (entityLine.isEmpty()) {
-            continue;
-          }
-          if (entityLine.contains(expectedEvent)) {
-            actualCount++;
-          }
-          if (expectedEvent.equals(DSEvent.DS_CONTAINER_END.toString()) &&
-              entityLine.contains(expectedEvent)) {
-            TimelineEntity entity = FileSystemTimelineReaderImpl.
-                getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
-            TimelineEvent event = entity.getEvents().pollFirst();
-            Assert.assertNotNull(event);
-            Assert.assertTrue("diagnostics",
-                event.getInfo().containsKey(ApplicationMaster.DIAGNOSTICS));
-          }
-          if (checkIdPrefix) {
-            TimelineEntity entity = FileSystemTimelineReaderImpl.
-                getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
-            Assert.assertTrue("Entity ID prefix expected to be > 0",
-                entity.getIdPrefix() > 0);
-            if (idPrefix == -1) {
-              idPrefix = entity.getIdPrefix();
-            } else {
-              Assert.assertEquals("Entity ID prefix should be same across " +
-                  "each publish of same entity",
-                      idPrefix, entity.getIdPrefix());
-            }
-          }
-        }
-      } finally {
-        if (reader != null) {
-          reader.close();
-        }
-      }
-      if (numOfExpectedEvent == actualCount) {
-        break;
-      }
-      if (sleepTime > 0 && i < checkTimes - 1) {
-        Thread.sleep(sleepTime);
-      }
-    }
-    Assert.assertEquals("Unexpected number of " +  expectedEvent +
-        " event published.", numOfExpectedEvent, actualCount);
-  }
-
-  /**
-   * Utility function to merge two String arrays to form a new String array for
-   * our argumemts.
-   *
-   * @param args the first set of the arguments.
-   * @param newArgs the second set of the arguments.
-   * @return a String array consists of {args, newArgs}
-   */
-  private String[] mergeArgs(String[] args, String[] newArgs) {
-    int length = args.length + newArgs.length;
-    String[] result = new String[length];
-    System.arraycopy(args, 0, result, 0, args.length);
-    System.arraycopy(newArgs, 0, result, args.length, newArgs.length);
-    return result;
-  }
-
-  private String generateAppName(String postFix) {
-    return name.getMethodName().replaceFirst("test", "")
-        .concat(postFix == null? "" : "-" + postFix);
-  }
-
-  private String[] createArguments(String... args) {
-    String[] res = mergeArgs(commonArgs, args);
-    // set the application name so we can track down which command is running.
-    res[commonArgs.length - 1] = generateAppName(null);
-    return res;
-  }
-
-  private String[] createArgsWithPostFix(int index, String... args) {
-    String[] res = mergeArgs(commonArgs, args);
-    // set the application name so we can track down which command is running.
-    res[commonArgs.length - 1] = generateAppName(String.valueOf(index));
-    return res;
-  }
-
-  protected String getSleepCommand(int sec) {
-    // Windows doesn't have a sleep command, ping -n does the trick
-    return Shell.WINDOWS ? "ping -n " + (sec + 1) + " 127.0.0.1 >nul"
-        : "sleep " + sec;
-  }
-
-  @Test
-  public void testDSRestartWithPreviousRunningContainers() throws Exception {
-    String[] args = createArguments(
-        "--num_containers",
-        "1",
-        "--shell_command",
-        getSleepCommand(8),
-        "--master_memory",
-        "512",
-        "--container_memory",
-        "128",
-        "--keep_containers_across_application_attempts"
-    );
-
-    LOG.info("Initializing DS Client");
-    Client client = new Client(TestDSFailedAppMaster.class.getName(),
-        new Configuration(yarnCluster.getConfig()));
-
-    client.init(args);
-
-    LOG.info("Running DS Client");
-    boolean result = client.run();
-    LOG.info("Client run completed. Result=" + result);
-    // application should succeed
-    Assert.assertTrue(result);
-  }
-
-  /*
-   * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
-   * Set attempt_failures_validity_interval as 2.5 seconds. It will check
-   * how many attempt failures for previous 2.5 seconds.
-   * The application is expected to be successful.
-   */
-  @Test
-  public void testDSAttemptFailuresValidityIntervalSucess() throws Exception {
-    String[] args = createArguments(
-        "--num_containers",
-        "1",
-        "--shell_command",
-        getSleepCommand(8),
-        "--master_memory",
-        "512",
-        "--container_memory",
-        "128",
-        "--attempt_failures_validity_interval",
-        "2500"
-    );
-
-    LOG.info("Initializing DS Client");
-    Configuration config = yarnCluster.getConfig();
-    config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
-    Client client = new Client(TestDSSleepingAppMaster.class.getName(),
-        new Configuration(config));
-
-    client.init(args);
-
-    LOG.info("Running DS Client");
-    boolean result = client.run();
-
-    LOG.info("Client run completed. Result=" + result);
-    // application should succeed
-    Assert.assertTrue(result);
-  }
-
-  /*
-   * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
-   * Set attempt_failures_validity_interval as 15 seconds. It will check
-   * how many attempt failure for previous 15 seconds.
-   * The application is expected to be fail.
-   */
-  @Test
-  public void testDSAttemptFailuresValidityIntervalFailed() throws Exception {
-    String[] args = createArguments(
-        "--num_containers",
-        "1",
-        "--shell_command",
-        getSleepCommand(8),
-        "--master_memory",
-        "512",
-        "--container_memory",
-        "128",
-        "--attempt_failures_validity_interval",
-        "15000"
-    );
-
-    LOG.info("Initializing DS Client");
-    Configuration config = yarnCluster.getConfig();
-    config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
-    Client client = new Client(TestDSSleepingAppMaster.class.getName(),
-        new Configuration(config));
-
-    client.init(args);
-
-    LOG.info("Running DS Client");
-    boolean result = client.run();
-
-    LOG.info("Client run completed. Result=" + result);
-    // application should be failed
-    Assert.assertFalse(result);
-  }
-
-  @Test
-  public void testDSShellWithCustomLogPropertyFile() throws Exception {
-    final File basedir =
-        new File("target", TestDistributedShell.class.getName());
-    final File tmpDir = new File(basedir, "tmpDir");
-    tmpDir.mkdirs();
-    final File customLogProperty = new File(tmpDir, "custom_log4j.properties");
-    if (customLogProperty.exists()) {
-      customLogProperty.delete();
-    }
-    if(!customLogProperty.createNewFile()) {
-      Assert.fail("Can not create custom log4j property file.");
-    }
-    PrintWriter fileWriter = new PrintWriter(customLogProperty);
-    // set the output to DEBUG level
-    fileWriter.write("log4j.rootLogger=debug,stdout");
-    fileWriter.close();
-    String[] args = createArguments(
-        "--num_containers",
-        "3",
-        "--shell_command",
-        "echo",
-        "--shell_args",
-        "HADOOP",
-        "--log_properties",
-        customLogProperty.getAbsolutePath(),
-        "--master_memory",
-        "512",
-        "--master_vcores",
-        "2",
-        "--container_memory",
-        "128",
-        "--container_vcores",
-        "1"
-    );
-
-    //Before run the DS, the default the log level is INFO
-    final Logger LOG_Client =
-        LoggerFactory.getLogger(Client.class);
-    Assert.assertTrue(LOG_Client.isInfoEnabled());
-    Assert.assertFalse(LOG_Client.isDebugEnabled());
-    final Logger LOG_AM = LoggerFactory.getLogger(ApplicationMaster.class);
-    Assert.assertTrue(LOG_AM.isInfoEnabled());
-    Assert.assertFalse(LOG_AM.isDebugEnabled());
-
-    LOG.info("Initializing DS Client");
-    final Client client =
-        new Client(new Configuration(yarnCluster.getConfig()));
-    boolean initSuccess = client.init(args);
-    Assert.assertTrue(initSuccess);
-
-    LOG.info("Running DS Client");
-    boolean result = client.run();
-    LOG.info("Client run completed. Result=" + result);
-    Assert.assertTrue(verifyContainerLog(3, null, true, "DEBUG") > 10);
-    //After DS is finished, the log level should be DEBUG
-    Assert.assertTrue(LOG_Client.isInfoEnabled());
-    Assert.assertTrue(LOG_Client.isDebugEnabled());
-    Assert.assertTrue(LOG_AM.isInfoEnabled());
-    Assert.assertTrue(LOG_AM.isDebugEnabled());
-  }
-
-  @Test
-  public void testSpecifyingLogAggregationContext() throws Exception {
-    String regex = ".*(foo|bar)\\d";
-    String[] args = createArguments(
-        "--shell_command",
-        "echo",
-        "--rolling_log_pattern",
-        regex
-    );
-    final Client client =
-        new Client(new Configuration(yarnCluster.getConfig()));
-    Assert.assertTrue(client.init(args));
-
-    ApplicationSubmissionContext context =
-        Records.newRecord(ApplicationSubmissionContext.class);
-    client.specifyLogAggregationContext(context);
-    LogAggregationContext logContext = context.getLogAggregationContext();
-    assertEquals(logContext.getRolledLogsIncludePattern(), regex);
-    assertTrue(logContext.getRolledLogsExcludePattern().isEmpty());
-  }
-
-  public void testDSShellWithCommands() throws Exception {
-
-    String[] args = createArguments(
-        "--num_containers",
-        "2",
-        "--shell_command",
-        "\"echo output_ignored;echo output_expected\"",
-        "--master_memory",
-        "512",
-        "--master_vcores",
-        "2",
-        "--container_memory",
-        "128",
-        "--container_vcores",
-        "1"
-    );
-
-    LOG.info("Initializing DS Client");
-    final Client client =
-        new Client(new Configuration(yarnCluster.getConfig()));
-    boolean initSuccess = client.init(args);
-    Assert.assertTrue(initSuccess);
-    LOG.info("Running DS Client");
-    try {
-      boolean result = client.run();
-      LOG.info("Client run completed. Result=" + result);
-      List<String> expectedContent = new ArrayList<>();
-      expectedContent.add("output_expected");
-      verifyContainerLog(2, expectedContent, false, "");
-    } finally {
-      client.sendStopSignal();
-    }
-  }
-
-  @Test
-  public void testDSShellWithMultipleArgs() throws Exception {
-    String[] args = createArguments(
-        "--num_containers",
-        "4",
-        "--shell_command",
-        "echo",
-        "--shell_args",
-        "HADOOP YARN MAPREDUCE HDFS",
-        "--master_memory",
-        "512",
-        "--master_vcores",
-        "2",
-        "--container_memory",
-        "128",
-        "--container_vcores",
-        "1"
-    );
-
-    LOG.info("Initializing DS Client");
-    final Client client =
-        new Client(new Configuration(yarnCluster.getConfig()));
-    boolean initSuccess = client.init(args);
-    Assert.assertTrue(initSuccess);
-    LOG.info("Running DS Client");
-
-    boolean result = client.run();
-    LOG.info("Client run completed. Result=" + result);
-    List<String> expectedContent = new ArrayList<>();
-    expectedContent.add("HADOOP YARN MAPREDUCE HDFS");
-    verifyContainerLog(4, expectedContent, false, "");
-  }
-
-  @Test
-  public void testDSShellWithShellScript() throws Exception {
-    final File basedir =
-        new File("target", TestDistributedShell.class.getName());
-    final File tmpDir = new File(basedir, "tmpDir");
-    tmpDir.mkdirs();
-    final File customShellScript = new File(tmpDir, "custom_script.sh");
-    if (customShellScript.exists()) {
-      customShellScript.delete();
-    }
-    if (!customShellScript.createNewFile()) {
-      Assert.fail("Can not create custom shell script file.");
-    }
-    PrintWriter fileWriter = new PrintWriter(customShellScript);
-    // set the output to DEBUG level
-    fileWriter.write("echo testDSShellWithShellScript");
-    fileWriter.close();
-    LOG.info(customShellScript.getAbsolutePath());
-    String[] args = createArguments(
-        "--num_containers",
-        "1",
-        "--shell_script",
-        customShellScript.getAbsolutePath(),
-        "--master_memory",
-        "512",
-        "--master_vcores",
-        "2",
-        "--container_memory",
-        "128",
-        "--container_vcores",
-        "1"
-    );
-
-    LOG.info("Initializing DS Client");
-    final Client client =
-        new Client(new Configuration(yarnCluster.getConfig()));
-    boolean initSuccess = client.init(args);
-    Assert.assertTrue(initSuccess);
-    LOG.info("Running DS Client");
-    boolean result = client.run();
-    LOG.info("Client run completed. Result=" + result);
-    List<String> expectedContent = new ArrayList<>();
-    expectedContent.add("testDSShellWithShellScript");
-    verifyContainerLog(1, expectedContent, false, "");
-  }
-
-  @Test
-  public void testDSShellWithInvalidArgs() throws Exception {
-    Client client = new Client(new Configuration(yarnCluster.getConfig()));
-    int appNameCounter = 0;
-    LOG.info("Initializing DS Client with no args");
-    try {
-      client.init(new String[]{});
-      Assert.fail("Exception is expected");
-    } catch (IllegalArgumentException e) {
-      Assert.assertTrue("The throw exception is not expected",
-          e.getMessage().contains("No args"));
-    }
-
-    LOG.info("Initializing DS Client with no jar file");
-    try {
-      String[] args = createArgsWithPostFix(appNameCounter++,
-          "--num_containers",
-          "2",
-          "--shell_command",
-          Shell.WINDOWS ? "dir" : "ls",
-          "--master_memory",
-          "512",
-          "--container_memory",
-          "128"
-      );
-      String[] argsNoJar = Arrays.copyOfRange(args, 2, args.length);
-      client.init(argsNoJar);
-      Assert.fail("Exception is expected");
-    } catch (IllegalArgumentException e) {
-      Assert.assertTrue("The throw exception is not expected",
-          e.getMessage().contains("No jar"));
-    }
-
-    LOG.info("Initializing DS Client with no shell command");
-    try {
-      String[] args = createArgsWithPostFix(appNameCounter++,
-          "--num_containers",
-          "2",
-          "--master_memory",
-          "512",
-          "--container_memory",
-          "128"
-      );
-      client.init(args);
-      Assert.fail("Exception is expected");
-    } catch (IllegalArgumentException e) {
-      Assert.assertTrue("The throw exception is not expected",
-          e.getMessage().contains("No shell command"));
-    }
-
-    LOG.info("Initializing DS Client with invalid no. of containers");
-    try {
-      String[] args = createArgsWithPostFix(appNameCounter++,
-          "--num_containers",
-          "-1",
-          "--shell_command",
-          Shell.WINDOWS ? "dir" : "ls",
-          "--master_memory",
-          "512",
-          "--container_memory",
-          "128"
-      );
-      client.init(args);
-      Assert.fail("Exception is expected");
-    } catch (IllegalArgumentException e) {
-      Assert.assertTrue("The throw exception is not expected",
-          e.getMessage().contains("Invalid no. of containers"));
-    }
-    
-    LOG.info("Initializing DS Client with invalid no. of vcores");
-    try {
-      String[] args = createArgsWithPostFix(appNameCounter++,
-          "--num_containers",
-          "2",
-          "--shell_command",
-          Shell.WINDOWS ? "dir" : "ls",
-          "--master_memory",
-          "512",
-          "--master_vcores",
-          "-2",
-          "--container_memory",
-          "128",
-          "--container_vcores",
-          "1"
-      );
-      client.init(args);
-      client.run();
-      Assert.fail("Exception is expected");
-    } catch (IllegalArgumentException e) {
-      Assert.assertTrue("The throw exception is not expected",
-          e.getMessage().contains("Invalid virtual cores specified"));
-    }
-
-    LOG.info("Initializing DS Client with --shell_command and --shell_script");
-    try {
-      String[] args = createArgsWithPostFix(appNameCounter++,
-          "--num_containers",
-          "2",
-          "--shell_command",
-          Shell.WINDOWS ? "dir" : "ls",
-          "--master_memory",
-          "512",
-          "--master_vcores",
-          "2",
-          "--container_memory",
-          "128",
-          "--container_vcores",
-          "1",
-          "--shell_script",
-          "test.sh"
-      );
-      client.init(args);
-      Assert.fail("Exception is expected");
-    } catch (IllegalArgumentException e) {
-      Assert.assertTrue("The throw exception is not expected",
-          e.getMessage().contains("Can not specify shell_command option " +
-          "and shell_script option at the same time"));
-    }
-
-    LOG.info("Initializing DS Client without --shell_command and --shell_script");
-    try {
-      String[] args = createArgsWithPostFix(appNameCounter++,
-          "--num_containers",
-          "2",
-          "--master_memory",
-          "512",
-          "--master_vcores",
-          "2",
-          "--container_memory",
-          "128",
-          "--container_vcores",
-          "1"
-      );
-      client.init(args);
-      Assert.fail("Exception is expected");
-    } catch (IllegalArgumentException e) {
-      Assert.assertTrue("The throw exception is not expected",
-          e.getMessage().contains("No shell command or shell script specified " +
-          "to be executed by application master"));
-    }
-
-    LOG.info("Initializing DS Client with invalid container_type argument");
-    try {
-      String[] args = createArgsWithPostFix(appNameCounter++,
-          "--num_containers",
-          "2",
-          "--master_memory",
-          "512",
-          "--master_vcores",
-          "2",
-          "--container_memory",
-          "128",
-          "--container_vcores",
-          "1",
-          "--shell_command",
-          "date",
-          "--container_type",
-          "UNSUPPORTED_TYPE"
-      );
-      client.init(args);
-      Assert.fail("Exception is expected");
-    } catch (IllegalArgumentException e) {
-      Assert.assertTrue("The throw exception is not expected",
-          e.getMessage().contains("Invalid container_type: UNSUPPORTED_TYPE"));
-    }
-
-    try {
-      String[] args = createArgsWithPostFix(appNameCounter++,
-          "--num_containers",
-          "1",
-          "--shell_command",
-          Shell.WINDOWS ? "dir" : "ls",
-          "--master_resources",
-          "memory-mb=invalid"
-      );
-      client.init(args);
-      Assert.fail("Exception is expected");
-    } catch (IllegalArgumentException e) {
-      // do nothing
-      LOG.info("IllegalArgumentException exception is expected: {}",
-          e.getMessage());
-    }
-
-    try {
-      String[] args = createArgsWithPostFix(appNameCounter++,
-          "--num_containers",
-          "1",
-          "--shell_command",
-          Shell.WINDOWS ? "dir" : "ls",
-          "--master_resources"
-      );
-      client.init(args);
-      Assert.fail("Exception is expected");
-    } catch (MissingArgumentException e) {
-      // do nothing
-      LOG.info("MissingArgumentException exception is expected: {}",
-          e.getMessage());
-    }
-  }
-
-  /**
-   * Publishes a container-end event through a timeline client whose writer
-   * is a Mockito spy stubbed by
-   * {@code TestTimelineClient.mockEntityClientResponse}, and expects the
-   * publish call to return without a ClientHandlerException escaping.
-   */
-  @Test
-  public void testDSTimelineClientWithConnectionRefuse() throws Exception {
-    ApplicationMaster appMaster = new ApplicationMaster();
-
-    TimelineClientImpl timelineClient = new TimelineClientImpl() {
-      @Override
-      protected TimelineWriter createTimelineWriter(Configuration writerConf,
-          UserGroupInformation authUgi,
-          com.sun.jersey.api.client.Client jerseyClient, URI resURI)
-          throws IOException {
-        // Keep a handle on the spy so the test can stub its responses.
-        spyTimelineWriter =
-            spy(new DirectTimelineWriter(authUgi, jerseyClient, resURI));
-        return spyTimelineWriter;
-      }
-    };
-    timelineClient.init(conf);
-    timelineClient.start();
-    TestTimelineClient.mockEntityClientResponse(
-        spyTimelineWriter, null, false, true);
-    try {
-      UserGroupInformation mockUgi = mock(UserGroupInformation.class);
-      when(mockUgi.getShortUserName()).thenReturn("user1");
-      ContainerStatus completedStatus = ContainerStatus.newInstance(
-          BuilderUtils.newContainerId(1, 1, 1, 1), ContainerState.COMPLETE,
-          "", 1);
-      // The call below must not let a ClientHandlerException escape.
-      appMaster.publishContainerEndEvent(
-          timelineClient, completedStatus, "domainId", mockUgi);
-    } finally {
-      timelineClient.stop();
-    }
-  }
-
-  /**
-   * Polls the ResourceManager every 100 ms, for up to 60 s, until its
-   * RMContext lists at least {@code NUM_NMS} nodes.
-   *
-   * @throws Exception whatever {@code GenericTestUtils.waitFor} throws
-   */
-  protected void waitForNMsToRegister() throws Exception {
-    GenericTestUtils.waitFor(() -> {
-      int registeredNodes = yarnCluster.getResourceManager().getRMContext()
-          .getRMNodes().size();
-      return registeredNodes >= NUM_NMS;
-    }, 100, 60000);
-  }
-
-  /**
-   * Runs the DS client with {@code ContainerLaunchFailAppMaster} as the
-   * application master class and expects {@code run()} to report failure.
-   */
-  @Test
-  public void testContainerLaunchFailureHandling() throws Exception {
-    String[] clientArgs = createArguments(
-        "--num_containers", "2",
-        "--shell_command", Shell.WINDOWS ? "dir" : "ls",
-        "--master_memory", "512",
-        "--container_memory", "128"
-    );
-
-    LOG.info("Initializing DS Client");
-    Client dsClient = new Client(
-        ContainerLaunchFailAppMaster.class.getName(),
-        new Configuration(yarnCluster.getConfig()));
-    Assert.assertTrue(dsClient.init(clientArgs));
-    LOG.info("Running DS Client");
-    try {
-      Assert.assertFalse(dsClient.run());
-    } finally {
-      // Always signal the client to stop, whatever the outcome.
-      dsClient.sendStopSignal();
-    }
-  }
-
-  /**
-   * Runs the DS client with the {@code --debug} flag and expects both
-   * {@code init} and {@code run} to report success.
-   */
-  @Test
-  public void testDebugFlag() throws Exception {
-    String[] debugArgs = createArguments(
-        "--num_containers", "2",
-        "--shell_command", Shell.WINDOWS ? "dir" : "ls",
-        "--master_memory", "512",
-        "--master_vcores", "2",
-        "--container_memory", "128",
-        "--container_vcores", "1",
-        "--debug"
-    );
-
-    LOG.info("Initializing DS Client");
-    Client dsClient = new Client(new Configuration(yarnCluster.getConfig()));
-    boolean initialized = dsClient.init(debugArgs);
-    Assert.assertTrue(initialized);
-    LOG.info("Running DS Client");
-    boolean succeeded = dsClient.run();
-    Assert.assertTrue(succeeded);
-  }
-
-  /**
-   * Inspects the container logs under the first NodeManager's log directory.
-   *
-   * <p>The application directory is chosen by scanning the log folder from
-   * the last entry backwards and taking the first one that holds exactly
-   * {@code containerNum + 1} children. Every file in its container
-   * directories whose name contains "stdout" is then read line by line.
-   *
-   * @param containerNum number of containers; the selected directory must
-   *     contain one more child than this
-   * @param expectedContent lines expected in the file named exactly
-   *     "stdout"; only consulted when {@code count} is false
-   * @param count when true, count matching lines instead of comparing
-   *     content
-   * @param expectedWord substring searched for in each line when
-   *     {@code count} is true
-   * @return number of lines containing {@code expectedWord}; always 0 when
-   *     {@code count} is false
-   */
-  private int verifyContainerLog(int containerNum,
-      List<String> expectedContent, boolean count, String expectedWord) {
-    File logFolder =
-        new File(yarnCluster.getNodeManager(0).getConfig()
-            .get(YarnConfiguration.NM_LOG_DIRS,
-                YarnConfiguration.DEFAULT_NM_LOG_DIRS));
-
-    // File.listFiles() returns null when the path is not a directory or
-    // cannot be read; fail with a clear message instead of a bare NPE.
-    File[] listOfFiles = logFolder.listFiles();
-    Assert.assertNotNull("Cannot list log folder " + logFolder, listOfFiles);
-    int currentContainerLogFileIndex = -1;
-    for (int i = listOfFiles.length - 1; i >= 0; i--) {
-      File[] children = listOfFiles[i].listFiles();
-      if (children != null && children.length == containerNum + 1) {
-        currentContainerLogFileIndex = i;
-        break;
-      }
-    }
-    Assert.assertTrue(currentContainerLogFileIndex != -1);
-    File[] containerFiles =
-        listOfFiles[currentContainerLogFileIndex].listFiles();
-
-    int numOfWords = 0;
-    for (File containerDir : containerFiles) {
-      File[] outputs = containerDir.listFiles();
-      if (outputs == null) {
-        // Not a directory (or unreadable): nothing to inspect here.
-        continue;
-      }
-      for (File output : outputs) {
-        String outputName = output.getName().trim();
-        if (!outputName.contains("stdout")) {
-          continue;
-        }
-        boolean isStdout = outputName.equals("stdout");
-        List<String> stdOutContent = new ArrayList<>();
-        // try-with-resources closes the reader on every exit path.
-        try (BufferedReader br = new BufferedReader(new FileReader(output))) {
-          String sCurrentLine;
-          int numOfLine = 0;
-          while ((sCurrentLine = br.readLine()) != null) {
-            if (count) {
-              if (sCurrentLine.contains(expectedWord)) {
-                numOfWords++;
-              }
-            } else if (isStdout) {
-              if (!Shell.WINDOWS) {
-                Assert.assertEquals("The current is" + sCurrentLine,
-                    expectedContent.get(numOfLine), sCurrentLine.trim());
-                numOfLine++;
-              } else {
-                stdOutContent.add(sCurrentLine.trim());
-              }
-            }
-          }
-          /* By executing bat script using cmd /c,
-           * it will output all contents from bat script first
-           * It is hard for us to do check line by line
-           * Simply check whether output from bat file contains
-           * all the expected messages
-           */
-          if (Shell.WINDOWS && !count && isStdout) {
-            Assert.assertTrue(stdOutContent.containsAll(expectedContent));
-          }
-        } catch (IOException e) {
-          // Best effort, as before: log and move on to the next file.
-          LOG.error("Exception reading the buffer", e);
-        }
-      }
-    }
-    return numOfWords;
-  }
-
-  /**
-   * Initializes a DS client with three resource-profile argument sets
-   * (container profile only, master profile only, both) and expects
-   * {@code run()} to throw for each of them.
-   */
-  @Test
-  public void testDistributedShellResourceProfiles() throws Exception {
-    int appNameCounter = 0;
-    String listCommand = Shell.WINDOWS ? "dir" : "ls";
-    String[][] profileArgSets = {
-        createArgsWithPostFix(appNameCounter++,
-            "--num_containers", "1",
-            "--shell_command", listCommand,
-            "--container_resource_profile", "maximum"),
-        createArgsWithPostFix(appNameCounter++,
-            "--num_containers", "1",
-            "--shell_command", listCommand,
-            "--master_resource_profile", "default"),
-        createArgsWithPostFix(appNameCounter++,
-            "--num_containers", "1",
-            "--shell_command", listCommand,
-            "--master_resource_profile", "default",
-            "--container_resource_profile", "maximum"),
-    };
-
-    for (String[] profileArgs : profileArgSets) {
-      LOG.info("Initializing DS Client");
-      Client dsClient =
-          new Client(new Configuration(yarnCluster.getConfig()));
-      Assert.assertTrue(dsClient.init(profileArgs));
-      LOG.info("Running DS Client");
-      try {
-        dsClient.run();
-        Assert.fail("Client run should throw error");
-      } catch (Exception expected) {
-        // Expected path. Assert.fail raises an AssertionError, which is
-        // not an Exception and therefore is not swallowed here.
-      }
-    }
-  }
-
-  /**
-   * Runs the DS client requesting OPPORTUNISTIC containers and expects the
-   * run to succeed; any exception is logged and turned into a test failure.
-   */
-  @Test
-  public void testDSShellWithOpportunisticContainers() throws Exception {
-    Client dsClient = new Client(new Configuration(yarnCluster.getConfig()));
-    try {
-      String[] opportunisticArgs = createArguments(
-          "--num_containers", "2",
-          "--master_memory", "512",
-          "--master_vcores", "2",
-          "--container_memory", "128",
-          "--container_vcores", "1",
-          "--shell_command", "date",
-          "--container_type", "OPPORTUNISTIC"
-      );
-      dsClient.init(opportunisticArgs);
-      boolean succeeded = dsClient.run();
-      assertTrue(succeeded);
-    } catch (Exception e) {
-      LOG.error("Job execution with opportunistic containers failed.", e);
-      Assert.fail("Exception. " + e.getMessage());
-    } finally {
-      dsClient.sendStopSignal();
-    }
-  }
-
-  /**
-   * Runs the DS client with OPPORTUNISTIC containers and
-   * {@code --enforce_execution_type}, waits until two containers are
-   * reported for the first application attempt, and checks that every
-   * container other than the AM container reports the OPPORTUNISTIC
-   * execution type.
-   */
-  @Test
-  @TimelineVersion(2.0f)
-  public void testDSShellWithEnforceExecutionType() throws Exception {
-    YarnClient yarnClient = null;
-    Client client = new Client(new Configuration(yarnCluster.getConfig()));
-    try {
-      String[] args = createArguments(
-          "--num_containers",
-          "2",
-          "--master_memory",
-          "512",
-          "--master_vcores",
-          "2",
-          "--container_memory",
-          "128",
-          "--container_vcores",
-          "1",
-          "--shell_command",
-          "date",
-          "--container_type",
-          "OPPORTUNISTIC",
-          "--enforce_execution_type"
-      );
-      client.init(args);
-      final AtomicBoolean result = new AtomicBoolean(false);
-      // Run the client on its own thread so this thread can query the
-      // cluster while the application is in flight.
-      Thread t = new Thread() {
-        @Override
-        public void run() {
-          try {
-            result.set(client.run());
-          } catch (Exception e) {
-            throw new RuntimeException(e);
-          }
-        }
-      };
-      t.start();
-
-      yarnClient = YarnClient.createYarnClient();
-      yarnClient.init(new Configuration(yarnCluster.getConfig()));
-      yarnClient.start();
-      waitForContainersLaunch(yarnClient, 2);
-      List<ApplicationReport> apps = yarnClient.getApplications();
-      ApplicationReport appReport = apps.get(0);
-      ApplicationId appId = appReport.getApplicationId();
-      List<ApplicationAttemptReport> appAttempts =
-          yarnClient.getApplicationAttempts(appId);
-      ApplicationAttemptReport appAttemptReport = appAttempts.get(0);
-      ApplicationAttemptId appAttemptId =
-          appAttemptReport.getApplicationAttemptId();
-      List<ContainerReport> containers =
-          yarnClient.getContainers(appAttemptId);
-      // we should get two containers.
-      Assert.assertEquals(2, containers.size());
-      ContainerId amContainerId = appAttemptReport.getAMContainerId();
-      for (ContainerReport container : containers) {
-        if (!container.getContainerId().equals(amContainerId)) {
-          // JUnit's contract is (expected, actual); keeping that order
-          // makes the failure message name the right value as expected.
-          Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
-              container.getExecutionType());
-        }
-      }
-    } catch (Exception e) {
-      LOG.error("Job execution with enforce execution type failed.", e);
-      Assert.fail("Exception. " + e.getMessage());
-    } finally {
-      client.sendStopSignal();
-      if (yarnClient != null) {
-        yarnClient.stop();
-      }
-    }
-  }
-
-  /**
-   * Polls every 10 ms, for up to 60 s, until the first attempt of the first
-   * application returned by {@code client} reports exactly
-   * {@code nContainers} containers. Any exception raised while querying is
-   * treated as "not ready yet".
-   *
-   * @param client YARN client used to query applications and containers
-   * @param nContainers exact number of containers to wait for
-   * @throws Exception whatever {@code GenericTestUtils.waitFor} throws
-   */
-  private void waitForContainersLaunch(YarnClient client,
-      int nContainers) throws Exception {
-    GenericTestUtils.waitFor(() -> {
-      try {
-        List<ApplicationReport> appReports = client.getApplications();
-        if (appReports == null || appReports.isEmpty()) {
-          return false;
-        }
-        List<ApplicationAttemptReport> attemptReports =
-            client.getApplicationAttempts(
-                appReports.get(0).getApplicationId());
-        if (attemptReports == null || attemptReports.isEmpty()) {
-          return false;
-        }
-        List<ContainerReport> containerReports = client.getContainers(
-            attemptReports.get(0).getApplicationAttemptId());
-        return containerReports.size() == nContainers;
-      } catch (Exception e) {
-        return false;
-      }
-    }, 10, 60000);
-  }
-
-  /**
-   * Runs the shared resources scenario with the fixed memory sizes, i.e.
-   * without scaling the containers to the cluster size.
-   */
-  @Test
-  @TimelineVersion(2.0f)
-  public void testDistributedShellWithResources() throws Exception {
-    final boolean useLargeContainers = false;
-    doTestDistributedShellWithResources(useLargeContainers);
-  }
-
-  /**
-   * Runs the shared resources scenario with memory requests derived from
-   * the cluster's total memory.
-   */
-  @Test
-  @TimelineVersion(2.0f)
-  public void testDistributedShellWithResourcesWithLargeContainers()
-      throws Exception {
-    final boolean useLargeContainers = true;
-    doTestDistributedShellWithResources(useLargeContainers);
-  }
-
-  /**
-   * Launches the DS client with explicit {@code --master_resources} and
-   * {@code --container_resources}, then polls a YarnClient until the AM
-   * container and the other containers of the first attempt report the
-   * expected memory and one vcore each.
-   *
-   * <p>The test is skipped (JUnit assumption) when the cluster has less
-   * than 1536 MB of memory or fewer than two vcores.
-   *
-   * @param largeContainers when true, request roughly two thirds of the
-   *     cluster memory for the AM and one third for the containers, each
-   *     rounded down to a multiple of {@code MIN_ALLOCATION_MB}
-   * @throws Exception on client, cluster or timeout errors
-   */
-  public void doTestDistributedShellWithResources(boolean largeContainers)
-      throws Exception {
-    Resource clusterResource = yarnCluster.getResourceManager()
-        .getResourceScheduler().getClusterResource();
-    String amMemoryArg = "1 Gi";
-    String containerMemoryArg = "512 Mi";
-    // Index 0: expected AM memory, index 1: expected container memory.
-    long[] expectedMem = {1024, 512};
-
-    Assume.assumeTrue("The cluster doesn't have enough memory for this test",
-        clusterResource.getMemorySize() >= expectedMem[0] + expectedMem[1]);
-    Assume.assumeTrue("The cluster doesn't have enough cores for this test",
-        clusterResource.getVirtualCores() >= 2);
-    if (largeContainers) {
-      long twoThirds = clusterResource.getMemorySize() * 2 / 3;
-      expectedMem[0] = twoThirds - twoThirds % MIN_ALLOCATION_MB;
-      amMemoryArg = expectedMem[0] + "Mi";
-      long oneThird = clusterResource.getMemorySize() / 3;
-      expectedMem[1] = oneThird - oneThird % MIN_ALLOCATION_MB;
-      containerMemoryArg = String.valueOf(expectedMem[1]);
-    }
-
-    String[] args = createArguments(
-        "--num_containers", "2",
-        "--shell_command", Shell.WINDOWS ? "dir" : "ls",
-        "--master_resources", "memory=" + amMemoryArg + ",vcores=1",
-        "--container_resources", "memory=" + containerMemoryArg + ",vcores=1"
-    );
-
-    LOG.info("Initializing DS Client");
-    Client client = new Client(new Configuration(yarnCluster.getConfig()));
-    Assert.assertTrue(client.init(args));
-    LOG.info("Running DS Client");
-    final AtomicBoolean result = new AtomicBoolean(false);
-    // Run the client on its own thread so this thread can poll the cluster.
-    Thread clientRunner = new Thread(() -> {
-      try {
-        result.set(client.run());
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    });
-    clientRunner.start();
-
-    YarnClient yarnClient = YarnClient.createYarnClient();
-    yarnClient.init(new Configuration(yarnCluster.getConfig()));
-    yarnClient.start();
-
-    final AtomicBoolean testFailed = new AtomicBoolean(false);
-    try {
-      GenericTestUtils.waitFor(() -> {
-        if (testFailed.get()) {
-          // A previous poll hit an exception: stop waiting right away.
-          return true;
-        }
-        try {
-          List<ApplicationReport> apps = yarnClient.getApplications();
-          if (apps.isEmpty()) {
-            return false;
-          }
-          ApplicationId appId = apps.get(0).getApplicationId();
-          List<ApplicationAttemptReport> attempts =
-              yarnClient.getApplicationAttempts(appId);
-          if (attempts.isEmpty()) {
-            return false;
-          }
-          ApplicationAttemptReport firstAttempt = attempts.get(0);
-          ContainerId amContainerId = firstAttempt.getAMContainerId();
-          if (amContainerId == null) {
-            return false;
-          }
-          Resource masterResource = yarnClient
-              .getContainerReport(amContainerId).getAllocatedResource();
-          Assert.assertEquals(expectedMem[0],
-              masterResource.getMemorySize());
-          Assert.assertEquals(1, masterResource.getVirtualCores());
-          List<ContainerReport> containers = yarnClient.getContainers(
-              firstAttempt.getApplicationAttemptId());
-          if (containers.size() < 2) {
-            return false;
-          }
-          for (ContainerReport container : containers) {
-            if (container.getContainerId().equals(amContainerId)) {
-              continue;
-            }
-            Resource containerResource = container.getAllocatedResource();
-            Assert.assertEquals(expectedMem[1],
-                containerResource.getMemorySize());
-            Assert.assertEquals(1, containerResource.getVirtualCores());
-          }
-          return true;
-        } catch (Exception ex) {
-          LOG.error("Error waiting for expected results", ex);
-          testFailed.set(true);
-        }
-        return false;
-      }, 10, TEST_TIME_WINDOW_EXPIRE);
-      assertFalse(testFailed.get());
-    } finally {
-      LOG.info("Signaling Client to Stop");
-      client.sendStopSignal();
-      if (yarnClient != null) {
-        LOG.info("Stopping yarnClient service");
-        yarnClient.stop();
-      }
-    }
-  }
-
-  /**
-   * Requests an AM resource type named "unknown-resource" and expects a
-   * ResourceNotFoundException to escape from the client.
-   */
-  @Test(expected=ResourceNotFoundException.class)
-  public void testDistributedShellAMResourcesWithUnknownResource()
-      throws Exception {
-    String[] unknownResourceArgs = createArguments(
-        "--num_containers", "1",
-        "--shell_command", Shell.WINDOWS ? "dir" : "ls",
-        "--master_resources", "unknown-resource=5"
-    );
-    Configuration clusterConf = new Configuration(yarnCluster.getConfig());
-    Client dsClient = new Client(clusterConf);
-    dsClient.init(unknownResourceArgs);
-    dsClient.run();
-  }
-
-  /**
-   * Submits to the queue "non-existent-queue" and expects an
-   * IllegalArgumentException to escape from the client.
-   */
-  @Test(expected=IllegalArgumentException.class)
-  public void testDistributedShellNonExistentQueue()
-      throws Exception {
-    String[] badQueueArgs = createArguments(
-        "--num_containers", "1",
-        "--shell_command", Shell.WINDOWS ? "dir" : "ls",
-        "--queue", "non-existent-queue"
-    );
-    Configuration clusterConf = new Configuration(yarnCluster.getConfig());
-    Client dsClient = new Client(clusterConf);
-    dsClient.init(badQueueArgs);
-    dsClient.run();
-  }
-
-  /**
-   * Localizes a single file (a.txt) and prints it with the shell command;
-   * the client run is expected to succeed.
-   */
-  @Test
-  public void testDistributedShellWithSingleFileLocalization()
-      throws Exception {
-    String[] localizeArgs = createArguments(
-        "--num_containers", "1",
-        "--shell_command", Shell.WINDOWS ? "type" : "cat",
-        "--localize_files", "./src/test/resources/a.txt",
-        "--shell_args", "a.txt"
-    );
-
-    Client dsClient = new Client(new Configuration(yarnCluster.getConfig()));
-    dsClient.init(localizeArgs);
-    boolean succeeded = dsClient.run();
-    assertTrue("Client exited with an error", succeeded);
-  }
-
-  /**
-   * Localizes two files (a.txt and b.txt) and prints both with the shell
-   * command; the client run is expected to succeed.
-   */
-  @Test
-  public void testDistributedShellWithMultiFileLocalization()
-      throws Exception {
-    String[] localizeArgs = createArguments(
-        "--num_containers", "1",
-        "--shell_command", Shell.WINDOWS ? "type" : "cat",
-        "--localize_files",
-        "./src/test/resources/a.txt,./src/test/resources/b.txt",
-        "--shell_args", "a.txt b.txt"
-    );
-
-    Client dsClient = new Client(new Configuration(yarnCluster.getConfig()));
-    dsClient.init(localizeArgs);
-    boolean succeeded = dsClient.run();
-    assertTrue("Client exited with an error", succeeded);
-  }
-
-  /**
-   * Asks the client to localize a path that does not exist and expects an
-   * UncheckedIOException to escape.
-   */
-  @Test(expected=UncheckedIOException.class)
-  public void testDistributedShellWithNonExistentFileLocalization()
-      throws Exception {
-    String[] missingFileArgs = createArguments(
-        "--num_containers", "1",
-        "--shell_command", Shell.WINDOWS ? "type" : "cat",
-        "--localize_files", "/non/existing/path/file.txt",
-        "--shell_args", "file.txt"
-    );
-
-    Client dsClient = new Client(new Configuration(yarnCluster.getConfig()));
-    dsClient.init(missingFileArgs);
-    boolean succeeded = dsClient.run();
-    assertTrue(succeeded);
-  }
-
-
-  /**
-   * Runs a one-container application and then waits (up to 60 s) for the
-   * path built from {@code ApplicationMaster.getRelativePath} under the
-   * file system's home directory to disappear.
-   */
-  @Test
-  public void testDistributedShellCleanup()
-      throws Exception {
-    String appName = "DistributedShellCleanup";
-    String[] cleanupArgs = createArguments(
-        "--num_containers", "1",
-        "--shell_command", Shell.WINDOWS ? "dir" : "ls"
-    );
-    Configuration config = new Configuration(yarnCluster.getConfig());
-    Client dsClient = new Client(config);
-    try {
-      dsClient.init(cleanupArgs);
-      dsClient.run();
-      ApplicationId appId = dsClient.getAppId();
-      String relativePath =
-          ApplicationMaster.getRelativePath(appName, appId.toString(), "");
-      FileSystem homeFs = FileSystem.get(config);
-      Path appDir = new Path(homeFs.getHomeDirectory(), relativePath);
-
-      GenericTestUtils.waitFor(() -> {
-        boolean removed;
-        try {
-          removed = !homeFs.exists(appDir);
-        } catch (IOException e) {
-          // Treat a failed lookup as "still there" and poll again.
-          removed = false;
-        }
-        return removed;
-      }, 10, 60000);
-
-      assertFalse("Distributed Shell Cleanup failed", homeFs.exists(appDir));
-    } finally {
-      dsClient.sendStopSignal();
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org