You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-issues@hadoop.apache.org by GitBox <gi...@apache.org> on 2020/12/31 20:29:36 UTC

[GitHub] [hadoop] goiri commented on a change in pull request #2581: YARN-10553. Refactor TestDistributedShell

goiri commented on a change in pull request #2581:
URL: https://github.com/apache/hadoop/pull/2581#discussion_r550685412



##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellBaseTest.java
##########
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.ServerSocketUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
+
+import static org.junit.Assert.assertTrue;
+
+public class DistributedShellBaseTest {
+
+  protected final static String APPMASTER_JAR =
+      JarFinder.getJar(ApplicationMaster.class);
+  protected static final int MIN_ALLOCATION_MB = 128;
+  protected static final int TEST_TIME_OUT = 160000;
+  // set the timeout of the yarnClient to be 95% of the globalTimeout.
+  protected static final int TEST_TIME_WINDOW_EXPIRE =
+      (TEST_TIME_OUT * 90) / 100;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DistributedShellBaseTest.class);
+  private static final int NUM_NMS = 1;
+  private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
+  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
+  // set the timeout of the yarnClient to be 95% of the globalTimeout.
+  private final String yarnClientTimeout =
+      String.valueOf(TEST_TIME_WINDOW_EXPIRE);
+  private final String[] commonArgs = {
+      "--jar",
+      APPMASTER_JAR,
+      "--timeout",
+      yarnClientTimeout,
+      "--appname",
+      ""
+  };
+  @Rule
+  public Timeout globalTimeout = new Timeout(TEST_TIME_OUT,
+      TimeUnit.MILLISECONDS);
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule
+  public TestName name = new TestName();
+
+  protected MiniYARNCluster yarnCluster;
+  protected YarnConfiguration conf = null;
+  // location of the filesystem timeline writer for timeline service v.2
+  private String timelineV2StorageDir = null;
+
+  protected float getTimelineVersion() {
+    return DEFAULT_TIMELINE_VERSION;
+  }
+
+  public String getTimelineV2StorageDir() {
+    return timelineV2StorageDir;
+  }
+
+  public void setTimelineV2StorageDir() throws Exception {
+    timelineV2StorageDir = tmpFolder.newFolder().getAbsolutePath();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupInternal(NUM_NMS, new YarnConfiguration());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileContext fsContext = FileContext.getLocalFSFileContext();
+    fsContext
+        .delete(
+            new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)),
+            true);
+
+    shutdownYarnCluster();
+    shutdownHdfsCluster();
+  }
+
+  protected void shutdownYarnCluster() {
+    if (yarnCluster != null) {
+      try {
+        yarnCluster.stop();
+      } finally {
+        yarnCluster = null;
+      }
+    }
+  }
+
+  protected void shutdownHdfsCluster() {
+  }
+
+  protected void waitForNMsToRegister() throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
+        return (rmContext.getRMNodes().size() >= NUM_NMS);
+      }
+    }, 100, 60000);
+  }
+
+  /**
+   * Utility function to merge two String arrays to form a new String array for
+   * our arguments.
+   *
+   * @param args the first set of the arguments.
+   * @param newArgs the second set of the arguments.
+   * @return a String array consists of {args, newArgs}
+   */
+  protected String[] mergeArgs(String[] args, String[] newArgs) {
+    int length = args.length + newArgs.length;
+    String[] result = new String[length];
+    System.arraycopy(args, 0, result, 0, args.length);
+    System.arraycopy(newArgs, 0, result, args.length, newArgs.length);
+    return result;
+  }
+
+  protected String[] createArguments(String... args) {
+    String[] res = mergeArgs(commonArgs, args);
+    // set the application name so we can track down which command is running.
+    res[commonArgs.length - 1] = generateAppName();
+    return res;
+  }
+
+  protected void waitForContainersLaunch(YarnClient client,
+      int nContainers) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        try {
+          List<ApplicationReport> apps = client.getApplications();
+          if (apps == null || apps.isEmpty()) {
+            return false;
+          }
+          ApplicationId appId = apps.get(0).getApplicationId();
+          List<ApplicationAttemptReport> appAttempts =
+              client.getApplicationAttempts(appId);
+          if (appAttempts == null || appAttempts.isEmpty()) {
+            return false;
+          }
+          ApplicationAttemptId attemptId =
+              appAttempts.get(0).getApplicationAttemptId();
+          List<ContainerReport> containers = client.getContainers(attemptId);
+          return (containers.size() == nContainers);
+        } catch (Exception e) {
+          return false;
+        }
+      }
+    }, 10, 60000);
+  }
+
+

Review comment:
       Avoid double spaces? A high level javadoc wouldn't hurt either.

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellBaseTest.java
##########
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.ServerSocketUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
+
+import static org.junit.Assert.assertTrue;
+
+public class DistributedShellBaseTest {
+
+  protected final static String APPMASTER_JAR =
+      JarFinder.getJar(ApplicationMaster.class);
+  protected static final int MIN_ALLOCATION_MB = 128;
+  protected static final int TEST_TIME_OUT = 160000;
+  // set the timeout of the yarnClient to be 95% of the globalTimeout.
+  protected static final int TEST_TIME_WINDOW_EXPIRE =
+      (TEST_TIME_OUT * 90) / 100;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DistributedShellBaseTest.class);
+  private static final int NUM_NMS = 1;
+  private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
+  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
+  // set the timeout of the yarnClient to be 95% of the globalTimeout.
+  private final String yarnClientTimeout =
+      String.valueOf(TEST_TIME_WINDOW_EXPIRE);
+  private final String[] commonArgs = {
+      "--jar",
+      APPMASTER_JAR,
+      "--timeout",
+      yarnClientTimeout,
+      "--appname",
+      ""
+  };
+  @Rule
+  public Timeout globalTimeout = new Timeout(TEST_TIME_OUT,
+      TimeUnit.MILLISECONDS);
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule
+  public TestName name = new TestName();
+
+  protected MiniYARNCluster yarnCluster;
+  protected YarnConfiguration conf = null;
+  // location of the filesystem timeline writer for timeline service v.2
+  private String timelineV2StorageDir = null;
+
+  protected float getTimelineVersion() {
+    return DEFAULT_TIMELINE_VERSION;
+  }
+
+  public String getTimelineV2StorageDir() {
+    return timelineV2StorageDir;
+  }
+
+  public void setTimelineV2StorageDir() throws Exception {
+    timelineV2StorageDir = tmpFolder.newFolder().getAbsolutePath();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupInternal(NUM_NMS, new YarnConfiguration());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileContext fsContext = FileContext.getLocalFSFileContext();
+    fsContext
+        .delete(
+            new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)),
+            true);
+
+    shutdownYarnCluster();
+    shutdownHdfsCluster();
+  }
+
+  protected void shutdownYarnCluster() {
+    if (yarnCluster != null) {
+      try {
+        yarnCluster.stop();
+      } finally {
+        yarnCluster = null;
+      }
+    }
+  }
+
+  protected void shutdownHdfsCluster() {
+  }
+
+  protected void waitForNMsToRegister() throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
+        return (rmContext.getRMNodes().size() >= NUM_NMS);
+      }
+    }, 100, 60000);
+  }
+
+  /**
+   * Utility function to merge two String arrays to form a new String array for
+   * our arguments.
+   *
+   * @param args the first set of the arguments.
+   * @param newArgs the second set of the arguments.
+   * @return a String array consists of {args, newArgs}
+   */
+  protected String[] mergeArgs(String[] args, String[] newArgs) {
+    int length = args.length + newArgs.length;
+    String[] result = new String[length];
+    System.arraycopy(args, 0, result, 0, args.length);
+    System.arraycopy(newArgs, 0, result, args.length, newArgs.length);
+    return result;
+  }
+
+  protected String[] createArguments(String... args) {
+    String[] res = mergeArgs(commonArgs, args);
+    // set the application name so we can track down which command is running.
+    res[commonArgs.length - 1] = generateAppName();
+    return res;
+  }
+
+  protected void waitForContainersLaunch(YarnClient client,
+      int nContainers) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {

Review comment:
       Lambda?

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellBaseTest.java
##########
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.ServerSocketUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
+
+import static org.junit.Assert.assertTrue;
+
+public class DistributedShellBaseTest {
+
+  protected final static String APPMASTER_JAR =
+      JarFinder.getJar(ApplicationMaster.class);
+  protected static final int MIN_ALLOCATION_MB = 128;
+  protected static final int TEST_TIME_OUT = 160000;
+  // set the timeout of the yarnClient to be 95% of the globalTimeout.
+  protected static final int TEST_TIME_WINDOW_EXPIRE =
+      (TEST_TIME_OUT * 90) / 100;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DistributedShellBaseTest.class);
+  private static final int NUM_NMS = 1;
+  private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
+  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
+  // set the timeout of the yarnClient to be 95% of the globalTimeout.
+  private final String yarnClientTimeout =
+      String.valueOf(TEST_TIME_WINDOW_EXPIRE);
+  private final String[] commonArgs = {
+      "--jar",
+      APPMASTER_JAR,
+      "--timeout",
+      yarnClientTimeout,
+      "--appname",
+      ""
+  };
+  @Rule
+  public Timeout globalTimeout = new Timeout(TEST_TIME_OUT,
+      TimeUnit.MILLISECONDS);
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule
+  public TestName name = new TestName();
+
+  protected MiniYARNCluster yarnCluster;
+  protected YarnConfiguration conf = null;
+  // location of the filesystem timeline writer for timeline service v.2
+  private String timelineV2StorageDir = null;
+
+  protected float getTimelineVersion() {
+    return DEFAULT_TIMELINE_VERSION;
+  }
+
+  public String getTimelineV2StorageDir() {
+    return timelineV2StorageDir;
+  }
+
+  public void setTimelineV2StorageDir() throws Exception {
+    timelineV2StorageDir = tmpFolder.newFolder().getAbsolutePath();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupInternal(NUM_NMS, new YarnConfiguration());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileContext fsContext = FileContext.getLocalFSFileContext();
+    fsContext
+        .delete(
+            new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)),
+            true);
+
+    shutdownYarnCluster();
+    shutdownHdfsCluster();
+  }
+
+  protected void shutdownYarnCluster() {
+    if (yarnCluster != null) {
+      try {
+        yarnCluster.stop();
+      } finally {
+        yarnCluster = null;
+      }
+    }
+  }
+
+  protected void shutdownHdfsCluster() {
+  }
+
+  protected void waitForNMsToRegister() throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {

Review comment:
       Use a lambda?

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellBaseTest.java
##########
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.ServerSocketUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
+
+import static org.junit.Assert.assertTrue;
+
+public class DistributedShellBaseTest {
+
+  protected final static String APPMASTER_JAR =
+      JarFinder.getJar(ApplicationMaster.class);
+  protected static final int MIN_ALLOCATION_MB = 128;
+  protected static final int TEST_TIME_OUT = 160000;
+  // set the timeout of the yarnClient to be 95% of the globalTimeout.
+  protected static final int TEST_TIME_WINDOW_EXPIRE =
+      (TEST_TIME_OUT * 90) / 100;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DistributedShellBaseTest.class);
+  private static final int NUM_NMS = 1;
+  private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
+  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
+  // set the timeout of the yarnClient to be 95% of the globalTimeout.
+  private final String yarnClientTimeout =
+      String.valueOf(TEST_TIME_WINDOW_EXPIRE);
+  private final String[] commonArgs = {
+      "--jar",
+      APPMASTER_JAR,
+      "--timeout",
+      yarnClientTimeout,
+      "--appname",
+      ""
+  };
+  @Rule
+  public Timeout globalTimeout = new Timeout(TEST_TIME_OUT,
+      TimeUnit.MILLISECONDS);
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule
+  public TestName name = new TestName();
+
+  protected MiniYARNCluster yarnCluster;
+  protected YarnConfiguration conf = null;
+  // location of the filesystem timeline writer for timeline service v.2
+  private String timelineV2StorageDir = null;
+
+  protected float getTimelineVersion() {
+    return DEFAULT_TIMELINE_VERSION;
+  }
+
+  public String getTimelineV2StorageDir() {
+    return timelineV2StorageDir;
+  }
+
+  public void setTimelineV2StorageDir() throws Exception {
+    timelineV2StorageDir = tmpFolder.newFolder().getAbsolutePath();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupInternal(NUM_NMS, new YarnConfiguration());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileContext fsContext = FileContext.getLocalFSFileContext();
+    fsContext
+        .delete(
+            new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)),
+            true);
+
+    shutdownYarnCluster();
+    shutdownHdfsCluster();
+  }
+
+  protected void shutdownYarnCluster() {
+    if (yarnCluster != null) {
+      try {
+        yarnCluster.stop();
+      } finally {
+        yarnCluster = null;
+      }
+    }
+  }
+
+  protected void shutdownHdfsCluster() {
+  }
+
+  protected void waitForNMsToRegister() throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
+        return (rmContext.getRMNodes().size() >= NUM_NMS);
+      }
+    }, 100, 60000);
+  }
+
+  /**
+   * Utility function to merge two String arrays to form a new String array for
+   * our arguments.
+   *
+   * @param args the first set of the arguments.
+   * @param newArgs the second set of the arguments.
+   * @return a String array consists of {args, newArgs}
+   */
+  protected String[] mergeArgs(String[] args, String[] newArgs) {
+    int length = args.length + newArgs.length;
+    String[] result = new String[length];
+    System.arraycopy(args, 0, result, 0, args.length);
+    System.arraycopy(newArgs, 0, result, args.length, newArgs.length);
+    return result;
+  }
+
+  protected String[] createArguments(String... args) {
+    String[] res = mergeArgs(commonArgs, args);
+    // set the application name so we can track down which command is running.
+    res[commonArgs.length - 1] = generateAppName();
+    return res;
+  }
+
+  protected void waitForContainersLaunch(YarnClient client,
+      int nContainers) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        try {
+          List<ApplicationReport> apps = client.getApplications();
+          if (apps == null || apps.isEmpty()) {
+            return false;
+          }
+          ApplicationId appId = apps.get(0).getApplicationId();
+          List<ApplicationAttemptReport> appAttempts =
+              client.getApplicationAttempts(appId);
+          if (appAttempts == null || appAttempts.isEmpty()) {
+            return false;
+          }
+          ApplicationAttemptId attemptId =
+              appAttempts.get(0).getApplicationAttemptId();
+          List<ContainerReport> containers = client.getContainers(attemptId);
+          return (containers.size() == nContainers);
+        } catch (Exception e) {
+          return false;
+        }
+      }
+    }, 10, 60000);
+  }
+
+
+  protected void customizeConfiguration(YarnConfiguration config)
+      throws Exception {
+  }
+
+  protected String[] appendFlowArgsForTestDSShell(String[] args,
+      boolean defaultFlow) {
+    return args;
+  }
+
+  protected String[] appendDomainArgsForTestDSShell(String[] args,
+      boolean haveDomain) {
+    if (haveDomain) {
+      String[] domainArgs = {
+          "--domain",
+          "TEST_DOMAIN",
+          "--view_acls",
+          "reader_user reader_group",
+          "--modify_acls",
+          "writer_user writer_group",
+          "--create"
+      };
+      args = mergeArgs(args, domainArgs);
+    }
+    return args;
+  }
+
+  protected void baseTestDSShell(boolean haveDomain, boolean defaultFlow)
+      throws Exception {
+    String[] args = createArguments(
+        "--num_containers",
+        "2",
+        "--shell_command",
+        Shell.WINDOWS ? "dir" : "ls",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1");
+
+    args = appendDomainArgsForTestDSShell(args, haveDomain);

Review comment:
       This pattern of taking a variable an overwrite it is a little obfuscated; it comes form the original but I would use new vars with full names.

##########
File path: hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellBaseTest.java
##########
@@ -0,0 +1,496 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.ServerSocketUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
+
+import static org.junit.Assert.assertTrue;
+
+public class DistributedShellBaseTest {
+
+  protected final static String APPMASTER_JAR =
+      JarFinder.getJar(ApplicationMaster.class);
+  protected static final int MIN_ALLOCATION_MB = 128;
+  protected static final int TEST_TIME_OUT = 160000;
+  // set the timeout of the yarnClient to be 95% of the globalTimeout.
+  protected static final int TEST_TIME_WINDOW_EXPIRE =
+      (TEST_TIME_OUT * 90) / 100;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DistributedShellBaseTest.class);
+  private static final int NUM_NMS = 1;
+  private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
+  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
+  // set the timeout of the yarnClient to be 95% of the globalTimeout.
+  private final String yarnClientTimeout =
+      String.valueOf(TEST_TIME_WINDOW_EXPIRE);
+  private final String[] commonArgs = {
+      "--jar",
+      APPMASTER_JAR,
+      "--timeout",
+      yarnClientTimeout,
+      "--appname",
+      ""
+  };
+  @Rule
+  public Timeout globalTimeout = new Timeout(TEST_TIME_OUT,
+      TimeUnit.MILLISECONDS);
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule
+  public TestName name = new TestName();
+
+  protected MiniYARNCluster yarnCluster;
+  protected YarnConfiguration conf = null;
+  // location of the filesystem timeline writer for timeline service v.2
+  private String timelineV2StorageDir = null;
+
+  protected float getTimelineVersion() {
+    return DEFAULT_TIMELINE_VERSION;
+  }
+
+  public String getTimelineV2StorageDir() {
+    return timelineV2StorageDir;
+  }
+
+  public void setTimelineV2StorageDir() throws Exception {
+    timelineV2StorageDir = tmpFolder.newFolder().getAbsolutePath();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupInternal(NUM_NMS, new YarnConfiguration());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileContext fsContext = FileContext.getLocalFSFileContext();
+    fsContext
+        .delete(
+            new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)),
+            true);
+
+    shutdownYarnCluster();
+    shutdownHdfsCluster();
+  }
+
+  protected void shutdownYarnCluster() {
+    if (yarnCluster != null) {
+      try {
+        yarnCluster.stop();
+      } finally {
+        yarnCluster = null;
+      }
+    }
+  }
+
+  protected void shutdownHdfsCluster() {
+  }
+
+  protected void waitForNMsToRegister() throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
+        return (rmContext.getRMNodes().size() >= NUM_NMS);
+      }
+    }, 100, 60000);
+  }
+
+  /**
+   * Utility function to merge two String arrays to form a new String array for
+   * our arguments.
+   *
+   * @param args the first set of the arguments.
+   * @param newArgs the second set of the arguments.
+   * @return a String array consists of {args, newArgs}
+   */
+  protected String[] mergeArgs(String[] args, String[] newArgs) {
+    int length = args.length + newArgs.length;
+    String[] result = new String[length];
+    System.arraycopy(args, 0, result, 0, args.length);
+    System.arraycopy(newArgs, 0, result, args.length, newArgs.length);
+    return result;
+  }
+
+  protected String[] createArguments(String... args) {
+    String[] res = mergeArgs(commonArgs, args);
+    // set the application name so we can track down which command is running.
+    res[commonArgs.length - 1] = generateAppName();
+    return res;
+  }
+
+  protected void waitForContainersLaunch(YarnClient client,
+      int nContainers) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        try {
+          List<ApplicationReport> apps = client.getApplications();
+          if (apps == null || apps.isEmpty()) {
+            return false;
+          }
+          ApplicationId appId = apps.get(0).getApplicationId();
+          List<ApplicationAttemptReport> appAttempts =
+              client.getApplicationAttempts(appId);
+          if (appAttempts == null || appAttempts.isEmpty()) {
+            return false;
+          }
+          ApplicationAttemptId attemptId =
+              appAttempts.get(0).getApplicationAttemptId();
+          List<ContainerReport> containers = client.getContainers(attemptId);
+          return (containers.size() == nContainers);
+        } catch (Exception e) {
+          return false;
+        }
+      }
+    }, 10, 60000);
+  }
+
+
+  protected void customizeConfiguration(YarnConfiguration config)
+      throws Exception {
+  }
+
+  protected String[] appendFlowArgsForTestDSShell(String[] args,
+      boolean defaultFlow) {
+    return args;
+  }
+
+  protected String[] appendDomainArgsForTestDSShell(String[] args,
+      boolean haveDomain) {
+    if (haveDomain) {
+      String[] domainArgs = {
+          "--domain",
+          "TEST_DOMAIN",
+          "--view_acls",
+          "reader_user reader_group",
+          "--modify_acls",
+          "writer_user writer_group",
+          "--create"
+      };
+      args = mergeArgs(args, domainArgs);
+    }
+    return args;
+  }
+
+  protected void baseTestDSShell(boolean haveDomain, boolean defaultFlow)
+      throws Exception {
+    String[] args = createArguments(
+        "--num_containers",
+        "2",
+        "--shell_command",
+        Shell.WINDOWS ? "dir" : "ls",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1");
+
+    args = appendDomainArgsForTestDSShell(args, haveDomain);
+
+    args = appendFlowArgsForTestDSShell(args, defaultFlow);
+
+    LOG.info("Initializing DS Client");
+    YarnClient yarnClient;
+    final Client client =
+        new Client(new Configuration(yarnCluster.getConfig()));
+    boolean initSuccess = client.init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running DS Client");
+    final AtomicBoolean result = new AtomicBoolean(false);
+    Thread t = new Thread() {
+      public void run() {
+        try {
+          result.set(client.run());
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+    t.start();
+
+    yarnClient = YarnClient.createYarnClient();
+    yarnClient.init(new Configuration(yarnCluster.getConfig()));
+    yarnClient.start();
+
+    boolean verified = false;
+    String errorMessage = "";
+    ApplicationId appId = null;
+    ApplicationReport appReport = null;
+    while (!verified) {
+      List<ApplicationReport> apps = yarnClient.getApplications();

Review comment:
       Extract all this into a function or make it a waitFor?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-issues-help@hadoop.apache.org