You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by iw...@apache.org on 2021/01/08 23:39:37 UTC
[hadoop] branch trunk updated: YARN-10553. Refactor TestDistributedShell (#2581)
This is an automated email from the ASF dual-hosted git repository.
iwasakims pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 890f2da YARN-10553. Refactor TestDistributedShell (#2581)
890f2da is described below
commit 890f2da624465473a5f401a3bcfc4bbd068289a1
Author: Ahmed Hussein <50...@users.noreply.github.com>
AuthorDate: Fri Jan 8 17:39:21 2021 -0600
YARN-10553. Refactor TestDistributedShell (#2581)
---
.../distributedshell/ApplicationMaster.java | 2 +-
.../yarn/applications/distributedshell/Client.java | 8 +-
.../distributedshell/DistributedShellBaseTest.java | 607 +++++++
.../distributedshell/TestDSSleepingAppMaster.java | 9 +-
.../distributedshell/TestDSTimelineV10.java | 843 +++++++++
.../distributedshell/TestDSTimelineV15.java | 100 ++
.../distributedshell/TestDSTimelineV20.java | 484 +++++
.../TestDSWithMultipleNodeManager.java | 599 ++++---
.../distributedshell/TestDistributedShell.java | 1865 --------------------
9 files changed, 2388 insertions(+), 2129 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index ae14d09..765ca82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -781,7 +781,7 @@ public class ApplicationMaster {
new HelpFormatter().printHelp("ApplicationMaster", opts);
}
- private void cleanup() {
+ protected void cleanup() {
try {
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index 5da4384..b271486 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -1414,21 +1414,19 @@ public class Client {
}
int waitCount = 0;
LOG.info("Waiting for Client to exit loop");
- while (!isRunning.get()) {
+ while (isRunning.get()) {
try {
Thread.sleep(50);
} catch (InterruptedException ie) {
// do nothing
} finally {
- waitCount++;
- if (isRunning.get() || waitCount > 2000) {
+ if (++waitCount > 2000) {
break;
}
}
}
- LOG.info("Stopping yarnClient within the Client");
+ LOG.info("Stopping yarnClient within the DS Client");
yarnClient.stop();
- yarnClient.waitForServiceToStop(clientTimeout);
LOG.info("done stopping Client");
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellBaseTest.java
new file mode 100644
index 0000000..28cdf8f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/DistributedShellBaseTest.java
@@ -0,0 +1,607 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.net.ServerSocketUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.JarFinder;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.timeline.NameValuePair;
+import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
+
+/**
+ * Base class for testing DistributedShell features.
+ * <p>
+ * Manages a static {@link MiniYARNCluster} (and optionally a
+ * {@link MiniDFSCluster}), backs up and rewrites the classpath
+ * {@code yarn-site.xml} with the running cluster's configuration, and offers
+ * helpers to build DS client arguments and to run/verify a basic DS shell job.
+ * Subclasses supply the timeline-service version and version-specific
+ * configuration.
+ */
+public abstract class DistributedShellBaseTest {
+  protected static final int MIN_ALLOCATION_MB = 128;
+  protected static final int NUM_DATA_NODES = 1;
+  protected static final int TEST_TIME_OUT = 160000;
+  // Upper bound for waits inside a test: 90% of TEST_TIME_OUT, so that waits
+  // give up before the global JUnit timeout fires.
+  protected static final int TEST_TIME_WINDOW_EXPIRE =
+      (TEST_TIME_OUT * 90) / 100;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DistributedShellBaseTest.class);
+  private static final String APP_MASTER_JAR =
+      JarFinder.getJar(ApplicationMaster.class);
+  private static final int NUM_NMS = 1;
+  // The DS client "--timeout" value: TEST_TIME_WINDOW_EXPIRE, i.e. 90% of
+  // the globalTimeout.
+  private static final String YARN_CLIENT_TIMEOUT =
+      String.valueOf(TEST_TIME_WINDOW_EXPIRE);
+  // Arguments shared by every DS client invocation. The last element is a
+  // placeholder for the "--appname" value, filled in by createArguments.
+  private static final String[] COMMON_ARGS = {
+      "--jar",
+      APP_MASTER_JAR,
+      "--timeout",
+      YARN_CLIENT_TIMEOUT,
+      "--appname",
+      ""
+  };
+  private static MiniDFSCluster hdfsCluster = null;
+  private static MiniYARNCluster yarnCluster = null;
+  private static String yarnSiteBackupPath = null;
+  private static String yarnSitePath = null;
+  @Rule
+  public Timeout globalTimeout = new Timeout(TEST_TIME_OUT,
+      TimeUnit.MILLISECONDS);
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule
+  public TestName name = new TestName();
+  private Client dsClient;
+  private YarnConfiguration conf = null;
+  // location of the filesystem timeline writer for timeline service v.2
+  private String timelineV2StorageDir = null;
+
+  /**
+   * Locates the dummy {@code yarn-site.xml} on the classpath and copies it to
+   * a {@code -backup} file so it can be restored after the tests rewrite it.
+   *
+   * @throws Exception if the file is missing or cannot be copied.
+   */
+  @BeforeClass
+  public static void setupUnitTests() throws Exception {
+    URL url = Thread.currentThread().getContextClassLoader().getResource(
+        "yarn-site.xml");
+    if (url == null) {
+      throw new RuntimeException(
+          "Could not find 'yarn-site.xml' dummy file in classpath");
+    }
+    // backup the original yarn-site file.
+    yarnSitePath = url.getPath();
+    yarnSiteBackupPath = url.getPath() + "-backup";
+    Files.copy(Paths.get(yarnSitePath),
+        Paths.get(yarnSiteBackupPath),
+        StandardCopyOption.COPY_ATTRIBUTES,
+        StandardCopyOption.REPLACE_EXISTING);
+  }
+
+  /**
+   * Shuts down the clusters and restores the original {@code yarn-site.xml}
+   * from its backup. The restore runs even if a cluster shutdown throws.
+   *
+   * @throws Exception if shutting down or restoring the file fails.
+   */
+  @AfterClass
+  public static void tearDownUnitTests() throws Exception {
+    try {
+      // shutdown the clusters.
+      shutdownClusters();
+    } finally {
+      if (yarnSitePath != null && yarnSiteBackupPath != null
+          && Files.exists(Paths.get(yarnSiteBackupPath))) {
+        // restore the original yarn-site file.
+        Files.move(Paths.get(yarnSiteBackupPath), Paths.get(yarnSitePath),
+            StandardCopyOption.REPLACE_EXISTING);
+      }
+    }
+  }
+
+  /**
+   * Utility function to merge two String arrays to form a new String array for
+   * our arguments.
+   *
+   * @param args the first set of the arguments.
+   * @param newArgs the second set of the arguments.
+   * @return a String array consists of {args, newArgs}
+   */
+  protected static String[] mergeArgs(String[] args, String[] newArgs) {
+    int length = args.length + newArgs.length;
+    String[] result = new String[length];
+    System.arraycopy(args, 0, result, 0, args.length);
+    System.arraycopy(newArgs, 0, result, args.length, newArgs.length);
+    return result;
+  }
+
+  /**
+   * Builds the DS client arguments: the common arguments followed by
+   * {@code args}, with the application name taken from
+   * {@code testNameProvider}.
+   *
+   * @param testNameProvider supplies the value of "--appname".
+   * @param args test-specific arguments appended after the common ones.
+   * @return the full argument array.
+   */
+  protected static String[] createArguments(Supplier<String> testNameProvider,
+      String... args) {
+    String[] res = mergeArgs(COMMON_ARGS, args);
+    // set the application name so we can track down which command is running.
+    res[COMMON_ARGS.length - 1] = testNameProvider.get();
+    return res;
+  }
+
+  /**
+   * @param sec number of seconds to sleep.
+   * @return a platform-specific shell command that sleeps for {@code sec}
+   *         seconds.
+   */
+  protected static String getSleepCommand(int sec) {
+    // Windows doesn't have a sleep command, ping -n does the trick
+    return Shell.WINDOWS ? "ping -n " + (sec + 1) + " 127.0.0.1 >nul"
+        : "sleep " + sec;
+  }
+
+  /** @return the platform-specific directory listing command. */
+  protected static String getListCommand() {
+    return Shell.WINDOWS ? "dir" : "ls";
+  }
+
+  /** @return the platform-specific command that prints a file. */
+  protected static String getCatCommand() {
+    return Shell.WINDOWS ? "type" : "cat";
+  }
+
+  /** Stops the shared YARN cluster, if any, and clears the reference. */
+  protected static void shutdownYarnCluster() {
+    if (yarnCluster != null) {
+      try {
+        yarnCluster.stop();
+      } finally {
+        yarnCluster = null;
+      }
+    }
+  }
+
+  /** Shuts down the shared HDFS cluster, if any, and clears the reference. */
+  protected static void shutdownHdfsCluster() {
+    if (hdfsCluster != null) {
+      try {
+        hdfsCluster.shutdown();
+      } finally {
+        hdfsCluster = null;
+      }
+    }
+  }
+
+  /**
+   * Stops the YARN cluster and then the HDFS cluster. The HDFS cluster is
+   * shut down even if stopping the YARN cluster throws.
+   */
+  private static void shutdownClusters() {
+    try {
+      shutdownYarnCluster();
+    } finally {
+      shutdownHdfsCluster();
+    }
+  }
+
+  public String getTimelineV2StorageDir() {
+    return timelineV2StorageDir;
+  }
+
+  /**
+   * Creates a fresh temporary folder and uses it as the timeline v.2
+   * storage directory.
+   *
+   * @throws Exception if the folder cannot be created.
+   */
+  public void setTimelineV2StorageDir() throws Exception {
+    timelineV2StorageDir = tmpFolder.newFolder().getAbsolutePath();
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupInternal(NUM_NMS, new YarnConfiguration());
+  }
+
+  /**
+   * Stops the DS client, deletes the timeline leveldb directory and shuts
+   * down the clusters. The clusters are always shut down, even if the
+   * earlier steps fail. Otherwise the static cluster would be reused by the
+   * next test (see {@link #setUpYarnCluster}) with a stale configuration.
+   *
+   * @throws IOException if deleting the leveldb directory fails.
+   */
+  @After
+  public void tearDown() throws IOException {
+    try {
+      cleanUpDFSClient();
+      // conf is still null if setup never reached setupInternal.
+      String levelDbPath = (conf == null) ? null
+          : conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH);
+      if (levelDbPath != null) {
+        FileContext fsContext = FileContext.getLocalFSFileContext();
+        fsContext.delete(new Path(levelDbPath), true);
+      }
+    } finally {
+      shutdownClusters();
+    }
+  }
+
+  /**
+   * Builds DS client arguments whose application name is derived from the
+   * running test method's name.
+   *
+   * @param args test-specific arguments.
+   * @return the full argument array.
+   */
+  protected String[] createArgumentsWithAppName(String... args) {
+    return createArguments(() -> generateAppName(), args);
+  }
+
+  /**
+   * Waits until the first application's first attempt reports at least
+   * {@code nContainers} containers, and publishes the attempt report, the
+   * container list and the attempt id through the given references. If the
+   * client throws while polling, the exception is stored in
+   * {@code thrownErrorRef} and the wait ends.
+   *
+   * @param client the YARN client used for polling.
+   * @param nContainers minimum number of containers to wait for.
+   * @param appAttemptReportRef receives the first attempt's report.
+   * @param containersListRef receives the container reports.
+   * @param appAttemptIdRef receives the first attempt's id.
+   * @param thrownErrorRef receives any exception raised while polling.
+   * @throws Exception if the wait times out.
+   */
+  protected void waitForContainersLaunch(YarnClient client, int nContainers,
+      AtomicReference<ApplicationAttemptReport> appAttemptReportRef,
+      AtomicReference<List<ContainerReport>> containersListRef,
+      AtomicReference<ApplicationAttemptId> appAttemptIdRef,
+      AtomicReference<Throwable> thrownErrorRef) throws Exception {
+    GenericTestUtils.waitFor(() -> {
+      try {
+        List<ApplicationReport> apps = client.getApplications();
+        if (apps == null || apps.isEmpty()) {
+          return false;
+        }
+        ApplicationId appId = apps.get(0).getApplicationId();
+        List<ApplicationAttemptReport> appAttempts =
+            client.getApplicationAttempts(appId);
+        if (appAttempts == null || appAttempts.isEmpty()) {
+          return false;
+        }
+        ApplicationAttemptId attemptId =
+            appAttempts.get(0).getApplicationAttemptId();
+        List<ContainerReport> containers = client.getContainers(attemptId);
+        if (containers == null || containers.size() < nContainers) {
+          return false;
+        }
+        containersListRef.set(containers);
+        appAttemptIdRef.set(attemptId);
+        appAttemptReportRef.set(appAttempts.get(0));
+      } catch (Exception e) {
+        LOG.error("Exception waiting for Containers Launch", e);
+        thrownErrorRef.set(e);
+      }
+      return true;
+    }, 10, TEST_TIME_WINDOW_EXPIRE);
+  }
+
+  /**
+   * Applies timeline-version-specific settings to {@code config} before the
+   * cluster starts.
+   *
+   * @param config the configuration the cluster will be started with.
+   * @throws Exception on configuration failure.
+   */
+  protected abstract void customizeConfiguration(YarnConfiguration config)
+      throws Exception;
+
+  /**
+   * Hook for subclasses to add flow arguments. The base implementation
+   * returns {@code args} unchanged.
+   */
+  protected String[] appendFlowArgsForTestDSShell(String[] args,
+      boolean defaultFlow) {
+    return args;
+  }
+
+  /**
+   * Appends the TEST_DOMAIN domain and ACL arguments when
+   * {@code haveDomain} is true. Otherwise returns {@code args} unchanged.
+   */
+  protected String[] appendDomainArgsForTestDSShell(String[] args,
+      boolean haveDomain) {
+    String[] result = args;
+    if (haveDomain) {
+      String[] domainArgs = {
+          "--domain",
+          "TEST_DOMAIN",
+          "--view_acls",
+          "reader_user reader_group",
+          "--modify_acls",
+          "writer_user writer_group",
+          "--create"
+      };
+      result = mergeArgs(args, domainArgs);
+    }
+    return result;
+  }
+
+  /** Creates a DS client for {@code config} and remembers it for cleanup. */
+  protected Client setAndGetDSClient(Configuration config) throws Exception {
+    dsClient = new Client(config);
+    return dsClient;
+  }
+
+  /**
+   * Creates a DS client that launches {@code appMasterMainClass} and
+   * remembers it for cleanup.
+   */
+  protected Client setAndGetDSClient(String appMasterMainClass,
+      Configuration config) throws Exception {
+    dsClient = new Client(appMasterMainClass, config);
+    return dsClient;
+  }
+
+  /**
+   * Runs the DS client with two containers executing the list command, waits
+   * for the application to finish, and then verifies the timeline data via
+   * {@link #checkTimeline}.
+   *
+   * @param haveDomain whether to pass the TEST_DOMAIN domain/ACL arguments.
+   * @param defaultFlow forwarded to {@link #appendFlowArgsForTestDSShell} and
+   *                    {@link #checkTimeline}.
+   * @throws Exception if the client run fails, the wait times out, or a
+   *                   verification fails.
+   */
+  protected void baseTestDSShell(boolean haveDomain, boolean defaultFlow)
+      throws Exception {
+    String[] baseArgs = createArgumentsWithAppName(
+        "--num_containers",
+        "2",
+        "--shell_command",
+        getListCommand(),
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1");
+    String[] domainArgs = appendDomainArgsForTestDSShell(baseArgs, haveDomain);
+    String[] args = appendFlowArgsForTestDSShell(domainArgs, defaultFlow);
+
+    LOG.info("Initializing DS Client");
+    dsClient = setAndGetDSClient(new Configuration(yarnCluster.getConfig()));
+    boolean initSuccess = dsClient.init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running DS Client");
+    final AtomicBoolean result = new AtomicBoolean(false);
+    // Without capturing it here, an exception on the client thread would only
+    // reach the thread's uncaught-exception handler and never fail the test.
+    final AtomicReference<Throwable> clientError = new AtomicReference<>(null);
+    Thread t = new Thread(() -> {
+      try {
+        result.set(dsClient.run());
+      } catch (Exception e) {
+        LOG.error("Exception running DS Client", e);
+        clientError.set(e);
+      }
+    });
+    t.start();
+
+    YarnClient yarnClient = YarnClient.createYarnClient();
+    // waitResult: 0 = still waiting, 1 = RPC port reported as -1,
+    // 2 = exception while fetching the application report.
+    AtomicInteger waitResult = new AtomicInteger(0);
+    AtomicReference<ApplicationId> appIdRef =
+        new AtomicReference<>(null);
+    AtomicReference<ApplicationReport> appReportRef =
+        new AtomicReference<>(null);
+    try {
+      yarnClient.init(new Configuration(yarnCluster.getConfig()));
+      yarnClient.start();
+      GenericTestUtils.waitFor(() -> {
+        try {
+          List<ApplicationReport> apps = yarnClient.getApplications();
+          if (apps.size() == 0) {
+            return false;
+          }
+          ApplicationReport appReport = apps.get(0);
+          appReportRef.set(appReport);
+          appIdRef.set(appReport.getApplicationId());
+          if (appReport.getHost().equals("N/A")) {
+            return false;
+          }
+          if (appReport.getRpcPort() == -1) {
+            waitResult.set(1);
+          }
+          if (appReport.getYarnApplicationState()
+              == YarnApplicationState.FINISHED
+              && appReport.getFinalApplicationStatus()
+              != FinalApplicationStatus.UNDEFINED) {
+            return true;
+          }
+        } catch (Exception e) {
+          LOG.error("Exception get application from Yarn Client", e);
+          waitResult.set(2);
+        }
+        return waitResult.get() != 0;
+      }, 10, TEST_TIME_WINDOW_EXPIRE);
+      t.join();
+    } finally {
+      // the monitoring client is owned by this method; always release it.
+      yarnClient.stop();
+    }
+    LOG.info("DS Client run completed. Result={}", result.get());
+    if (clientError.get() != null) {
+      throw new AssertionError("DS Client run failed", clientError.get());
+    }
+    if (waitResult.get() == 2) {
+      // Exception was raised
+      Assert.fail("Exception in getting application report. Failed");
+    }
+    if (waitResult.get() == 1) {
+      Assert.assertEquals("Failed waiting for expected rpc port to be -1.",
+          -1, appReportRef.get().getRpcPort());
+    }
+    checkTimeline(appIdRef.get(), defaultFlow, haveDomain, appReportRef.get());
+  }
+
+  /** Same as {@code baseTestDSShell(haveDomain, true)}. */
+  protected void baseTestDSShell(boolean haveDomain) throws Exception {
+    baseTestDSShell(haveDomain, true);
+  }
+
+  /**
+   * Verifies the v1 timeline store contents after a DS run: the TEST_DOMAIN
+   * readers/writers (when {@code haveDomain}), exactly one DS_APP_ATTEMPT
+   * entity with two events, and two DS_CONTAINER entities for the attempt's
+   * application, all carrying the expected domain id. This base
+   * implementation does not use {@code appId}, {@code defaultFlow} or
+   * {@code appReport}.
+   *
+   * @throws Exception if reading the timeline store fails.
+   */
+  protected void checkTimeline(ApplicationId appId,
+      boolean defaultFlow, boolean haveDomain,
+      ApplicationReport appReport) throws Exception {
+    TimelineDomain domain = null;
+    if (haveDomain) {
+      domain = yarnCluster.getApplicationHistoryServer()
+          .getTimelineStore().getDomain("TEST_DOMAIN");
+      Assert.assertNotNull(domain);
+      Assert.assertEquals("reader_user reader_group", domain.getReaders());
+      Assert.assertEquals("writer_user writer_group", domain.getWriters());
+    }
+    TimelineEntities entitiesAttempts = yarnCluster
+        .getApplicationHistoryServer()
+        .getTimelineStore()
+        .getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(),
+            null, null, null, null, null, null, null, null, null);
+    Assert.assertNotNull(entitiesAttempts);
+    Assert.assertEquals(1, entitiesAttempts.getEntities().size());
+    Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents()
+        .size());
+    // JUnit expects (expected, actual); the original had them reversed.
+    Assert.assertEquals(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(),
+        entitiesAttempts.getEntities().get(0).getEntityType());
+    Assert.assertEquals(haveDomain ? domain.getId() : "DEFAULT",
+        entitiesAttempts.getEntities().get(0).getDomainId());
+    String currAttemptEntityId =
+        entitiesAttempts.getEntities().get(0).getEntityId();
+    ApplicationAttemptId attemptId = ApplicationAttemptId.fromString(
+        currAttemptEntityId);
+    NameValuePair primaryFilter = new NameValuePair(
+        ApplicationMaster.APPID_TIMELINE_FILTER_NAME,
+        attemptId.getApplicationId().toString());
+    TimelineEntities entities = yarnCluster
+        .getApplicationHistoryServer()
+        .getTimelineStore()
+        .getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null,
+            null, null, null, null, primaryFilter, null, null, null);
+    Assert.assertNotNull(entities);
+    Assert.assertEquals(2, entities.getEntities().size());
+    Assert.assertEquals(ApplicationMaster.DSEntity.DS_CONTAINER.toString(),
+        entities.getEntities().get(0).getEntityType());
+
+    String entityId = entities.getEntities().get(0).getEntityId();
+    TimelineEntity entity =
+        yarnCluster.getApplicationHistoryServer().getTimelineStore()
+            .getEntity(entityId,
+                ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null);
+    Assert.assertNotNull(entity);
+    Assert.assertEquals(entityId, entity.getEntityId());
+    Assert.assertEquals(haveDomain ? domain.getId() : "DEFAULT",
+        entities.getEntities().get(0).getDomainId());
+  }
+
+  /**
+   * Builds DS client arguments whose application name is the test method
+   * name suffixed with the zero-padded {@code index} (e.g. "-007").
+   */
+  protected String[] createArgsWithPostFix(int index, String... args) {
+    // set the application name so we can track down which command is running.
+    return createArguments(
+        () -> generateAppName(String.format("%03d", index)), args);
+  }
+
+  /** @return the running test method's name with the first "test" removed. */
+  protected String generateAppName() {
+    return generateAppName(null);
+  }
+
+  /**
+   * @param postFix optional suffix; appended after a "-" when non-null.
+   * @return the running test method's name with the first "test" removed,
+   *         followed by the optional suffix.
+   */
+  protected String generateAppName(String postFix) {
+    return name.getMethodName().replaceFirst("test", "")
+        .concat(postFix == null ? "" : "-" + postFix);
+  }
+
+  /** Starts the shared HDFS cluster if it is not already running. */
+  protected void setUpHDFSCluster() throws IOException {
+    if (hdfsCluster == null) {
+      HdfsConfiguration hdfsConfig = new HdfsConfiguration();
+      hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
+          .numDataNodes(NUM_DATA_NODES).build();
+      hdfsCluster.waitActive();
+    }
+  }
+
+  /**
+   * Starts the shared YARN cluster, unless one is already running, waits for
+   * {@code numNodeManagers} NMs to register, and writes the cluster's
+   * configuration into the classpath {@code yarn-site.xml}.
+   *
+   * @param numNodeManagers number of node managers to start and wait for.
+   * @param yarnConfig configuration used to init the cluster.
+   * @throws Exception if startup, registration or writing the file fails.
+   */
+  protected void setUpYarnCluster(int numNodeManagers,
+      YarnConfiguration yarnConfig) throws Exception {
+    if (yarnCluster != null) {
+      return;
+    }
+    yarnCluster =
+        new MiniYARNCluster(getClass().getSimpleName(), 1, numNodeManagers,
+            1, 1);
+    yarnCluster.init(yarnConfig);
+    yarnCluster.start();
+    // wait for all requested node managers to register.
+    waitForNMsToRegister(numNodeManagers);
+    conf.set(
+        YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+        MiniYARNCluster.getHostname() + ":"
+            + yarnCluster.getApplicationHistoryServer().getPort());
+    Configuration yarnClusterConfig = yarnCluster.getConfig();
+    yarnClusterConfig.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+        new File(yarnSitePath).getParent());
+    // write the document to a buffer (not directly to the file, as that
+    // can cause the file being written to get read -which will then fail.
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    yarnClusterConfig.writeXml(bytesOut);
+    bytesOut.close();
+    // write the bytes to the file in the classpath; the stream is closed even
+    // if the write fails.
+    try (OutputStream os = new FileOutputStream(yarnSitePath)) {
+      os.write(bytesOut.toByteArray());
+    }
+  }
+
+  /**
+   * Populates {@code yarnConfig} with the settings shared by all DS tests and
+   * the subclass-specific timeline settings, stores it as this test's
+   * configuration, and starts the YARN cluster with it.
+   *
+   * @param numNodeManagers number of node managers to start.
+   * @param yarnConfig configuration to populate and use.
+   * @throws Exception if configuration or cluster startup fails.
+   */
+  protected void setupInternal(int numNodeManagers,
+      YarnConfiguration yarnConfig) throws Exception {
+    LOG.info("========== Setting UP UnitTest {}#{} ==========",
+        getClass().getCanonicalName(), name.getMethodName());
+    LOG.info("Starting up YARN cluster. Timeline version {}",
+        getTimelineVersion());
+    conf = yarnConfig;
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        MIN_ALLOCATION_MB);
+    // reduce the tearDown waiting time
+    conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000);
+    conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 500);
+    conf.set("yarn.log.dir", "target");
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    // disable aux-service based timeline aggregators
+    conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
+    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
+
+    conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    conf.set("mapreduce.jobhistory.address",
+        "0.0.0.0:" + ServerSocketUtil.getPort(10021, 10));
+    // Enable ContainersMonitorImpl
+    conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
+        LinuxResourceCalculatorPlugin.class.getName());
+    conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
+        ProcfsBasedProcessTree.class.getName());
+    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
+    conf.setBoolean(
+        YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING, true);
+    conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
+        true);
+    conf.setBoolean(
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+    conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
+        10);
+    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
+        YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
+    // ATS version specific settings
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
+        getTimelineVersion());
+    // setup the configuration of relevant for each TimelineService version.
+    customizeConfiguration(conf);
+    // setup the yarn cluster.
+    setUpYarnCluster(numNodeManagers, conf);
+  }
+
+  protected NodeManager getNodeManager(int index) {
+    return yarnCluster.getNodeManager(index);
+  }
+
+  protected MiniYARNCluster getYarnCluster() {
+    return yarnCluster;
+  }
+
+  protected void setConfiguration(String key, String value) {
+    conf.set(key, value);
+  }
+
+  protected Configuration getYarnClusterConfiguration() {
+    return yarnCluster.getConfig();
+  }
+
+  protected Configuration getConfiguration() {
+    return conf;
+  }
+
+  protected ResourceManager getResourceManager() {
+    return yarnCluster.getResourceManager();
+  }
+
+  protected ResourceManager getResourceManager(int index) {
+    return yarnCluster.getResourceManager(index);
+  }
+
+  protected Client getDSClient() {
+    return dsClient;
+  }
+
+  protected void resetDSClient() {
+    dsClient = null;
+  }
+
+  /** @return the timeline service version this test class runs against. */
+  protected abstract float getTimelineVersion();
+
+  /**
+   * Sends the stop signal to the current DS client, if any, and forgets it.
+   * (Despite the name, this concerns the DistributedShell client.)
+   */
+  protected void cleanUpDFSClient() {
+    if (getDSClient() != null) {
+      getDSClient().sendStopSignal();
+      resetDSClient();
+    }
+  }
+
+  /**
+   * Waits until the RM knows about at least {@code expectedNMs} nodes.
+   *
+   * @param expectedNMs number of node managers that must have registered.
+   * @throws Exception if they do not register within 60 seconds.
+   */
+  private void waitForNMsToRegister(int expectedNMs) throws Exception {
+    GenericTestUtils.waitFor(() -> {
+      RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
+      return (rmContext.getRMNodes().size() >= expectedNMs);
+    }, 100, 60000);
+  }
+
+  protected MiniDFSCluster getHDFSCluster() {
+    return hdfsCluster;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSSleepingAppMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSSleepingAppMaster.java
index 25975bf..ae25ece 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSSleepingAppMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSSleepingAppMaster.java
@@ -18,11 +18,10 @@
package org.apache.hadoop.yarn.applications.distributedshell;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class TestDSSleepingAppMaster extends ApplicationMaster{
+public class TestDSSleepingAppMaster extends ApplicationMaster {
private static final Logger LOG = LoggerFactory
.getLogger(TestDSSleepingAppMaster.class);
@@ -30,8 +29,8 @@ public class TestDSSleepingAppMaster extends ApplicationMaster{
public static void main(String[] args) {
boolean result = false;
+ TestDSSleepingAppMaster appMaster = new TestDSSleepingAppMaster();
try {
- TestDSSleepingAppMaster appMaster = new TestDSSleepingAppMaster();
boolean doRun = appMaster.init(args);
if (!doRun) {
System.exit(0);
@@ -48,6 +47,10 @@ public class TestDSSleepingAppMaster extends ApplicationMaster{
result = appMaster.finish();
} catch (Throwable t) {
System.exit(1);
+ } finally {
+ if (appMaster != null) {
+ appMaster.cleanup();
+ }
}
if (result) {
LOG.info("Application Master completed successfully. exiting");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV10.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV10.java
new file mode 100644
index 0000000..15dc1cb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV10.java
@@ -0,0 +1,870 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.cli.MissingArgumentException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
+import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
+import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
+import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the distributed shell running against Timeline Service v1.0.
+ */
+public class TestDSTimelineV10 extends DistributedShellBaseTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDSTimelineV10.class);
+
+  @Override
+  protected float getTimelineVersion() {
+    return 1.0f;
+  }
+
+  /**
+   * Intentionally a no-op: this suite's
+   * {@link #customizeConfiguration(YarnConfiguration)} does not set up an
+   * HDFS cluster (it only resets the default file system), so there is no
+   * DFS client to release here.
+   */
+  @Override
+  protected void cleanUpDFSClient() {
+  }
+
+  @Test
+  public void testDSShellWithDomain() throws Exception {
+    baseTestDSShell(true);
+  }
+
+  @Test
+  public void testDSShellWithoutDomain() throws Exception {
+    baseTestDSShell(false);
+  }
+
+  @Test
+  public void testDSRestartWithPreviousRunningContainers() throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getSleepCommand(8),
+        "--master_memory",
+        "512",
+        "--container_memory",
+        "128",
+        "--keep_containers_across_application_attempts"
+    );
+
+    LOG.info("Initializing DS Client");
+    setAndGetDSClient(TestDSFailedAppMaster.class.getName(),
+        new Configuration(getYarnClusterConfiguration()));
+
+    getDSClient().init(args);
+
+    LOG.info("Running DS Client");
+    boolean result = getDSClient().run();
+    LOG.info("Client run completed. Result={}", result);
+    // application should succeed
+    Assert.assertTrue(result);
+  }
+
+  /*
+   * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
+   * Set attempt_failures_validity_interval as 2.5 seconds. It will check
+   * how many attempt failures for previous 2.5 seconds.
+   * The application is expected to be successful.
+   */
+  @Test
+  public void testDSAttemptFailuresValidityIntervalSuccess() throws Exception {
+    // application should succeed
+    Assert.assertTrue(runWithAttemptFailuresValidityInterval("2500"));
+  }
+
+  /*
+   * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
+   * Set attempt_failures_validity_interval as 15 seconds. It will check
+   * how many attempt failures for previous 15 seconds.
+   * The application is expected to fail.
+   */
+  @Test
+  public void testDSAttemptFailuresValidityIntervalFailed() throws Exception {
+    // application should fail
+    Assert.assertFalse(runWithAttemptFailuresValidityInterval("15000"));
+  }
+
+  /**
+   * Runs the DS client with {@link TestDSSleepingAppMaster} as the AM, at most
+   * two AM attempts, and the given attempt failures validity interval.
+   *
+   * @param validityIntervalMs value for
+   *     {@code --attempt_failures_validity_interval}, in milliseconds.
+   * @return the result of running the DS client.
+   * @throws Exception if the client fails to initialize or run.
+   */
+  private boolean runWithAttemptFailuresValidityInterval(
+      String validityIntervalMs) throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getSleepCommand(8),
+        "--master_memory",
+        "512",
+        "--container_memory",
+        "128",
+        "--attempt_failures_validity_interval",
+        validityIntervalMs
+    );
+
+    LOG.info("Initializing DS Client");
+    // Copy before overriding the AM attempt limit so the object returned by
+    // getYarnClusterConfiguration() is left untouched for other tests.
+    Configuration config = new Configuration(getYarnClusterConfiguration());
+    config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+    setAndGetDSClient(TestDSSleepingAppMaster.class.getName(), config);
+
+    getDSClient().init(args);
+
+    LOG.info("Running DS Client");
+    boolean result = getDSClient().run();
+    LOG.info("Client run completed. Result={}", result);
+    return result;
+  }
+
+  @Test
+  public void testDSShellWithCustomLogPropertyFile() throws Exception {
+    // set the output to DEBUG level
+    final File customLogProperty = writeTestFile("custom_log4j.properties",
+        "log4j.rootLogger=debug,stdout",
+        "Can not create custom log4j property file.");
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "3",
+        "--shell_command",
+        "echo",
+        "--shell_args",
+        "HADOOP",
+        "--log_properties",
+        customLogProperty.getAbsolutePath(),
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1"
+    );
+
+    // Before running the DS, the default log level is INFO
+    final Logger clientLog = LoggerFactory.getLogger(Client.class);
+    Assert.assertTrue(clientLog.isInfoEnabled());
+    Assert.assertFalse(clientLog.isDebugEnabled());
+    final Logger amLog = LoggerFactory.getLogger(ApplicationMaster.class);
+    Assert.assertTrue(amLog.isInfoEnabled());
+    Assert.assertFalse(amLog.isDebugEnabled());
+
+    LOG.info("Initializing DS Client");
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    boolean initSuccess = getDSClient().init(args);
+    Assert.assertTrue(initSuccess);
+
+    LOG.info("Running DS Client");
+    boolean result = getDSClient().run();
+    LOG.info("Client run completed. Result={}", result);
+    Assert.assertTrue(verifyContainerLog(3, null, true, "DEBUG") > 10);
+    // After DS is finished, the log level should be DEBUG
+    Assert.assertTrue(clientLog.isInfoEnabled());
+    Assert.assertTrue(clientLog.isDebugEnabled());
+    Assert.assertTrue(amLog.isInfoEnabled());
+    Assert.assertTrue(amLog.isDebugEnabled());
+  }
+
+  @Test
+  public void testSpecifyingLogAggregationContext() throws Exception {
+    String regex = ".*(foo|bar)\\d";
+    String[] args = createArgumentsWithAppName(
+        "--shell_command",
+        "echo",
+        "--rolling_log_pattern",
+        regex
+    );
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    Assert.assertTrue(getDSClient().init(args));
+
+    ApplicationSubmissionContext context =
+        Records.newRecord(ApplicationSubmissionContext.class);
+    getDSClient().specifyLogAggregationContext(context);
+    LogAggregationContext logContext = context.getLogAggregationContext();
+    assertEquals(regex, logContext.getRolledLogsIncludePattern());
+    assertTrue(logContext.getRolledLogsExcludePattern().isEmpty());
+  }
+
+  @Test
+  public void testDSShellWithMultipleArgs() throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "4",
+        "--shell_command",
+        "echo",
+        "--shell_args",
+        "HADOOP YARN MAPREDUCE HDFS",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1"
+    );
+
+    LOG.info("Initializing DS Client");
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    boolean initSuccess = getDSClient().init(args);
+    Assert.assertTrue(initSuccess);
+    LOG.info("Running DS Client");
+
+    boolean result = getDSClient().run();
+    LOG.info("Client run completed. Result={}", result);
+    List<String> expectedContent = new ArrayList<>();
+    expectedContent.add("HADOOP YARN MAPREDUCE HDFS");
+    verifyContainerLog(4, expectedContent, false, "");
+  }
+
+  @Test
+  public void testDSShellWithShellScript() throws Exception {
+    final File customShellScript = writeTestFile("custom_script.sh",
+        "echo testDSShellWithShellScript",
+        "Can not create custom shell script file.");
+    LOG.info(customShellScript.getAbsolutePath());
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_script",
+        customShellScript.getAbsolutePath(),
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1"
+    );
+
+    LOG.info("Initializing DS Client");
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    Assert.assertTrue(getDSClient().init(args));
+    LOG.info("Running DS Client");
+    assertTrue(getDSClient().run());
+    List<String> expectedContent = new ArrayList<>();
+    expectedContent.add("testDSShellWithShellScript");
+    verifyContainerLog(1, expectedContent, false, "");
+  }
+
+  @Test
+  public void testDSShellWithInvalidArgs() throws Exception {
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    int appNameCounter = 0;
+    LOG.info("Initializing DS Client with no args");
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "No args",
+        () -> getDSClient().init(new String[]{}));
+
+    LOG.info("Initializing DS Client with no jar file");
+    String[] noJarArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "2",
+        "--shell_command",
+        getListCommand(),
+        "--master_memory",
+        "512",
+        "--container_memory",
+        "128"
+    );
+    String[] argsNoJar = Arrays.copyOfRange(noJarArgs, 2, noJarArgs.length);
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "No jar",
+        () -> getDSClient().init(argsNoJar));
+
+    LOG.info("Initializing DS Client with no shell command");
+    String[] noShellCmdArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "2",
+        "--master_memory",
+        "512",
+        "--container_memory",
+        "128"
+    );
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "No shell command",
+        () -> getDSClient().init(noShellCmdArgs));
+
+    LOG.info("Initializing DS Client with invalid no. of containers");
+
+    String[] numContainersArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "-1",
+        "--shell_command",
+        getListCommand(),
+        "--master_memory",
+        "512",
+        "--container_memory",
+        "128"
+    );
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "Invalid no. of containers",
+        () -> getDSClient().init(numContainersArgs));
+
+    LOG.info("Initializing DS Client with invalid no. of vcores");
+
+    String[] vCoresArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "2",
+        "--shell_command",
+        getListCommand(),
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "-2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1"
+    );
+    // init() is expected to accept the negative master vcores; the
+    // validation is expected to fail only once the client runs (below).
+    getDSClient().init(vCoresArgs);
+
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "Invalid virtual cores specified",
+        () -> {
+          getDSClient().init(vCoresArgs);
+          getDSClient().run();
+        });
+
+    LOG.info("Initializing DS Client with --shell_command and --shell_script");
+
+    String[] scriptAndCmdArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "2",
+        "--shell_command",
+        getListCommand(),
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1",
+        "--shell_script",
+        "test.sh"
+    );
+
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "Can not specify shell_command option and shell_script option at "
+            + "the same time",
+        () -> getDSClient().init(scriptAndCmdArgs));
+
+    LOG.info(
+        "Initializing DS Client without --shell_command and --shell_script");
+
+    String[] noShellCmdNoScriptArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "2",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1"
+    );
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "No shell command or shell script specified "
+            + "to be executed by application master",
+        () -> getDSClient().init(noShellCmdNoScriptArgs));
+
+    LOG.info("Initializing DS Client with invalid container_type argument");
+    String[] invalidTypeArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "2",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1",
+        "--shell_command",
+        "date",
+        "--container_type",
+        "UNSUPPORTED_TYPE"
+    );
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        "Invalid container_type: UNSUPPORTED_TYPE",
+        () -> getDSClient().init(invalidTypeArgs));
+
+    String[] invalidMemArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getListCommand(),
+        "--master_resources",
+        "memory-mb=invalid"
+    );
+    LambdaTestUtils.intercept(IllegalArgumentException.class,
+        () -> getDSClient().init(invalidMemArgs));
+
+    String[] invalidMasterResArgs = createArgsWithPostFix(appNameCounter++,
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getListCommand(),
+        "--master_resources"
+    );
+    LambdaTestUtils.intercept(MissingArgumentException.class,
+        () -> getDSClient().init(invalidMasterResArgs));
+  }
+
+  @Test
+  public void testDSTimelineClientWithConnectionRefuse() throws Exception {
+    ApplicationMaster am = new ApplicationMaster();
+    final AtomicReference<TimelineWriter> spyTimelineWriterRef =
+        new AtomicReference<>(null);
+    TimelineClientImpl client = new TimelineClientImpl() {
+      @Override
+      protected TimelineWriter createTimelineWriter(Configuration conf,
+          UserGroupInformation authUgi, com.sun.jersey.api.client.Client client,
+          URI resURI) throws IOException {
+        TimelineWriter timelineWriter =
+            new DirectTimelineWriter(authUgi, client, resURI);
+        spyTimelineWriterRef.set(spy(timelineWriter));
+        return spyTimelineWriterRef.get();
+      }
+    };
+    client.init(getConfiguration());
+    client.start();
+    // Stop the client even if mocking the writer response fails.
+    try {
+      TestTimelineClient.mockEntityClientResponse(spyTimelineWriterRef.get(),
+          null, false, true);
+      UserGroupInformation ugi = mock(UserGroupInformation.class);
+      when(ugi.getShortUserName()).thenReturn("user1");
+      // verify no ClientHandlerException get thrown out.
+      am.publishContainerEndEvent(client, ContainerStatus.newInstance(
+          BuilderUtils.newContainerId(1, 1, 1, 1), ContainerState.COMPLETE, "",
+          1), "domainId", ugi);
+    } finally {
+      client.stop();
+    }
+  }
+
+  @Test
+  public void testContainerLaunchFailureHandling() throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "2",
+        "--shell_command",
+        getListCommand(),
+        "--master_memory",
+        "512",
+        "--container_memory",
+        "128"
+    );
+
+    LOG.info("Initializing DS Client");
+    setAndGetDSClient(ContainerLaunchFailAppMaster.class.getName(),
+        new Configuration(getYarnClusterConfiguration()));
+    Assert.assertTrue(getDSClient().init(args));
+    LOG.info("Running DS Client");
+    Assert.assertFalse(getDSClient().run());
+  }
+
+  @Test
+  public void testDebugFlag() throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "2",
+        "--shell_command",
+        getListCommand(),
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1",
+        "--debug"
+    );
+
+    LOG.info("Initializing DS Client");
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    Assert.assertTrue(getDSClient().init(args));
+    LOG.info("Running DS Client");
+    Assert.assertTrue(getDSClient().run());
+  }
+
+  /**
+   * Checks the stdout files of the last application log directory, under the
+   * first node manager's log dir, that holds {@code containerNum + 1} entries.
+   *
+   * @param containerNum number of containers to expect; the application
+   *     directory is matched on {@code containerNum + 1} entries (presumably
+   *     the extra one is the AM container).
+   * @param expectedContent lines expected, in order, in each {@code stdout}
+   *     file when {@code count} is false (any order on Windows).
+   * @param count if true, count lines containing {@code expectedWord} in every
+   *     file whose name contains "stdout" instead of checking content.
+   * @param expectedWord the word to count when {@code count} is true.
+   * @return the number of matching lines when {@code count} is true,
+   *     otherwise 0.
+   */
+  private int verifyContainerLog(int containerNum,
+      List<String> expectedContent, boolean count, String expectedWord) {
+    File logFolder =
+        new File(getNodeManager(0).getConfig()
+            .get(YarnConfiguration.NM_LOG_DIRS,
+                YarnConfiguration.DEFAULT_NM_LOG_DIRS));
+
+    File[] listOfFiles = logFolder.listFiles();
+    Assert.assertNotNull(listOfFiles);
+    File[] containerFiles = null;
+    for (int i = listOfFiles.length - 1; i >= 0; i--) {
+      // listFiles() returns null for entries that are not directories.
+      File[] candidate = listOfFiles[i].listFiles();
+      if (candidate != null && candidate.length == containerNum + 1) {
+        containerFiles = candidate;
+        break;
+      }
+    }
+    Assert.assertNotNull("No log directory with " + (containerNum + 1)
+        + " entries found under " + logFolder, containerFiles);
+
+    int numOfWords = 0;
+    for (File containerFile : containerFiles) {
+      File[] outputs = containerFile.listFiles();
+      if (outputs == null) {
+        continue;
+      }
+      for (File output : outputs) {
+        String outputName = output.getName().trim();
+        if (!outputName.contains("stdout")) {
+          continue;
+        }
+        boolean isStdout = outputName.equals("stdout");
+        List<String> stdOutContent = new ArrayList<>();
+        try (BufferedReader br = new BufferedReader(new FileReader(output))) {
+          String sCurrentLine;
+          int numOfline = 0;
+          while ((sCurrentLine = br.readLine()) != null) {
+            if (count) {
+              if (sCurrentLine.contains(expectedWord)) {
+                numOfWords++;
+              }
+            } else if (isStdout) {
+              if (!Shell.WINDOWS) {
+                Assert.assertEquals("The current is " + sCurrentLine,
+                    expectedContent.get(numOfline), sCurrentLine.trim());
+                numOfline++;
+              } else {
+                stdOutContent.add(sCurrentLine.trim());
+              }
+            }
+          }
+        } catch (IOException e) {
+          // Fail loudly: a silently skipped file would weaken the checks.
+          throw new UncheckedIOException("Failed to read " + output, e);
+        }
+        /* By executing bat script using cmd /c,
+         * it will output all contents from bat script first
+         * It is hard for us to do check line by line
+         * Simply check whether output from bat file contains
+         * all the expected messages
+         */
+        if (Shell.WINDOWS && !count && isStdout) {
+          Assert.assertTrue(stdOutContent.containsAll(expectedContent));
+        }
+      }
+    }
+    return numOfWords;
+  }
+
+  @Test
+  public void testDistributedShellResourceProfiles() throws Exception {
+    int appNameCounter = 0;
+    String[][] args = {
+        createArgsWithPostFix(appNameCounter++,
+            "--num_containers", "1", "--shell_command",
+            getListCommand(), "--container_resource_profile",
+            "maximum"),
+        createArgsWithPostFix(appNameCounter++,
+            "--num_containers", "1", "--shell_command",
+            getListCommand(), "--master_resource_profile",
+            "default"),
+        createArgsWithPostFix(appNameCounter++,
+            "--num_containers", "1", "--shell_command",
+            getListCommand(), "--master_resource_profile",
+            "default", "--container_resource_profile", "maximum"),
+    };
+
+    for (int i = 0; i < args.length; ++i) {
+      LOG.info("Initializing DS Client[{}]", i);
+      setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+      Assert.assertTrue(getDSClient().init(args[i]));
+      LOG.info("Running DS Client[{}]", i);
+      LambdaTestUtils.intercept(Exception.class,
+          () -> getDSClient().run());
+    }
+  }
+
+  @Test
+  public void testDSShellWithOpportunisticContainers() throws Exception {
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "2",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1",
+        "--shell_command",
+        "date",
+        "--container_type",
+        "OPPORTUNISTIC"
+    );
+    assertTrue(getDSClient().init(args));
+    assertTrue(getDSClient().run());
+  }
+
+  @Test(expected = ResourceNotFoundException.class)
+  public void testDistributedShellAMResourcesWithUnknownResource()
+      throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getListCommand(),
+        "--master_resources",
+        "unknown-resource=5"
+    );
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    assertTrue(getDSClient().init(args));
+    getDSClient().run();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDistributedShellNonExistentQueue()
+      throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getListCommand(),
+        "--queue",
+        "non-existent-queue"
+    );
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    assertTrue(getDSClient().init(args));
+    getDSClient().run();
+  }
+
+  @Test
+  public void testDistributedShellWithSingleFileLocalization()
+      throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getCatCommand(),
+        "--localize_files",
+        "./src/test/resources/a.txt",
+        "--shell_args",
+        "a.txt"
+    );
+
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    assertTrue(getDSClient().init(args));
+    assertTrue("Client exited with an error", getDSClient().run());
+  }
+
+  @Test
+  public void testDistributedShellWithMultiFileLocalization()
+      throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getCatCommand(),
+        "--localize_files",
+        "./src/test/resources/a.txt,./src/test/resources/b.txt",
+        "--shell_args",
+        "a.txt b.txt"
+    );
+
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    assertTrue(getDSClient().init(args));
+    assertTrue("Client exited with an error", getDSClient().run());
+  }
+
+  @Test(expected = UncheckedIOException.class)
+  public void testDistributedShellWithNonExistentFileLocalization()
+      throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getCatCommand(),
+        "--localize_files",
+        "/non/existing/path/file.txt",
+        "--shell_args",
+        "file.txt"
+    );
+
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    assertTrue(getDSClient().init(args));
+    assertTrue(getDSClient().run());
+  }
+
+  @Test
+  public void testDistributedShellCleanup()
+      throws Exception {
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "1",
+        "--shell_command",
+        getListCommand()
+    );
+    Configuration config = new Configuration(getYarnClusterConfiguration());
+    setAndGetDSClient(config);
+
+    assertTrue(getDSClient().init(args));
+    assertTrue(getDSClient().run());
+    ApplicationId appId = getDSClient().getAppId();
+    String relativePath =
+        ApplicationMaster.getRelativePath(generateAppName(),
+            appId.toString(), "");
+    FileSystem fs1 = FileSystem.get(config);
+    Path path = new Path(fs1.getHomeDirectory(), relativePath);
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return !fs1.exists(path);
+      } catch (IOException e) {
+        return false;
+      }
+    }, 10, 60000);
+
+    assertFalse("Distributed Shell Cleanup failed", fs1.exists(path));
+  }
+
+  /**
+   * Resets the default file system to
+   * {@link CommonConfigurationKeysPublic#FS_DEFAULT_NAME_DEFAULT}.
+   */
+  @Override
+  protected void customizeConfiguration(
+      YarnConfiguration config) throws Exception {
+    config.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+        CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT);
+  }
+
+  private static File getBaseDirForTest() {
+    return new File("target", TestDSTimelineV10.class.getName());
+  }
+
+  /**
+   * Creates (or re-creates) {@code fileName} under this suite's temporary
+   * directory and writes {@code content} to it.
+   *
+   * @param fileName name of the file inside the temporary directory.
+   * @param content text to write into the file.
+   * @param failureMessage assertion message used if the file can not be
+   *     created.
+   * @return the written file.
+   * @throws IOException if the file can not be created or written.
+   */
+  private static File writeTestFile(String fileName, String content,
+      String failureMessage) throws IOException {
+    final File tmpDir = new File(getBaseDirForTest(), "tmpDir");
+    tmpDir.mkdirs();
+    final File file = new File(tmpDir, fileName);
+    if (file.exists()) {
+      file.delete();
+    }
+    if (!file.createNewFile()) {
+      Assert.fail(failureMessage);
+    }
+    // try-with-resources closes the writer even if writing fails.
+    try (PrintWriter fileWriter = new PrintWriter(file)) {
+      fileWriter.write(content);
+    }
+    return file;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV15.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV15.java
new file mode 100644
index 0000000..634bac4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV15.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
+
+/**
+ * Unit tests for the distributed shell running against Timeline Service v1.5.
+ */
+public class TestDSTimelineV15 extends DistributedShellBaseTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDSTimelineV15.class);
+
+  @Override
+  protected float getTimelineVersion() {
+    return 1.5f;
+  }
+
+  /**
+   * Sets up the HDFS cluster, prepares it for the plugin store and registers
+   * {@link DistributedShellTimelinePlugin} as the entity group plugin.
+   */
+  @Override
+  protected void customizeConfiguration(
+      YarnConfiguration config) throws Exception {
+    setUpHDFSCluster();
+    PluginStoreTestUtils.prepareFileSystemForPluginStore(
+        getHDFSCluster().getFileSystem());
+    PluginStoreTestUtils.prepareConfiguration(config, getHDFSCluster());
+    config.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
+        DistributedShellTimelinePlugin.class.getName());
+  }
+
+  /**
+   * Waits until data has been moved into the entity-group store's default
+   * done directory, then delegates to the common timeline checks.
+   */
+  @Override
+  protected void checkTimeline(ApplicationId appId,
+      boolean defaultFlow, boolean haveDomain,
+      ApplicationReport appReport) throws Exception {
+    long scanInterval = getConfiguration().getLong(
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
+        YarnConfiguration
+            .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT
+    );
+    // The scan interval is configured in seconds while waitFor() polls in
+    // milliseconds; poll once per scan period.
+    long pollIntervalMs = TimeUnit.SECONDS.toMillis(scanInterval);
+    Path doneDir = new Path(
+        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT
+    );
+    // Wait till the data is moved to done dir, or timeout and fail
+    AtomicReference<Exception> exceptionRef = new AtomicReference<>(null);
+    GenericTestUtils.waitFor(() -> {
+      try {
+        RemoteIterator<FileStatus> iterApps =
+            getHDFSCluster().getFileSystem().listStatusIterator(doneDir);
+        return iterApps.hasNext();
+      } catch (Exception e) {
+        exceptionRef.set(e);
+        LOG.error("Exception listing Done Dir", e);
+        // Stop waiting; the failure is reported below with its cause.
+        return true;
+      }
+    }, pollIntervalMs, TEST_TIME_WINDOW_EXPIRE);
+    Exception listingError = exceptionRef.get();
+    if (listingError != null) {
+      throw new AssertionError("Exception in getting listing status",
+          listingError);
+    }
+    super.checkTimeline(appId, defaultFlow, haveDomain, appReport);
+  }
+
+  @Test
+  public void testDSShellWithDomain() throws Exception {
+    baseTestDSShell(true);
+  }
+
+  @Test
+  public void testDSShellWithoutDomain() throws Exception {
+    baseTestDSShell(false);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV20.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV20.java
new file mode 100644
index 0000000..caf9d3b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSTimelineV20.java
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.applications.distributedshell;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerReport;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+/**
+ * Unit tests implementations for distributed shell on TimeLineV2.
+ */
+public class TestDSTimelineV20 extends DistributedShellBaseTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestDSTimelineV20.class);
+  private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
+  /** Flow name passed when a test asks for a customized (non-default) flow. */
+  private static final String TEST_FLOW_NAME = "test_flow_name";
+  /** Flow version passed when a test asks for a customized flow. */
+  private static final String TEST_FLOW_VERSION = "test_flow_version";
+  /** Flow run id passed when a test asks for a customized flow. */
+  private static final String TEST_FLOW_RUN_ID = "12345678";
+
+  @Override
+  protected float getTimelineVersion() {
+    return 2.0f;
+  }
+
+  /**
+   * Enables the per-node timeline collector aux-service and configures the
+   * file-system timeline writer whose on-disk output
+   * {@link #checkTimeline} inspects.
+   */
+  @Override
+  protected void customizeConfiguration(
+      YarnConfiguration config) throws Exception {
+    // No standalone v1 timeline server is used here; entities are collected
+    // by the NM aux-service based timeline collectors instead.
+    config.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
+    config.set(YarnConfiguration.NM_AUX_SERVICES + "." +
+        TIMELINE_AUX_SERVICE_NAME + ".class",
+        PerNodeTimelineCollectorsAuxService.class.getName());
+    config.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class,
+        org.apache.hadoop.yarn.server.timelineservice.storage.
+            TimelineWriter.class);
+    setTimelineV2StorageDir();
+    // set the file system timeline writer storage directory
+    config.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+        getTimelineV2StorageDir());
+  }
+
+  /**
+   * Requests two OPPORTUNISTIC containers with --enforce_execution_type and
+   * verifies that every non-AM container was allocated as OPPORTUNISTIC.
+   */
+  @Test
+  public void testDSShellWithEnforceExecutionType() throws Exception {
+    YarnClient yarnClient = null;
+    AtomicReference<Throwable> thrownError = new AtomicReference<>(null);
+    AtomicReference<List<ContainerReport>> containersListRef =
+        new AtomicReference<>(null);
+    AtomicReference<ApplicationAttemptId> appAttemptIdRef =
+        new AtomicReference<>(null);
+    AtomicReference<ApplicationAttemptReport> appAttemptReportRef =
+        new AtomicReference<>(null);
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "2",
+        "--master_memory",
+        "512",
+        "--master_vcores",
+        "2",
+        "--container_memory",
+        "128",
+        "--container_vcores",
+        "1",
+        "--shell_command",
+        getListCommand(),
+        "--container_type",
+        "OPPORTUNISTIC",
+        "--enforce_execution_type"
+    );
+    try {
+      setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+      getDSClient().init(args);
+      // Run the client in the background so this thread can poll the RM for
+      // the launched containers meanwhile.
+      Thread dsClientRunner = new Thread(() -> {
+        try {
+          getDSClient().run();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      });
+      dsClientRunner.start();
+
+      yarnClient = YarnClient.createYarnClient();
+      yarnClient.init(new Configuration(getYarnClusterConfiguration()));
+      yarnClient.start();
+
+      // expecting three containers including the AM container.
+      waitForContainersLaunch(yarnClient, 3, appAttemptReportRef,
+          containersListRef, appAttemptIdRef, thrownError);
+      if (thrownError.get() != null) {
+        Assert.fail(thrownError.get().getMessage());
+      }
+      ContainerId amContainerId = appAttemptReportRef.get().getAMContainerId();
+      for (ContainerReport container : containersListRef.get()) {
+        if (!container.getContainerId().equals(amContainerId)) {
+          // JUnit convention: expected value first, actual value second.
+          Assert.assertEquals(ExecutionType.OPPORTUNISTIC,
+              container.getExecutionType());
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Job execution with enforce execution type failed.", e);
+      Assert.fail("Exception. " + e.getMessage());
+    } finally {
+      if (yarnClient != null) {
+        yarnClient.stop();
+      }
+    }
+  }
+
+  @Test
+  public void testDistributedShellWithResources() throws Exception {
+    doTestDistributedShellWithResources(false);
+  }
+
+  @Test
+  public void testDistributedShellWithResourcesWithLargeContainers()
+      throws Exception {
+    doTestDistributedShellWithResources(true);
+  }
+
+  /**
+   * Runs DS with explicit --master_resources / --container_resources and
+   * checks that the AM and task containers were allocated exactly the
+   * requested memory and one vcore each.
+   *
+   * @param largeContainers if true, size the AM at about 2/3 and each task
+   *     container at about 1/3 of the cluster memory (rounded down to a
+   *     multiple of MIN_ALLOCATION_MB) instead of 1 Gi / 512 Mi.
+   * @throws Exception if the client cannot be initialized or run.
+   */
+  private void doTestDistributedShellWithResources(boolean largeContainers)
+      throws Exception {
+    AtomicReference<Throwable> thrownExceptionRef =
+        new AtomicReference<>(null);
+    AtomicReference<List<ContainerReport>> containersListRef =
+        new AtomicReference<>(null);
+    AtomicReference<ApplicationAttemptId> appAttemptIdRef =
+        new AtomicReference<>(null);
+    AtomicReference<ApplicationAttemptReport> appAttemptReportRef =
+        new AtomicReference<>(null);
+    Resource clusterResource = getYarnCluster().getResourceManager()
+        .getResourceScheduler().getClusterResource();
+    String masterMemoryString = "1 Gi";
+    String containerMemoryString = "512 Mi";
+    // memVars[0]: expected AM memory (MB); memVars[1]: expected task memory.
+    long[] memVars = {1024, 512};
+    YarnClient yarnClient = null;
+    Assume.assumeTrue("The cluster doesn't have enough memory for this test",
+        clusterResource.getMemorySize() >= memVars[0] + memVars[1]);
+    Assume.assumeTrue("The cluster doesn't have enough cores for this test",
+        clusterResource.getVirtualCores() >= 2);
+    if (largeContainers) {
+      memVars[0] = clusterResource.getMemorySize() * 2 / 3;
+      memVars[0] = memVars[0] - memVars[0] % MIN_ALLOCATION_MB;
+      masterMemoryString = memVars[0] + "Mi";
+      memVars[1] = clusterResource.getMemorySize() / 3;
+      memVars[1] = memVars[1] - memVars[1] % MIN_ALLOCATION_MB;
+      containerMemoryString = String.valueOf(memVars[1]);
+    }
+
+    String[] args = createArgumentsWithAppName(
+        "--num_containers",
+        "2",
+        "--shell_command",
+        getListCommand(),
+        "--master_resources",
+        "memory=" + masterMemoryString + ",vcores=1",
+        "--container_resources",
+        "memory=" + containerMemoryString + ",vcores=1"
+    );
+
+    LOG.info("Initializing DS Client");
+    setAndGetDSClient(new Configuration(getYarnClusterConfiguration()));
+    Assert.assertTrue(getDSClient().init(args));
+    LOG.info("Running DS Client");
+    final AtomicBoolean result = new AtomicBoolean(false);
+    Thread dsClientRunner = new Thread(() -> {
+      try {
+        result.set(getDSClient().run());
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    });
+    dsClientRunner.start();
+    try {
+      yarnClient = YarnClient.createYarnClient();
+      yarnClient.init(new Configuration(getYarnClusterConfiguration()));
+      yarnClient.start();
+      // expecting two containers.
+      waitForContainersLaunch(yarnClient, 2, appAttemptReportRef,
+          containersListRef, appAttemptIdRef, thrownExceptionRef);
+      if (thrownExceptionRef.get() != null) {
+        Assert.fail(thrownExceptionRef.get().getMessage());
+      }
+      ContainerId amContainerId = appAttemptReportRef.get().getAMContainerId();
+      ContainerReport report = yarnClient.getContainerReport(amContainerId);
+      Resource masterResource = report.getAllocatedResource();
+      Assert.assertEquals(memVars[0], masterResource.getMemorySize());
+      Assert.assertEquals(1, masterResource.getVirtualCores());
+      for (ContainerReport container : containersListRef.get()) {
+        if (!container.getContainerId().equals(amContainerId)) {
+          Resource containerResource = container.getAllocatedResource();
+          Assert.assertEquals(memVars[1],
+              containerResource.getMemorySize());
+          Assert.assertEquals(1, containerResource.getVirtualCores());
+        }
+      }
+    } finally {
+      LOG.info("Signaling Client to Stop");
+      if (yarnClient != null) {
+        LOG.info("Stopping yarnClient service");
+        yarnClient.stop();
+      }
+    }
+  }
+
+  @Test
+  public void testDSShellWithoutDomain() throws Exception {
+    baseTestDSShell(false);
+  }
+
+  @Test
+  public void testDSShellWithoutDomainDefaultFlow() throws Exception {
+    baseTestDSShell(false, true);
+  }
+
+  @Test
+  public void testDSShellWithoutDomainCustomizedFlow() throws Exception {
+    baseTestDSShell(false, false);
+  }
+
+  /**
+   * Appends explicit flow name / version / run id arguments unless the
+   * default flow is requested.
+   */
+  @Override
+  protected String[] appendFlowArgsForTestDSShell(String[] args,
+      boolean defaultFlow) {
+    if (!defaultFlow) {
+      String[] flowArgs = {
+          "--flow_name",
+          TEST_FLOW_NAME,
+          "--flow_version",
+          TEST_FLOW_VERSION,
+          "--flow_run_id",
+          TEST_FLOW_RUN_ID
+      };
+      args = mergeArgs(args, flowArgs);
+    }
+    return args;
+  }
+
+  /**
+   * Verifies the entities the file-system timeline writer persisted for
+   * {@code appId}: DS app-attempt/container events posted by the AM, NM
+   * container metrics and RM application/attempt lifecycle events. The
+   * entities directory is deleted afterwards.
+   */
+  @Override
+  protected void checkTimeline(ApplicationId appId, boolean defaultFlow,
+      boolean haveDomain, ApplicationReport appReport) throws Exception {
+    LOG.info("Started {}#checkTimeline()", getClass().getCanonicalName());
+    // For PoC check using the file-based timeline writer (YARN-3264)
+    String tmpRoot = getTimelineV2StorageDir() + File.separator + "entities"
+        + File.separator;
+
+    File tmpRootFolder = new File(tmpRoot);
+    try {
+      Assert.assertTrue("Timeline entities directory not found: " + tmpRoot,
+          tmpRootFolder.isDirectory());
+      // Entities are stored under
+      //   <cluster>/<user>/<flow name>/<flow version>/<flow run>/<app id>;
+      // the default flow is named after the application and uses the default
+      // flow version and the application start time as its run.
+      String flowPath = defaultFlow
+          ? appReport.getName() + File.separator
+              + TimelineUtils.DEFAULT_FLOW_VERSION + File.separator
+              + appReport.getStartTime()
+          : TEST_FLOW_NAME + File.separator + TEST_FLOW_VERSION
+              + File.separator + TEST_FLOW_RUN_ID;
+      String basePath = tmpRoot + YarnConfiguration.DEFAULT_RM_CLUSTER_ID
+          + File.separator
+          + UserGroupInformation.getCurrentUser().getShortUserName()
+          + File.separator + flowPath + File.separator + appId;
+      LOG.info("basePath for appId {}: {}", appId, basePath);
+      // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
+
+      // Verify DS_APP_ATTEMPT entities posted by the client; there will be
+      // at least one attempt, look for the first one.
+      File dsAppAttemptEntityFile = verifyEntityTypeFileExists(basePath,
+          "DS_APP_ATTEMPT", getEntityFileName("appattempt", appId, "_000001"));
+      // Check if required events are published and same idprefix is sent
+      // on each publish.
+      verifyEntityForTimeline(dsAppAttemptEntityFile,
+          DSEvent.DS_APP_ATTEMPT_START.toString(), 1, 1, 0, true);
+      // to avoid race condition of testcase, at least check 40 times with
+      // sleep of 50ms
+      verifyEntityForTimeline(dsAppAttemptEntityFile,
+          DSEvent.DS_APP_ATTEMPT_END.toString(), 1, 40, 50, true);
+
+      // Verify DS_CONTAINER entities posted by the client (first task
+      // container, i.e. container #2 of attempt 1).
+      File dsContainerEntityFile = verifyEntityTypeFileExists(basePath,
+          "DS_CONTAINER", getEntityFileName("container", appId, "_01_000002"));
+      // Check if required events are published and same idprefix is sent
+      // on each publish.
+      verifyEntityForTimeline(dsContainerEntityFile,
+          DSEvent.DS_CONTAINER_START.toString(), 1, 1, 0, true);
+      // to avoid race condition of testcase, at least check 40 times with
+      // sleep of 50ms.
+      verifyEntityForTimeline(dsContainerEntityFile,
+          DSEvent.DS_CONTAINER_END.toString(), 1, 40, 50, true);
+
+      // Verify NM posting container metrics info.
+      File containerEntityFile = verifyEntityTypeFileExists(basePath,
+          TimelineEntityType.YARN_CONTAINER.toString(),
+          getEntityFileName("container", appId, "_01_000001"));
+      verifyEntityForTimeline(containerEntityFile,
+          ContainerMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, true);
+
+      // to avoid race condition of testcase, at least check 40 times with
+      // sleep of 50ms
+      verifyEntityForTimeline(containerEntityFile,
+          ContainerMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, true);
+
+      // Verify RM posting Application life cycle Events are getting published
+      File appEntityFile = verifyEntityTypeFileExists(basePath,
+          TimelineEntityType.YARN_APPLICATION.toString(),
+          getEntityFileName("application", appId, ""));
+      // No need to check idprefix for app.
+      verifyEntityForTimeline(appEntityFile,
+          ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, false);
+
+      // to avoid race condition of testcase, at least check 40 times with
+      // sleep of 50ms
+      verifyEntityForTimeline(appEntityFile,
+          ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, false);
+
+      // Verify RM posting AppAttempt life cycle Events are getting published
+      File appAttemptEntityFile = verifyEntityTypeFileExists(basePath,
+          TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
+          getEntityFileName("appattempt", appId, "_000001"));
+      verifyEntityForTimeline(appAttemptEntityFile,
+          AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, 1, 0, true);
+      verifyEntityForTimeline(appAttemptEntityFile,
+          AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true);
+    } finally {
+      try {
+        FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
+      } catch (Exception ex) {
+        // the recursive delete can throw an exception when one of the file
+        // does not exist.
+        LOG.warn("Exception deleting a file/subDirectory: {}", ex.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Builds the file name the file-system timeline writer uses for an entity
+   * of {@code appId}: {@code <prefix>_<clusterTimestamp>_<appId><suffix>}
+   * followed by the writer's storage extension. The application id is
+   * zero-padded to four digits, as in the textual form of YARN ids, so the
+   * name stays correct for application ids of 10 and above.
+   *
+   * @param prefix entity id prefix: "application", "appattempt" or
+   *     "container".
+   * @param appId the application the entity belongs to.
+   * @param suffix id components following the application id, including
+   *     their leading '_' (empty for the application entity itself).
+   * @return the entity file name, including the storage extension.
+   */
+  private static String getEntityFileName(String prefix, ApplicationId appId,
+      String suffix) {
+    return String.format("%s_%d_%04d%s%s", prefix,
+        appId.getClusterTimestamp(), appId.getId(), suffix,
+        FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION);
+  }
+
+  /**
+   * Waits until {@code entityFile} contains exactly
+   * {@code numOfExpectedEvent} non-empty lines mentioning
+   * {@code expectedEvent}. While scanning, DS_CONTAINER_END events must carry
+   * a DIAGNOSTICS info entry and, when {@code checkIdPrefix} is set, every
+   * entity line must have the same positive id prefix.
+   *
+   * @param entityFile Entity file.
+   * @param expectedEvent Expected event Id.
+   * @param numOfExpectedEvent Number of expected occurrences of expected event
+   *     id.
+   * @param checkTimes Number of times to check.
+   * @param sleepTime Sleep time (ms) between checks.
+   * @param checkIdPrefix Whether to check idprefix.
+   * @throws Exception if the expected count is not observed within
+   *     {@code (checkTimes + 1) * sleepTime} ms or waiting is interrupted.
+   *     I/O errors ({@link IOException}) and assertion failures raised while
+   *     scanning are rethrown as an {@link AssertionError} carrying the cause.
+   */
+  private void verifyEntityForTimeline(File entityFile, String expectedEvent,
+      long numOfExpectedEvent, int checkTimes, long sleepTime,
+      boolean checkIdPrefix) throws Exception {
+    final boolean isContainerEndEvent =
+        DSEvent.DS_CONTAINER_END.toString().equals(expectedEvent);
+    AtomicReference<Throwable> thrownExceptionRef = new AtomicReference<>(null);
+    GenericTestUtils.waitFor(() -> {
+      long actualCount = 0;
+      long idPrefix = -1;
+      try (BufferedReader reader =
+          new BufferedReader(new FileReader(entityFile))) {
+        String strLine;
+        while ((strLine = reader.readLine()) != null) {
+          String entityLine = strLine.trim();
+          if (entityLine.isEmpty()) {
+            continue;
+          }
+          boolean lineHasEvent = entityLine.contains(expectedEvent);
+          if (lineHasEvent) {
+            actualCount++;
+          }
+          if (lineHasEvent && isContainerEndEvent) {
+            // DS_CONTAINER_END events must carry the DIAGNOSTICS info entry.
+            TimelineEntity entity = FileSystemTimelineReaderImpl.
+                getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
+            TimelineEvent event = entity.getEvents().pollFirst();
+            Assert.assertNotNull(event);
+            Assert.assertTrue("diagnostics",
+                event.getInfo().containsKey(ApplicationMaster.DIAGNOSTICS));
+          }
+          if (checkIdPrefix) {
+            TimelineEntity entity = FileSystemTimelineReaderImpl.
+                getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
+            Assert.assertTrue("Entity ID prefix expected to be > 0",
+                entity.getIdPrefix() > 0);
+            if (idPrefix == -1) {
+              idPrefix = entity.getIdPrefix();
+            } else {
+              Assert.assertEquals(
+                  "Entity ID prefix should be same across each publish of "
+                      + "same entity", idPrefix, entity.getIdPrefix());
+            }
+          }
+        }
+      } catch (Throwable e) {
+        // Stop waiting; the failure is reported after waitFor returns.
+        LOG.error("Exception reading entity file {}", entityFile, e);
+        thrownExceptionRef.set(e);
+        return true;
+      }
+      return (numOfExpectedEvent == actualCount);
+    }, sleepTime, (checkTimes + 1) * sleepTime);
+
+    Throwable failure = thrownExceptionRef.get();
+    if (failure != null) {
+      // Keep the original exception as the cause so its stack trace is not
+      // lost in the test report.
+      throw new AssertionError(
+          "verifyEntityForTimeline failed " + failure.getMessage(), failure);
+    }
+  }
+
+  /**
+   * Asserts that {@code <basePath>/<entityType>/} is a directory containing
+   * {@code entityFileName}.
+   *
+   * @return the entity file.
+   */
+  private File verifyEntityTypeFileExists(String basePath, String entityType,
+      String entityFileName) {
+    String outputDirPathForEntity =
+        basePath + File.separator + entityType + File.separator;
+    LOG.info("verifyEntityTypeFileExists output path for entityType {}: {}",
+        entityType, outputDirPathForEntity);
+    File outputDirForEntity = new File(outputDirPathForEntity);
+    Assert.assertTrue("Entity directory not found: " + outputDirPathForEntity,
+        outputDirForEntity.isDirectory());
+    String entityFilePath = outputDirPathForEntity + entityFileName;
+    File entityFile = new File(entityFilePath);
+    Assert.assertTrue("Entity file not found: " + entityFilePath,
+        entityFile.exists());
+    return entityFile;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java
index 39c774c..19f0423 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDSWithMultipleNodeManager.java
@@ -17,42 +17,50 @@
*/
package org.apache.hadoop.yarn.applications.distributedshell;
+
import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
-import java.util.Set;
import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
-
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
/**
* Test for Distributed Shell With Multiple Node Managers.
@@ -64,23 +72,28 @@ public class TestDSWithMultipleNodeManager {
private static final Logger LOG =
LoggerFactory.getLogger(TestDSWithMultipleNodeManager.class);
- static final int NUM_NMS = 2;
- TestDistributedShell distShellTest;
- private final Boolean multiNodePlacementEnabled;
+ private static final int NUM_NMS = 2;
private static final String POLICY_CLASS_NAME =
- "org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement."
- + "ResourceUsageMultiNodeLookupPolicy";
+ ResourceUsageMultiNodeLookupPolicy.class.getName();
+ private final Boolean multiNodePlacementEnabled;
+ @Rule
+ public TestName name = new TestName();
+ @Rule
+ public Timeout globalTimeout =
+ new Timeout(DistributedShellBaseTest.TEST_TIME_OUT,
+ TimeUnit.MILLISECONDS);
+ private DistributedShellBaseTest distShellTest;
+ private Client dsClient;
+  /**
+   * Creates one parameterized instance of this test.
+   *
+   * @param multiNodePlacementEnabled whether {@link #setup()} builds the
+   *     cluster configuration with multi-node placement enabled.
+   */
+  public TestDSWithMultipleNodeManager(Boolean multiNodePlacementEnabled) {
+    this.multiNodePlacementEnabled = multiNodePlacementEnabled;
+  }
+  /**
+   * Supplies the constructor argument for each run of this parameterized
+   * test: multi-node placement disabled, then enabled.
+   */
@Parameterized.Parameters
public static Collection<Boolean> getParams() {
return Arrays.asList(false, true);
}
- public TestDSWithMultipleNodeManager(Boolean multiNodePlacementEnabled) {
- this.multiNodePlacementEnabled = multiNodePlacementEnabled;
- }
-
private YarnConfiguration getConfiguration(
boolean multiNodePlacementConfigs) {
YarnConfiguration conf = new YarnConfiguration();
@@ -103,41 +116,59 @@ public class TestDSWithMultipleNodeManager {
return conf;
}
+  /**
+   * Runs the class-level setup of {@link TestDSTimelineV10}, an instance of
+   * which {@link #setup()} creates for every test.
+   */
+  @BeforeClass
+  public static void setupUnitTests() throws Exception {
+    TestDSTimelineV10.setupUnitTests();
+  }
+
+  /**
+   * Runs the class-level teardown of {@link TestDSTimelineV10}, mirroring
+   * {@link #setupUnitTests()}.
+   */
+  @AfterClass
+  public static void tearDownUnitTests() throws Exception {
+    TestDSTimelineV10.tearDownUnitTests();
+  }
+
+  /**
+   * Creates a TestDSTimelineV10 helper and calls its setupInternal with
+   * NUM_NMS node managers and the parameterized scheduler configuration.
+   */
@Before
public void setup() throws Exception {
- distShellTest = new TestDistributedShell();
+ // TestDSTimelineV10 replaces the removed TestDistributedShell as the
+ // helper giving access to the YARN cluster (RM, NMs, configuration).
+ distShellTest = new TestDSTimelineV10();
distShellTest.setupInternal(NUM_NMS,
getConfiguration(multiNodePlacementEnabled));
}
+  /**
+   * Stops any DS client the test left behind, then tears down the helper
+   * created in {@link #setup()}; both references are cleared afterwards.
+   */
@After
public void tearDown() throws Exception {
- distShellTest.tearDown();
+ // Signal the DS client first so it stops before distShellTest is torn down.
+ if (dsClient != null) {
+ dsClient.sendStopSignal();
+ dsClient = null;
+ }
+ if (distShellTest != null) {
+ distShellTest.tearDown();
+ distShellTest = null;
+ }
}
private void initializeNodeLabels() throws IOException {
- RMContext rmContext = distShellTest.yarnCluster.getResourceManager(0).getRMContext();
-
+ RMContext rmContext = distShellTest.getResourceManager(0).getRMContext();
// Setup node labels
RMNodeLabelsManager labelsMgr = rmContext.getNodeLabelManager();
- Set<String> labels = new HashSet<String>();
+ Set<String> labels = new HashSet<>();
labels.add("x");
labelsMgr.addToCluserNodeLabelsWithDefaultExclusivity(labels);
// Setup queue access to node labels
- distShellTest.conf.set(PREFIX + "root.accessible-node-labels", "x");
- distShellTest.conf.set(PREFIX + "root.accessible-node-labels.x.capacity",
- "100");
- distShellTest.conf.set(PREFIX + "root.default.accessible-node-labels", "x");
- distShellTest.conf.set(PREFIX
+ distShellTest.setConfiguration(PREFIX + "root.accessible-node-labels", "x");
+ distShellTest.setConfiguration(
+ PREFIX + "root.accessible-node-labels.x.capacity", "100");
+ distShellTest.setConfiguration(
+ PREFIX + "root.default.accessible-node-labels", "x");
+ distShellTest.setConfiguration(PREFIX
+ "root.default.accessible-node-labels.x.capacity", "100");
- rmContext.getScheduler().reinitialize(distShellTest.conf, rmContext);
+ rmContext.getScheduler().reinitialize(distShellTest.getConfiguration(),
+ rmContext);
// Fetch node-ids from yarn cluster
NodeId[] nodeIds = new NodeId[NUM_NMS];
for (int i = 0; i < NUM_NMS; i++) {
- NodeManager mgr = distShellTest.yarnCluster.getNodeManager(i);
+ NodeManager mgr = distShellTest.getNodeManager(i);
nodeIds[i] = mgr.getNMContext().getNodeId();
}
@@ -145,264 +176,312 @@ public class TestDSWithMultipleNodeManager {
labelsMgr.addLabelsToNode(ImmutableMap.of(nodeIds[1], labels));
}
- @Test(timeout=90000)
+ /**
+ * Submits containers with node label expression "x" and verifies they are
+ * placed only on NM[1], the node initializeNodeLabels() labels "x".
+ */
+ @Test
public void testDSShellWithNodeLabelExpression() throws Exception {
+ NMContainerMonitor containerMonitorRunner = null;
initializeNodeLabels();
- // Start NMContainerMonitor
- NMContainerMonitor mon = new NMContainerMonitor();
- Thread t = new Thread(mon);
- t.start();
-
- // Submit a job which will sleep for 60 sec
- String[] args = {
- "--jar",
- TestDistributedShell.APPMASTER_JAR,
- "--num_containers",
- "4",
- "--shell_command",
- "sleep",
- "--shell_args",
- "15",
- "--master_memory",
- "512",
- "--master_vcores",
- "2",
- "--container_memory",
- "128",
- "--container_vcores",
- "1",
- "--node_label_expression",
- "x"
- };
-
- LOG.info("Initializing DS Client");
- final Client client =
- new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
- boolean initSuccess = client.init(args);
- Assert.assertTrue(initSuccess);
- LOG.info("Running DS Client");
- boolean result = client.run();
- LOG.info("Client run completed. Result=" + result);
-
- t.interrupt();
-
- // Check maximum number of containers on each NMs
- int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
- // Check no container allocated on NM[0]
- Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
- // Check there're some containers allocated on NM[1]
- Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
+ try {
+ // Start NMContainerMonitor
+ containerMonitorRunner = new NMContainerMonitor();
+ containerMonitorRunner.start();
+
+ // Submit a job of 4 containers, each running "sleep 15", restricted to
+ // nodes labeled "x".
+ String[] args =
+ DistributedShellBaseTest.createArguments(() -> generateAppName(),
+ "--num_containers",
+ "4",
+ "--shell_command",
+ "sleep",
+ "--shell_args",
+ "15",
+ "--master_memory",
+ "512",
+ "--master_vcores",
+ "2",
+ "--container_memory",
+ "128",
+ "--container_vcores",
+ "1",
+ "--node_label_expression",
+ "x"
+ );
+
+ LOG.info("Initializing DS Client");
+ dsClient =
+ new Client(
+ new Configuration(distShellTest.getYarnClusterConfiguration()));
+ Assert.assertTrue(dsClient.init(args));
+ LOG.info("Running DS Client");
+ boolean result = dsClient.run();
+ LOG.info("Client run completed. Result={}", result);
+
+ // Stop sampling before reading the report; the finally block also stops
+ // and joins the monitor thread if the test fails earlier.
+ containerMonitorRunner.stopMonitoring();
+
+ // Check maximum number of containers on each NMs
+ int[] maxRunningContainersOnNMs =
+ containerMonitorRunner.getMaxRunningContainersReport();
+ // Check no container allocated on NM[0]
+ Assert.assertEquals(0, maxRunningContainersOnNMs[0]);
+ // Check there are some containers allocated on NM[1]
+ Assert.assertTrue(maxRunningContainersOnNMs[1] > 0);
+ } finally {
+ if (containerMonitorRunner != null) {
+ containerMonitorRunner.stopMonitoring();
+ containerMonitorRunner.join();
+ }
+ }
}
- @Test(timeout = 90000)
+ /**
+ * Runs DS with the placement spec
+ * "zk(1),NOTIN,NODE,zk:spark(1),NOTIN,NODE,zk" and verifies the maximum
+ * number of containers observed on each of the two NMs.
+ */
+ @Test
public void testDistributedShellWithPlacementConstraint()
throws Exception {
- NMContainerMonitor mon = new NMContainerMonitor();
- Thread t = new Thread(mon);
- t.start();
-
- String[] args = {
- "--jar",
- distShellTest.APPMASTER_JAR,
- "1",
- "--shell_command",
- distShellTest.getSleepCommand(15),
- "--placement_spec",
- "zk(1),NOTIN,NODE,zk:spark(1),NOTIN,NODE,zk"
- };
- LOG.info("Initializing DS Client");
- final Client client =
- new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
- boolean initSuccess = client.init(args);
- Assert.assertTrue(initSuccess);
- LOG.info("Running DS Client");
- boolean result = client.run();
- LOG.info("Client run completed. Result=" + result);
-
- t.interrupt();
-
- ConcurrentMap<ApplicationId, RMApp> apps = distShellTest.yarnCluster.
- getResourceManager().getRMContext().getRMApps();
- RMApp app = apps.values().iterator().next();
- RMAppAttempt appAttempt = app.getAppAttempts().values().iterator().next();
- NodeId masterNodeId = appAttempt.getMasterContainer().getNodeId();
- NodeManager nm1 = distShellTest.yarnCluster.getNodeManager(0);
-
- int expectedNM1Count = 1;
- int expectedNM2Count = 1;
- if (nm1.getNMContext().getNodeId().equals(masterNodeId)) {
- expectedNM1Count++;
- } else {
- expectedNM2Count++;
- }
+ NMContainerMonitor containerMonitorRunner = null;
+ String[] args =
+ DistributedShellBaseTest.createArguments(() -> generateAppName(),
+ // NOTE(review): "1" has no preceding option flag (the old version
+ // lacked one too); confirm whether "--num_containers" was intended.
+ "1",
+ "--shell_command",
+ DistributedShellBaseTest.getSleepCommand(15),
+ "--placement_spec",
+ "zk(1),NOTIN,NODE,zk:spark(1),NOTIN,NODE,zk"
+ );
+ try {
+ containerMonitorRunner = new NMContainerMonitor();
+ containerMonitorRunner.start();
+
+ LOG.info("Initializing DS Client with args {}", Arrays.toString(args));
+ dsClient =
+ new Client(
+ new Configuration(distShellTest.getYarnClusterConfiguration()));
+ Assert.assertTrue(dsClient.init(args));
+ LOG.info("Running DS Client");
+ boolean result = dsClient.run();
+ LOG.info("Client run completed. Result={}", result);
+
+ containerMonitorRunner.stopMonitoring();
+
+ // Locate the node that ran the AM container of the (single) app.
+ ConcurrentMap<ApplicationId, RMApp> apps =
+ distShellTest.getResourceManager().getRMContext().getRMApps();
+ RMApp app = apps.values().iterator().next();
+ RMAppAttempt appAttempt = app.getAppAttempts().values().iterator().next();
+ NodeId masterNodeId = appAttempt.getMasterContainer().getNodeId();
+ NodeManager nm1 = distShellTest.getNodeManager(0);
+
+ // The spec keeps the zk and spark containers off each other's node, so
+ // each NM is expected to run one of them; the NM hosting the AM also
+ // runs the AM container.
+ int[] expectedNMsCount = new int[]{1, 1};
+ if (nm1.getNMContext().getNodeId().equals(masterNodeId)) {
+ expectedNMsCount[0]++;
+ } else {
+ expectedNMsCount[1]++;
+ }
- int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
- Assert.assertEquals(expectedNM1Count, maxRunningContainersOnNMs[0]);
- Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]);
+ int[] maxRunningContainersOnNMs =
+ containerMonitorRunner.getMaxRunningContainersReport();
+ Assert.assertEquals(expectedNMsCount[0], maxRunningContainersOnNMs[0]);
+ Assert.assertEquals(expectedNMsCount[1], maxRunningContainersOnNMs[1]);
+ } finally {
+ if (containerMonitorRunner != null) {
+ containerMonitorRunner.stopMonitoring();
+ containerMonitorRunner.join();
+ }
+ }
}
- @Test(timeout = 90000)
+ @Test
public void testDistributedShellWithAllocationTagNamespace()
throws Exception {
- NMContainerMonitor mon = new NMContainerMonitor();
- Thread monitorThread = new Thread(mon);
- monitorThread.start();
-
- String[] argsA = {
- "--jar",
- distShellTest.APPMASTER_JAR,
- "--shell_command",
- distShellTest.getSleepCommand(30),
- "--placement_spec",
- "bar(1),notin,node,bar"
- };
- final Client clientA =
- new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
- clientA.init(argsA);
- final AtomicBoolean resultA = new AtomicBoolean(false);
- Thread t = new Thread() {
- public void run() {
+ NMContainerMonitor containerMonitorRunner = null;
+ Client clientB = null;
+ YarnClient yarnClient = null;
+
+ String[] argsA =
+ DistributedShellBaseTest.createArguments(() -> generateAppName("001"),
+ "--shell_command",
+ DistributedShellBaseTest.getSleepCommand(30),
+ "--placement_spec",
+ "bar(1),notin,node,bar"
+ );
+ String[] argsB =
+ DistributedShellBaseTest.createArguments(() -> generateAppName("002"),
+ "1",
+ "--shell_command",
+ DistributedShellBaseTest.getListCommand(),
+ "--placement_spec",
+ "foo(3),notin,node,all/bar"
+ );
+
+ try {
+ containerMonitorRunner = new NMContainerMonitor();
+ containerMonitorRunner.start();
+ dsClient =
+ new Client(
+ new Configuration(distShellTest.getYarnClusterConfiguration()));
+ dsClient.init(argsA);
+ Thread dsClientRunner = new Thread(() -> {
try {
- resultA.set(clientA.run());
+ dsClient.run();
} catch (Exception e) {
throw new RuntimeException(e);
}
+ });
+ dsClientRunner.start();
+
+ NodeId taskContainerNodeIdA;
+ ConcurrentMap<ApplicationId, RMApp> apps;
+ AtomicReference<RMApp> appARef = new AtomicReference<>(null);
+ AtomicReference<NodeId> masterContainerNodeIdARef =
+ new AtomicReference<>(null);
+ int[] expectedNMCounts = new int[]{0, 0};
+
+ waitForExpectedNMsCount(expectedNMCounts, appARef,
+ masterContainerNodeIdARef);
+
+ NodeId nodeA = distShellTest.getNodeManager(0).getNMContext().
+ getNodeId();
+ NodeId nodeB = distShellTest.getNodeManager(1).getNMContext().
+ getNodeId();
+ Assert.assertEquals(2, (expectedNMCounts[0] + expectedNMCounts[1]));
+ if (expectedNMCounts[0] != expectedNMCounts[1]) {
+ taskContainerNodeIdA = masterContainerNodeIdARef.get();
+ } else {
+ taskContainerNodeIdA =
+ masterContainerNodeIdARef.get().equals(nodeA) ? nodeB : nodeA;
}
- };
- t.start();
-
- NodeId masterContainerNodeIdA;
- NodeId taskContainerNodeIdA;
- ConcurrentMap<ApplicationId, RMApp> apps;
- RMApp appA;
-
- int expectedNM1Count = 0;
- int expectedNM2Count = 0;
- while (true) {
- if ((expectedNM1Count + expectedNM2Count) < 2) {
- expectedNM1Count = distShellTest.yarnCluster.getNodeManager(0).
- getNMContext().getContainers().size();
- expectedNM2Count = distShellTest.yarnCluster.getNodeManager(1).
- getNMContext().getContainers().size();
- continue;
+
+ clientB =
+ new Client(
+ new Configuration(distShellTest.getYarnClusterConfiguration()));
+ clientB.init(argsB);
+ Assert.assertTrue(clientB.run());
+ containerMonitorRunner.stopMonitoring();
+ apps = distShellTest.getResourceManager().getRMContext().getRMApps();
+ Iterator<RMApp> it = apps.values().iterator();
+ RMApp appB = it.next();
+ if (appARef.get().equals(appB)) {
+ appB = it.next();
}
- apps = distShellTest.yarnCluster.getResourceManager().getRMContext().
- getRMApps();
- if (apps.isEmpty()) {
- Thread.sleep(10);
- continue;
+ LOG.info("Allocation Tag NameSpace Applications are={} and {}",
+ appARef.get().getApplicationId(), appB.getApplicationId());
+
+ RMAppAttempt appAttemptB =
+ appB.getAppAttempts().values().iterator().next();
+ NodeId masterContainerNodeIdB =
+ appAttemptB.getMasterContainer().getNodeId();
+
+ if (nodeA.equals(masterContainerNodeIdB)) {
+ expectedNMCounts[0]++;
+ } else {
+ expectedNMCounts[1]++;
}
- appA = apps.values().iterator().next();
- if (appA.getAppAttempts().isEmpty()) {
- Thread.sleep(10);
- continue;
+ if (nodeA.equals(taskContainerNodeIdA)) {
+ expectedNMCounts[1] += 3;
+ } else {
+ expectedNMCounts[0] += 3;
}
- RMAppAttempt appAttemptA = appA.getAppAttempts().values().iterator().
- next();
- if (appAttemptA.getMasterContainer() == null) {
- Thread.sleep(10);
- continue;
+ int[] maxRunningContainersOnNMs =
+ containerMonitorRunner.getMaxRunningContainersReport();
+ Assert.assertEquals(expectedNMCounts[0], maxRunningContainersOnNMs[0]);
+ Assert.assertEquals(expectedNMCounts[1], maxRunningContainersOnNMs[1]);
+
+ try {
+ yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(
+ new Configuration(distShellTest.getYarnClusterConfiguration()));
+ yarnClient.start();
+ yarnClient.killApplication(appARef.get().getApplicationId());
+ } catch (Exception e) {
+ // Ignore Exception while killing a job
+ LOG.warn("Exception killing the job: {}", e.getMessage());
+ }
+ } finally {
+ if (yarnClient != null) {
+ yarnClient.stop();
+ }
+ if (clientB != null) {
+ clientB.sendStopSignal();
+ }
+ if (containerMonitorRunner != null) {
+ containerMonitorRunner.stopMonitoring();
+ containerMonitorRunner.join();
}
- masterContainerNodeIdA = appAttemptA.getMasterContainer().getNodeId();
- break;
}
+ }
- NodeId nodeA = distShellTest.yarnCluster.getNodeManager(0).getNMContext().
- getNodeId();
- NodeId nodeB = distShellTest.yarnCluster.getNodeManager(1).getNMContext().
- getNodeId();
- Assert.assertEquals(2, (expectedNM1Count + expectedNM2Count));
-
- if (expectedNM1Count != expectedNM2Count) {
- taskContainerNodeIdA = masterContainerNodeIdA;
- } else {
- taskContainerNodeIdA = masterContainerNodeIdA.equals(nodeA) ? nodeB :
- nodeA;
- }
+ protected String generateAppName() {
+ return generateAppName(null);
+ }
- String[] argsB = {
- "--jar",
- distShellTest.APPMASTER_JAR,
- "1",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls",
- "--placement_spec",
- "foo(3),notin,node,all/bar"
- };
- final Client clientB = new Client(new Configuration(distShellTest.
- yarnCluster.getConfig()));
- clientB.init(argsB);
- boolean resultB = clientB.run();
- Assert.assertTrue(resultB);
-
- monitorThread.interrupt();
- apps = distShellTest.yarnCluster.getResourceManager().getRMContext().
- getRMApps();
- Iterator<RMApp> it = apps.values().iterator();
- RMApp appB = it.next();
- if (appA.equals(appB)) {
- appB = it.next();
- }
- LOG.info("Allocation Tag NameSpace Applications are=" + appA.
- getApplicationId() + " and " + appB.getApplicationId());
-
- RMAppAttempt appAttemptB = appB.getAppAttempts().values().iterator().
- next();
- NodeId masterContainerNodeIdB = appAttemptB.getMasterContainer().
- getNodeId();
-
- if (nodeA.equals(masterContainerNodeIdB)) {
- expectedNM1Count += 1;
- } else {
- expectedNM2Count += 1;
- }
- if (nodeA.equals(taskContainerNodeIdA)) {
- expectedNM2Count += 3;
- } else {
- expectedNM1Count += 3;
- }
- int[] maxRunningContainersOnNMs = mon.getMaxRunningContainersReport();
- Assert.assertEquals(expectedNM1Count, maxRunningContainersOnNMs[0]);
- Assert.assertEquals(expectedNM2Count, maxRunningContainersOnNMs[1]);
+ protected String generateAppName(String postFix) {
+ return name.getMethodName().replaceFirst("test", "")
+ .concat(postFix == null ? "" : "-" + postFix);
+ }
- try {
- YarnClient yarnClient = YarnClient.createYarnClient();
- yarnClient.init(new Configuration(distShellTest.yarnCluster.
- getConfig()));
- yarnClient.start();
- yarnClient.killApplication(appA.getApplicationId());
- } catch (Exception e) {
- // Ignore Exception while killing a job
- }
+ private void waitForExpectedNMsCount(int[] expectedNMCounts,
+ AtomicReference<RMApp> appARef,
+ AtomicReference<NodeId> masterContainerNodeIdARef) throws Exception {
+ GenericTestUtils.waitFor(() -> {
+ if ((expectedNMCounts[0] + expectedNMCounts[1]) < 2) {
+ expectedNMCounts[0] =
+ distShellTest.getNodeManager(0).getNMContext()
+ .getContainers().size();
+ expectedNMCounts[1] =
+ distShellTest.getNodeManager(1).getNMContext()
+ .getContainers().size();
+ return false;
+ }
+ ConcurrentMap<ApplicationId, RMApp> appIDsMap =
+ distShellTest.getResourceManager().getRMContext().getRMApps();
+ if (appIDsMap.isEmpty()) {
+ return false;
+ }
+ appARef.set(appIDsMap.values().iterator().next());
+ if (appARef.get().getAppAttempts().isEmpty()) {
+ return false;
+ }
+ RMAppAttempt appAttemptA =
+ appARef.get().getAppAttempts().values().iterator().next();
+ if (appAttemptA.getMasterContainer() == null) {
+ return false;
+ }
+ masterContainerNodeIdARef.set(
+ appAttemptA.getMasterContainer().getNodeId());
+ return true;
+ }, 10, 60000);
}
/**
- * Monitor containers running on NMs
+ * Monitor containers running on NMs.
*/
- class NMContainerMonitor implements Runnable {
+ class NMContainerMonitor extends Thread {
// The interval of milliseconds of sampling (500ms)
- final static int SAMPLING_INTERVAL_MS = 500;
+ private final static int SAMPLING_INTERVAL_MS = 500;
// The maximum number of containers running on each NMs
- int[] maxRunningContainersOnNMs = new int[NUM_NMS];
+ private final int[] maxRunningContainersOnNMs = new int[NUM_NMS];
+ private final Object quitSignal = new Object();
+ private volatile boolean isRunning = true;
@Override
public void run() {
- while (true) {
+ while (isRunning) {
for (int i = 0; i < NUM_NMS; i++) {
int nContainers =
- distShellTest.yarnCluster.getNodeManager(i).getNMContext()
+ distShellTest.getNodeManager(i).getNMContext()
.getContainers().size();
if (nContainers > maxRunningContainersOnNMs[i]) {
maxRunningContainersOnNMs[i] = nContainers;
}
}
- try {
- Thread.sleep(SAMPLING_INTERVAL_MS);
- } catch (InterruptedException e) {
- e.printStackTrace();
- break;
+ synchronized (quitSignal) {
+ try {
+ if (!isRunning) {
+ break;
+ }
+ quitSignal.wait(SAMPLING_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ LOG.warn("NMContainerMonitor interrupted");
+ isRunning = false;
+ break;
+ }
}
}
}
@@ -410,5 +489,15 @@ public class TestDSWithMultipleNodeManager {
public int[] getMaxRunningContainersReport() {
return maxRunningContainersOnNMs;
}
+
+ public void stopMonitoring() {
+ if (!isRunning) {
+ return;
+ }
+ synchronized (quitSignal) {
+ isRunning = false;
+ quitSignal.notifyAll();
+ }
+ }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
deleted file mode 100644
index 87479d6..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ /dev/null
@@ -1,1865 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.applications.distributedshell;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import java.io.BufferedReader;
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.io.UncheckedIOException;
-import java.net.URI;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import java.util.function.Supplier;
-import org.apache.commons.cli.MissingArgumentException;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.net.ServerSocketUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.JarFinder;
-import org.apache.hadoop.util.Shell;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationReport;
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerReport;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.LogAggregationContext;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
-import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster.DSEvent;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.client.api.impl.DirectTimelineWriter;
-import org.apache.hadoop.yarn.client.api.impl.TestTimelineClient;
-import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
-import org.apache.hadoop.yarn.client.api.impl.TimelineWriter;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
-import org.apache.hadoop.yarn.server.MiniYARNCluster;
-import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
-import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
-import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.timeline.NameValuePair;
-import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
-import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
-import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
-import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
-import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
-import org.apache.hadoop.yarn.util.Records;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.TestName;
-import org.junit.rules.Timeout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestDistributedShell {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(TestDistributedShell.class);
-
- protected MiniYARNCluster yarnCluster = null;
- protected MiniDFSCluster hdfsCluster = null;
- private FileSystem fs = null;
- private TimelineWriter spyTimelineWriter;
- protected YarnConfiguration conf = null;
- // location of the filesystem timeline writer for timeline service v.2
- private String timelineV2StorageDir = null;
- private static final int NUM_NMS = 1;
- private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
- private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
- private static final int MIN_ALLOCATION_MB = 128;
- private static final int TEST_TIME_OUT = 150000;
- // set the timeout of the yarnClient to be 95% of the globalTimeout.
- private static final int TEST_TIME_WINDOW_EXPIRE = (TEST_TIME_OUT * 90) / 100;
-
- protected final static String APPMASTER_JAR =
- JarFinder.getJar(ApplicationMaster.class);
-
- @Rule
- public TimelineVersionWatcher timelineVersionWatcher
- = new TimelineVersionWatcher();
-
- @Rule
- public Timeout globalTimeout = new Timeout(TEST_TIME_OUT,
- TimeUnit.MILLISECONDS);
-
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
-
- @Rule
- public TestName name = new TestName();
-
- // set the timeout of the yarnClient to be 95% of the globalTimeout.
- private final String yarnClientTimeout =
- String.valueOf(TEST_TIME_WINDOW_EXPIRE);
-
- private final String[] commonArgs = {
- "--jar",
- APPMASTER_JAR,
- "--timeout",
- yarnClientTimeout,
- "--appname",
- ""
- };
-
- @Before
- public void setup() throws Exception {
- setupInternal(NUM_NMS, timelineVersionWatcher.getTimelineVersion(),
- new YarnConfiguration());
- }
-
- protected void setupInternal(int numNodeManager,
- YarnConfiguration yarnConfig) throws Exception {
- setupInternal(numNodeManager, DEFAULT_TIMELINE_VERSION, yarnConfig);
- }
-
- private void setupInternal(int numNodeManager, float timelineVersion,
- YarnConfiguration yarnConfig)
- throws Exception {
- LOG.info("Starting up YARN cluster");
-
- this.conf = yarnConfig;
- conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
- MIN_ALLOCATION_MB);
- // reduce the teardown waiting time
- conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000);
- conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 500);
- conf.set("yarn.log.dir", "target");
- conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
- // mark if we need to launch the v1 timeline server
- // disable aux-service based timeline aggregators
- conf.set(YarnConfiguration.NM_AUX_SERVICES, "");
- conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
-
- conf.set(YarnConfiguration.NM_VMEM_PMEM_RATIO, "8");
- conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
- conf.set("mapreduce.jobhistory.address",
- "0.0.0.0:" + ServerSocketUtil.getPort(10021, 10));
- // Enable ContainersMonitorImpl
- conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
- LinuxResourceCalculatorPlugin.class.getName());
- conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
- ProcfsBasedProcessTree.class.getName());
- conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
- conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
- conf.setBoolean(
- YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
- true);
- conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
- true);
- conf.setBoolean(
- YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
- conf.setInt(YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH,
- 10);
- conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
- YarnConfiguration.PROCESSOR_RM_PLACEMENT_CONSTRAINTS_HANDLER);
- // ATS version specific settings
- if (timelineVersion == 1.0f) {
- conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
- conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
- CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT);
- } else if (timelineVersion == 1.5f) {
- HdfsConfiguration hdfsConfig = new HdfsConfiguration();
- hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
- .numDataNodes(1).build();
- hdfsCluster.waitActive();
- fs = hdfsCluster.getFileSystem();
- PluginStoreTestUtils.prepareFileSystemForPluginStore(fs);
- PluginStoreTestUtils.prepareConfiguration(conf, hdfsCluster);
- conf.set(YarnConfiguration.TIMELINE_SERVICE_ENTITY_GROUP_PLUGIN_CLASSES,
- DistributedShellTimelinePlugin.class.getName());
- } else if (timelineVersion == 2.0f) {
- // set version to 2
- conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
- // disable v1 timeline server since we no longer have a server here
- // enable aux-service based timeline aggregators
- conf.set(YarnConfiguration.NM_AUX_SERVICES, TIMELINE_AUX_SERVICE_NAME);
- conf.set(YarnConfiguration.NM_AUX_SERVICES + "." +
- TIMELINE_AUX_SERVICE_NAME + ".class",
- PerNodeTimelineCollectorsAuxService.class.getName());
- conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
- FileSystemTimelineWriterImpl.class,
- org.apache.hadoop.yarn.server.timelineservice.storage.
- TimelineWriter.class);
- timelineV2StorageDir = tmpFolder.newFolder().getAbsolutePath();
- // set the file system timeline writer storage directory
- conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
- timelineV2StorageDir);
- } else {
- Assert.fail("Wrong timeline version number: " + timelineVersion);
- }
-
- yarnCluster =
- new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
- numNodeManager, 1, 1);
- yarnCluster.init(conf);
- yarnCluster.start();
-
- conf.set(
- YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
- MiniYARNCluster.getHostname() + ":"
- + yarnCluster.getApplicationHistoryServer().getPort());
-
- waitForNMsToRegister();
-
- URL url = Thread.currentThread().getContextClassLoader().getResource(
- "yarn-site.xml");
- if (url == null) {
- throw new RuntimeException(
- "Could not find 'yarn-site.xml' dummy file in classpath");
- }
- Configuration yarnClusterConfig = yarnCluster.getConfig();
- yarnClusterConfig.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
- new File(url.getPath()).getParent());
- //write the document to a buffer (not directly to the file, as that
- //can cause the file being written to get read -which will then fail.
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- yarnClusterConfig.writeXml(bytesOut);
- bytesOut.close();
- //write the bytes to the file in the classpath
- OutputStream os = new FileOutputStream(url.getPath());
- os.write(bytesOut.toByteArray());
- os.close();
-
- FileContext fsContext = FileContext.getLocalFSFileContext();
- fsContext
- .delete(
- new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)),
- true);
- try {
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- LOG.info("setup thread sleep interrupted. message=" + e.getMessage());
- }
- }
-
- @After
- public void tearDown() throws IOException {
- FileContext fsContext = FileContext.getLocalFSFileContext();
- fsContext
- .delete(
- new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)),
- true);
- if (yarnCluster != null) {
- try {
- yarnCluster.stop();
- } finally {
- yarnCluster = null;
- }
- }
- if (hdfsCluster != null) {
- try {
- hdfsCluster.shutdown();
- } finally {
- hdfsCluster = null;
- }
- }
- }
-
- @Test
- public void testDSShellWithDomain() throws Exception {
- testDSShell(true);
- }
-
- @Test
- public void testDSShellWithoutDomain() throws Exception {
- testDSShell(false);
- }
-
- @Test
- @TimelineVersion(1.5f)
- public void testDSShellWithoutDomainV1_5() throws Exception {
- testDSShell(false);
- }
-
- @Test
- @TimelineVersion(1.5f)
- public void testDSShellWithDomainV1_5() throws Exception {
- testDSShell(true);
- }
-
- @Test
- @TimelineVersion(2.0f)
- public void testDSShellWithoutDomainV2() throws Exception {
- testDSShell(false);
- }
-
- public void testDSShell(boolean haveDomain) throws Exception {
- testDSShell(haveDomain, true);
- }
-
- @Test
- @TimelineVersion(2.0f)
- public void testDSShellWithoutDomainV2DefaultFlow() throws Exception {
- testDSShell(false, true);
- }
-
- @Test
- @TimelineVersion(2.0f)
- public void testDSShellWithoutDomainV2CustomizedFlow() throws Exception {
- testDSShell(false, false);
- }
-
- public void testDSShell(boolean haveDomain, boolean defaultFlow)
- throws Exception {
- String[] args = createArguments(
- "--num_containers",
- "2",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls",
- "--master_memory",
- "512",
- "--master_vcores",
- "2",
- "--container_memory",
- "128",
- "--container_vcores",
- "1");
-
- if (haveDomain) {
- String[] domainArgs = {
- "--domain",
- "TEST_DOMAIN",
- "--view_acls",
- "reader_user reader_group",
- "--modify_acls",
- "writer_user writer_group",
- "--create"
- };
- args = mergeArgs(args, domainArgs);
- }
- boolean isTestingTimelineV2 = false;
- if (timelineVersionWatcher.getTimelineVersion() == 2.0f) {
- isTestingTimelineV2 = true;
- if (!defaultFlow) {
- String[] flowArgs = {
- "--flow_name",
- "test_flow_name",
- "--flow_version",
- "test_flow_version",
- "--flow_run_id",
- "12345678"
- };
- args = mergeArgs(args, flowArgs);
- }
- LOG.info("Setup: Using timeline v2!");
- }
-
- LOG.info("Initializing DS Client");
- YarnClient yarnClient;
- final Client client = new Client(new Configuration(yarnCluster.getConfig()));
- boolean initSuccess = client.init(args);
- Assert.assertTrue(initSuccess);
- LOG.info("Running DS Client");
- final AtomicBoolean result = new AtomicBoolean(false);
- Thread t = new Thread() {
- public void run() {
- try {
- result.set(client.run());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
- t.start();
-
- yarnClient = YarnClient.createYarnClient();
- yarnClient.init(new Configuration(yarnCluster.getConfig()));
- yarnClient.start();
-
- boolean verified = false;
- String errorMessage = "";
- ApplicationId appId = null;
- ApplicationReport appReport = null;
- while (!verified) {
- List<ApplicationReport> apps = yarnClient.getApplications();
- if (apps.size() == 0) {
- Thread.sleep(10);
- continue;
- }
- appReport = apps.get(0);
- appId = appReport.getApplicationId();
- if (appReport.getHost().equals("N/A")) {
- Thread.sleep(10);
- continue;
- }
- errorMessage =
- "'. Expected rpc port to be '-1', was '"
- + appReport.getRpcPort() + "'.";
- if (appReport.getRpcPort() == -1) {
- verified = true;
- }
-
- if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED
- && appReport.getFinalApplicationStatus() !=
- FinalApplicationStatus.UNDEFINED) {
- break;
- }
- }
- Assert.assertTrue(errorMessage, verified);
- t.join();
- LOG.info("Client run completed for testDSShell. Result=" + result);
- Assert.assertTrue(result.get());
-
- if (timelineVersionWatcher.getTimelineVersion() == 1.5f) {
- long scanInterval = conf.getLong(
- YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS,
- YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SCAN_INTERVAL_SECONDS_DEFAULT
- );
- Path doneDir = new Path(
- YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT
- );
- // Wait till the data is moved to done dir, or timeout and fail
- while (true) {
- RemoteIterator<FileStatus> iterApps = fs.listStatusIterator(doneDir);
- if (iterApps.hasNext()) {
- break;
- }
- Thread.sleep(scanInterval * 2);
- }
- }
-
- if (!isTestingTimelineV2) {
- checkTimelineV1(haveDomain);
- } else {
- checkTimelineV2(appId, defaultFlow, appReport);
- }
- }
-
- private void checkTimelineV1(boolean haveDomain) throws Exception {
- TimelineDomain domain = null;
- if (haveDomain) {
- domain = yarnCluster.getApplicationHistoryServer()
- .getTimelineStore().getDomain("TEST_DOMAIN");
- Assert.assertNotNull(domain);
- Assert.assertEquals("reader_user reader_group", domain.getReaders());
- Assert.assertEquals("writer_user writer_group", domain.getWriters());
- }
- TimelineEntities entitiesAttempts = yarnCluster
- .getApplicationHistoryServer()
- .getTimelineStore()
- .getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(),
- null, null, null, null, null, null, null, null, null);
- Assert.assertNotNull(entitiesAttempts);
- Assert.assertEquals(1, entitiesAttempts.getEntities().size());
- Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents()
- .size());
- Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType(),
- ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString());
- if (haveDomain) {
- Assert.assertEquals(domain.getId(),
- entitiesAttempts.getEntities().get(0).getDomainId());
- } else {
- Assert.assertEquals("DEFAULT",
- entitiesAttempts.getEntities().get(0).getDomainId());
- }
- String currAttemptEntityId
- = entitiesAttempts.getEntities().get(0).getEntityId();
- ApplicationAttemptId attemptId = ApplicationAttemptId.fromString(
- currAttemptEntityId);
- NameValuePair primaryFilter = new NameValuePair(
- ApplicationMaster.APPID_TIMELINE_FILTER_NAME,
- attemptId.getApplicationId().toString());
- TimelineEntities entities = yarnCluster
- .getApplicationHistoryServer()
- .getTimelineStore()
- .getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null,
- null, null, null, null, primaryFilter, null, null, null);
- Assert.assertNotNull(entities);
- Assert.assertEquals(2, entities.getEntities().size());
- Assert.assertEquals(entities.getEntities().get(0).getEntityType(),
- ApplicationMaster.DSEntity.DS_CONTAINER.toString());
-
- String entityId = entities.getEntities().get(0).getEntityId();
- org.apache.hadoop.yarn.api.records.timeline.TimelineEntity entity =
- yarnCluster.getApplicationHistoryServer().getTimelineStore()
- .getEntity(entityId,
- ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null);
- Assert.assertNotNull(entity);
- Assert.assertEquals(entityId, entity.getEntityId());
-
- if (haveDomain) {
- Assert.assertEquals(domain.getId(),
- entities.getEntities().get(0).getDomainId());
- } else {
- Assert.assertEquals("DEFAULT",
- entities.getEntities().get(0).getDomainId());
- }
- }
-
- private void checkTimelineV2(ApplicationId appId,
- boolean defaultFlow, ApplicationReport appReport) throws Exception {
- LOG.info("Started checkTimelineV2 ");
- // For PoC check using the file-based timeline writer (YARN-3264)
- String tmpRoot = timelineV2StorageDir + File.separator + "entities" +
- File.separator;
-
- File tmpRootFolder = new File(tmpRoot);
- try {
- Assert.assertTrue(tmpRootFolder.isDirectory());
- String basePath = tmpRoot +
- YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator +
- UserGroupInformation.getCurrentUser().getShortUserName() +
- (defaultFlow ?
- File.separator + appReport.getName() + File.separator +
- TimelineUtils.DEFAULT_FLOW_VERSION + File.separator +
- appReport.getStartTime() + File.separator :
- File.separator + "test_flow_name" + File.separator +
- "test_flow_version" + File.separator + "12345678" +
- File.separator) +
- appId.toString();
- LOG.info("basePath: " + basePath);
- // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
-
- // Verify DS_APP_ATTEMPT entities posted by the client
- // there will be at least one attempt, look for that file
- String appTimestampFileName =
- "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
- + "_000001"
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
- File dsAppAttemptEntityFile = verifyEntityTypeFileExists(basePath,
- "DS_APP_ATTEMPT", appTimestampFileName);
- // Check if required events are published and same idprefix is sent for
- // on each publish.
- verifyEntityForTimelineV2(dsAppAttemptEntityFile,
- DSEvent.DS_APP_ATTEMPT_START.toString(), 1, 1, 0, true);
- // to avoid race condition of testcase, atleast check 40 times with sleep
- // of 50ms
- verifyEntityForTimelineV2(dsAppAttemptEntityFile,
- DSEvent.DS_APP_ATTEMPT_END.toString(), 1, 40, 50, true);
-
- // Verify DS_CONTAINER entities posted by the client.
- String containerTimestampFileName =
- "container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
- + "_01_000002.thist";
- File dsContainerEntityFile = verifyEntityTypeFileExists(basePath,
- "DS_CONTAINER", containerTimestampFileName);
- // Check if required events are published and same idprefix is sent for
- // on each publish.
- verifyEntityForTimelineV2(dsContainerEntityFile,
- DSEvent.DS_CONTAINER_START.toString(), 1, 1, 0, true);
- // to avoid race condition of testcase, atleast check 40 times with sleep
- // of 50ms
- verifyEntityForTimelineV2(dsContainerEntityFile,
- DSEvent.DS_CONTAINER_END.toString(), 1, 40, 50, true);
-
- // Verify NM posting container metrics info.
- String containerMetricsTimestampFileName =
- "container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
- + "_01_000001"
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
- File containerEntityFile = verifyEntityTypeFileExists(basePath,
- TimelineEntityType.YARN_CONTAINER.toString(),
- containerMetricsTimestampFileName);
- verifyEntityForTimelineV2(containerEntityFile,
- ContainerMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, true);
-
- // to avoid race condition of testcase, atleast check 40 times with sleep
- // of 50ms
- verifyEntityForTimelineV2(containerEntityFile,
- ContainerMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, true);
-
- // Verify RM posting Application life cycle Events are getting published
- String appMetricsTimestampFileName =
- "application_" + appId.getClusterTimestamp() + "_000" + appId.getId()
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
- File appEntityFile =
- verifyEntityTypeFileExists(basePath,
- TimelineEntityType.YARN_APPLICATION.toString(),
- appMetricsTimestampFileName);
- // No need to check idprefix for app.
- verifyEntityForTimelineV2(appEntityFile,
- ApplicationMetricsConstants.CREATED_EVENT_TYPE, 1, 1, 0, false);
-
- // to avoid race condition of testcase, atleast check 40 times with sleep
- // of 50ms
- verifyEntityForTimelineV2(appEntityFile,
- ApplicationMetricsConstants.FINISHED_EVENT_TYPE, 1, 40, 50, false);
-
- // Verify RM posting AppAttempt life cycle Events are getting published
- String appAttemptMetricsTimestampFileName =
- "appattempt_" + appId.getClusterTimestamp() + "_000" + appId.getId()
- + "_000001"
- + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
- File appAttemptEntityFile =
- verifyEntityTypeFileExists(basePath,
- TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
- appAttemptMetricsTimestampFileName);
- verifyEntityForTimelineV2(appAttemptEntityFile,
- AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE, 1, 1, 0, true);
- verifyEntityForTimelineV2(appAttemptEntityFile,
- AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true);
- } finally {
- try {
- FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
- } catch (FileNotFoundException ex) {
- // the recursive delete can throw an exception when one of the file
- // does not exist.
- LOG.warn("Exception deleting a file/subDirectory: {}", ex.getMessage());
- }
- }
- }
-
- private File verifyEntityTypeFileExists(String basePath, String entityType,
- String entityfileName) {
- String outputDirPathForEntity =
- basePath + File.separator + entityType + File.separator;
- LOG.info(outputDirPathForEntity);
- File outputDirForEntity = new File(outputDirPathForEntity);
- Assert.assertTrue(outputDirForEntity.isDirectory());
-
- String entityFilePath = outputDirPathForEntity + entityfileName;
-
- File entityFile = new File(entityFilePath);
- Assert.assertTrue(entityFile.exists());
- return entityFile;
- }
-
- /**
- * Checks the events and idprefix published for an entity.
- *
- * @param entityFile Entity file.
- * @param expectedEvent Expected event Id.
- * @param numOfExpectedEvent Number of expected occurences of expected event
- * id.
- * @param checkTimes Number of times to check.
- * @param sleepTime Sleep time for each iteration.
- * @param checkIdPrefix Whether to check idprefix.
- * @throws IOException if entity file reading fails.
- * @throws InterruptedException if sleep is interrupted.
- */
- private void verifyEntityForTimelineV2(File entityFile, String expectedEvent,
- long numOfExpectedEvent, int checkTimes, long sleepTime,
- boolean checkIdPrefix) throws IOException, InterruptedException {
- long actualCount = 0;
- for (int i = 0; i < checkTimes; i++) {
- BufferedReader reader = null;
- String strLine;
- actualCount = 0;
- try {
- reader = new BufferedReader(new FileReader(entityFile));
- long idPrefix = -1;
- while ((strLine = reader.readLine()) != null) {
- String entityLine = strLine.trim();
- if (entityLine.isEmpty()) {
- continue;
- }
- if (entityLine.contains(expectedEvent)) {
- actualCount++;
- }
- if (expectedEvent.equals(DSEvent.DS_CONTAINER_END.toString()) &&
- entityLine.contains(expectedEvent)) {
- TimelineEntity entity = FileSystemTimelineReaderImpl.
- getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
- TimelineEvent event = entity.getEvents().pollFirst();
- Assert.assertNotNull(event);
- Assert.assertTrue("diagnostics",
- event.getInfo().containsKey(ApplicationMaster.DIAGNOSTICS));
- }
- if (checkIdPrefix) {
- TimelineEntity entity = FileSystemTimelineReaderImpl.
- getTimelineRecordFromJSON(entityLine, TimelineEntity.class);
- Assert.assertTrue("Entity ID prefix expected to be > 0",
- entity.getIdPrefix() > 0);
- if (idPrefix == -1) {
- idPrefix = entity.getIdPrefix();
- } else {
- Assert.assertEquals("Entity ID prefix should be same across " +
- "each publish of same entity",
- idPrefix, entity.getIdPrefix());
- }
- }
- }
- } finally {
- if (reader != null) {
- reader.close();
- }
- }
- if (numOfExpectedEvent == actualCount) {
- break;
- }
- if (sleepTime > 0 && i < checkTimes - 1) {
- Thread.sleep(sleepTime);
- }
- }
- Assert.assertEquals("Unexpected number of " + expectedEvent +
- " event published.", numOfExpectedEvent, actualCount);
- }
-
- /**
- * Utility function to merge two String arrays to form a new String array for
- * our argumemts.
- *
- * @param args the first set of the arguments.
- * @param newArgs the second set of the arguments.
- * @return a String array consists of {args, newArgs}
- */
- private String[] mergeArgs(String[] args, String[] newArgs) {
- int length = args.length + newArgs.length;
- String[] result = new String[length];
- System.arraycopy(args, 0, result, 0, args.length);
- System.arraycopy(newArgs, 0, result, args.length, newArgs.length);
- return result;
- }
-
- private String generateAppName(String postFix) {
- return name.getMethodName().replaceFirst("test", "")
- .concat(postFix == null? "" : "-" + postFix);
- }
-
- private String[] createArguments(String... args) {
- String[] res = mergeArgs(commonArgs, args);
- // set the application name so we can track down which command is running.
- res[commonArgs.length - 1] = generateAppName(null);
- return res;
- }
-
- private String[] createArgsWithPostFix(int index, String... args) {
- String[] res = mergeArgs(commonArgs, args);
- // set the application name so we can track down which command is running.
- res[commonArgs.length - 1] = generateAppName(String.valueOf(index));
- return res;
- }
-
- protected String getSleepCommand(int sec) {
- // Windows doesn't have a sleep command, ping -n does the trick
- return Shell.WINDOWS ? "ping -n " + (sec + 1) + " 127.0.0.1 >nul"
- : "sleep " + sec;
- }
-
- @Test
- public void testDSRestartWithPreviousRunningContainers() throws Exception {
- String[] args = createArguments(
- "--num_containers",
- "1",
- "--shell_command",
- getSleepCommand(8),
- "--master_memory",
- "512",
- "--container_memory",
- "128",
- "--keep_containers_across_application_attempts"
- );
-
- LOG.info("Initializing DS Client");
- Client client = new Client(TestDSFailedAppMaster.class.getName(),
- new Configuration(yarnCluster.getConfig()));
-
- client.init(args);
-
- LOG.info("Running DS Client");
- boolean result = client.run();
- LOG.info("Client run completed. Result=" + result);
- // application should succeed
- Assert.assertTrue(result);
- }
-
- /*
- * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
- * Set attempt_failures_validity_interval as 2.5 seconds. It will check
- * how many attempt failures for previous 2.5 seconds.
- * The application is expected to be successful.
- */
- @Test
- public void testDSAttemptFailuresValidityIntervalSucess() throws Exception {
- String[] args = createArguments(
- "--num_containers",
- "1",
- "--shell_command",
- getSleepCommand(8),
- "--master_memory",
- "512",
- "--container_memory",
- "128",
- "--attempt_failures_validity_interval",
- "2500"
- );
-
- LOG.info("Initializing DS Client");
- Configuration config = yarnCluster.getConfig();
- config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
- Client client = new Client(TestDSSleepingAppMaster.class.getName(),
- new Configuration(config));
-
- client.init(args);
-
- LOG.info("Running DS Client");
- boolean result = client.run();
-
- LOG.info("Client run completed. Result=" + result);
- // application should succeed
- Assert.assertTrue(result);
- }
-
- /*
- * The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
- * Set attempt_failures_validity_interval as 15 seconds. It will check
- * how many attempt failure for previous 15 seconds.
- * The application is expected to be fail.
- */
- @Test
- public void testDSAttemptFailuresValidityIntervalFailed() throws Exception {
- String[] args = createArguments(
- "--num_containers",
- "1",
- "--shell_command",
- getSleepCommand(8),
- "--master_memory",
- "512",
- "--container_memory",
- "128",
- "--attempt_failures_validity_interval",
- "15000"
- );
-
- LOG.info("Initializing DS Client");
- Configuration config = yarnCluster.getConfig();
- config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
- Client client = new Client(TestDSSleepingAppMaster.class.getName(),
- new Configuration(config));
-
- client.init(args);
-
- LOG.info("Running DS Client");
- boolean result = client.run();
-
- LOG.info("Client run completed. Result=" + result);
- // application should be failed
- Assert.assertFalse(result);
- }
-
- @Test
- public void testDSShellWithCustomLogPropertyFile() throws Exception {
- final File basedir =
- new File("target", TestDistributedShell.class.getName());
- final File tmpDir = new File(basedir, "tmpDir");
- tmpDir.mkdirs();
- final File customLogProperty = new File(tmpDir, "custom_log4j.properties");
- if (customLogProperty.exists()) {
- customLogProperty.delete();
- }
- if(!customLogProperty.createNewFile()) {
- Assert.fail("Can not create custom log4j property file.");
- }
- PrintWriter fileWriter = new PrintWriter(customLogProperty);
- // set the output to DEBUG level
- fileWriter.write("log4j.rootLogger=debug,stdout");
- fileWriter.close();
- String[] args = createArguments(
- "--num_containers",
- "3",
- "--shell_command",
- "echo",
- "--shell_args",
- "HADOOP",
- "--log_properties",
- customLogProperty.getAbsolutePath(),
- "--master_memory",
- "512",
- "--master_vcores",
- "2",
- "--container_memory",
- "128",
- "--container_vcores",
- "1"
- );
-
- //Before run the DS, the default the log level is INFO
- final Logger LOG_Client =
- LoggerFactory.getLogger(Client.class);
- Assert.assertTrue(LOG_Client.isInfoEnabled());
- Assert.assertFalse(LOG_Client.isDebugEnabled());
- final Logger LOG_AM = LoggerFactory.getLogger(ApplicationMaster.class);
- Assert.assertTrue(LOG_AM.isInfoEnabled());
- Assert.assertFalse(LOG_AM.isDebugEnabled());
-
- LOG.info("Initializing DS Client");
- final Client client =
- new Client(new Configuration(yarnCluster.getConfig()));
- boolean initSuccess = client.init(args);
- Assert.assertTrue(initSuccess);
-
- LOG.info("Running DS Client");
- boolean result = client.run();
- LOG.info("Client run completed. Result=" + result);
- Assert.assertTrue(verifyContainerLog(3, null, true, "DEBUG") > 10);
- //After DS is finished, the log level should be DEBUG
- Assert.assertTrue(LOG_Client.isInfoEnabled());
- Assert.assertTrue(LOG_Client.isDebugEnabled());
- Assert.assertTrue(LOG_AM.isInfoEnabled());
- Assert.assertTrue(LOG_AM.isDebugEnabled());
- }
-
- @Test
- public void testSpecifyingLogAggregationContext() throws Exception {
- String regex = ".*(foo|bar)\\d";
- String[] args = createArguments(
- "--shell_command",
- "echo",
- "--rolling_log_pattern",
- regex
- );
- final Client client =
- new Client(new Configuration(yarnCluster.getConfig()));
- Assert.assertTrue(client.init(args));
-
- ApplicationSubmissionContext context =
- Records.newRecord(ApplicationSubmissionContext.class);
- client.specifyLogAggregationContext(context);
- LogAggregationContext logContext = context.getLogAggregationContext();
- assertEquals(logContext.getRolledLogsIncludePattern(), regex);
- assertTrue(logContext.getRolledLogsExcludePattern().isEmpty());
- }
-
- public void testDSShellWithCommands() throws Exception {
-
- String[] args = createArguments(
- "--num_containers",
- "2",
- "--shell_command",
- "\"echo output_ignored;echo output_expected\"",
- "--master_memory",
- "512",
- "--master_vcores",
- "2",
- "--container_memory",
- "128",
- "--container_vcores",
- "1"
- );
-
- LOG.info("Initializing DS Client");
- final Client client =
- new Client(new Configuration(yarnCluster.getConfig()));
- boolean initSuccess = client.init(args);
- Assert.assertTrue(initSuccess);
- LOG.info("Running DS Client");
- try {
- boolean result = client.run();
- LOG.info("Client run completed. Result=" + result);
- List<String> expectedContent = new ArrayList<>();
- expectedContent.add("output_expected");
- verifyContainerLog(2, expectedContent, false, "");
- } finally {
- client.sendStopSignal();
- }
- }
-
- @Test
- public void testDSShellWithMultipleArgs() throws Exception {
- String[] args = createArguments(
- "--num_containers",
- "4",
- "--shell_command",
- "echo",
- "--shell_args",
- "HADOOP YARN MAPREDUCE HDFS",
- "--master_memory",
- "512",
- "--master_vcores",
- "2",
- "--container_memory",
- "128",
- "--container_vcores",
- "1"
- );
-
- LOG.info("Initializing DS Client");
- final Client client =
- new Client(new Configuration(yarnCluster.getConfig()));
- boolean initSuccess = client.init(args);
- Assert.assertTrue(initSuccess);
- LOG.info("Running DS Client");
-
- boolean result = client.run();
- LOG.info("Client run completed. Result=" + result);
- List<String> expectedContent = new ArrayList<>();
- expectedContent.add("HADOOP YARN MAPREDUCE HDFS");
- verifyContainerLog(4, expectedContent, false, "");
- }
-
- @Test
- public void testDSShellWithShellScript() throws Exception {
- final File basedir =
- new File("target", TestDistributedShell.class.getName());
- final File tmpDir = new File(basedir, "tmpDir");
- tmpDir.mkdirs();
- final File customShellScript = new File(tmpDir, "custom_script.sh");
- if (customShellScript.exists()) {
- customShellScript.delete();
- }
- if (!customShellScript.createNewFile()) {
- Assert.fail("Can not create custom shell script file.");
- }
- PrintWriter fileWriter = new PrintWriter(customShellScript);
- // set the output to DEBUG level
- fileWriter.write("echo testDSShellWithShellScript");
- fileWriter.close();
- LOG.info(customShellScript.getAbsolutePath());
- String[] args = createArguments(
- "--num_containers",
- "1",
- "--shell_script",
- customShellScript.getAbsolutePath(),
- "--master_memory",
- "512",
- "--master_vcores",
- "2",
- "--container_memory",
- "128",
- "--container_vcores",
- "1"
- );
-
- LOG.info("Initializing DS Client");
- final Client client =
- new Client(new Configuration(yarnCluster.getConfig()));
- boolean initSuccess = client.init(args);
- Assert.assertTrue(initSuccess);
- LOG.info("Running DS Client");
- boolean result = client.run();
- LOG.info("Client run completed. Result=" + result);
- List<String> expectedContent = new ArrayList<>();
- expectedContent.add("testDSShellWithShellScript");
- verifyContainerLog(1, expectedContent, false, "");
- }
-
- @Test
- public void testDSShellWithInvalidArgs() throws Exception {
- Client client = new Client(new Configuration(yarnCluster.getConfig()));
- int appNameCounter = 0;
- LOG.info("Initializing DS Client with no args");
- try {
- client.init(new String[]{});
- Assert.fail("Exception is expected");
- } catch (IllegalArgumentException e) {
- Assert.assertTrue("The throw exception is not expected",
- e.getMessage().contains("No args"));
- }
-
- LOG.info("Initializing DS Client with no jar file");
- try {
- String[] args = createArgsWithPostFix(appNameCounter++,
- "--num_containers",
- "2",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls",
- "--master_memory",
- "512",
- "--container_memory",
- "128"
- );
- String[] argsNoJar = Arrays.copyOfRange(args, 2, args.length);
- client.init(argsNoJar);
- Assert.fail("Exception is expected");
- } catch (IllegalArgumentException e) {
- Assert.assertTrue("The throw exception is not expected",
- e.getMessage().contains("No jar"));
- }
-
- LOG.info("Initializing DS Client with no shell command");
- try {
- String[] args = createArgsWithPostFix(appNameCounter++,
- "--num_containers",
- "2",
- "--master_memory",
- "512",
- "--container_memory",
- "128"
- );
- client.init(args);
- Assert.fail("Exception is expected");
- } catch (IllegalArgumentException e) {
- Assert.assertTrue("The throw exception is not expected",
- e.getMessage().contains("No shell command"));
- }
-
- LOG.info("Initializing DS Client with invalid no. of containers");
- try {
- String[] args = createArgsWithPostFix(appNameCounter++,
- "--num_containers",
- "-1",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls",
- "--master_memory",
- "512",
- "--container_memory",
- "128"
- );
- client.init(args);
- Assert.fail("Exception is expected");
- } catch (IllegalArgumentException e) {
- Assert.assertTrue("The throw exception is not expected",
- e.getMessage().contains("Invalid no. of containers"));
- }
-
- LOG.info("Initializing DS Client with invalid no. of vcores");
- try {
- String[] args = createArgsWithPostFix(appNameCounter++,
- "--num_containers",
- "2",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls",
- "--master_memory",
- "512",
- "--master_vcores",
- "-2",
- "--container_memory",
- "128",
- "--container_vcores",
- "1"
- );
- client.init(args);
- client.run();
- Assert.fail("Exception is expected");
- } catch (IllegalArgumentException e) {
- Assert.assertTrue("The throw exception is not expected",
- e.getMessage().contains("Invalid virtual cores specified"));
- }
-
- LOG.info("Initializing DS Client with --shell_command and --shell_script");
- try {
- String[] args = createArgsWithPostFix(appNameCounter++,
- "--num_containers",
- "2",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls",
- "--master_memory",
- "512",
- "--master_vcores",
- "2",
- "--container_memory",
- "128",
- "--container_vcores",
- "1",
- "--shell_script",
- "test.sh"
- );
- client.init(args);
- Assert.fail("Exception is expected");
- } catch (IllegalArgumentException e) {
- Assert.assertTrue("The throw exception is not expected",
- e.getMessage().contains("Can not specify shell_command option " +
- "and shell_script option at the same time"));
- }
-
- LOG.info("Initializing DS Client without --shell_command and --shell_script");
- try {
- String[] args = createArgsWithPostFix(appNameCounter++,
- "--num_containers",
- "2",
- "--master_memory",
- "512",
- "--master_vcores",
- "2",
- "--container_memory",
- "128",
- "--container_vcores",
- "1"
- );
- client.init(args);
- Assert.fail("Exception is expected");
- } catch (IllegalArgumentException e) {
- Assert.assertTrue("The throw exception is not expected",
- e.getMessage().contains("No shell command or shell script specified " +
- "to be executed by application master"));
- }
-
- LOG.info("Initializing DS Client with invalid container_type argument");
- try {
- String[] args = createArgsWithPostFix(appNameCounter++,
- "--num_containers",
- "2",
- "--master_memory",
- "512",
- "--master_vcores",
- "2",
- "--container_memory",
- "128",
- "--container_vcores",
- "1",
- "--shell_command",
- "date",
- "--container_type",
- "UNSUPPORTED_TYPE"
- );
- client.init(args);
- Assert.fail("Exception is expected");
- } catch (IllegalArgumentException e) {
- Assert.assertTrue("The throw exception is not expected",
- e.getMessage().contains("Invalid container_type: UNSUPPORTED_TYPE"));
- }
-
- try {
- String[] args = createArgsWithPostFix(appNameCounter++,
- "--num_containers",
- "1",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls",
- "--master_resources",
- "memory-mb=invalid"
- );
- client.init(args);
- Assert.fail("Exception is expected");
- } catch (IllegalArgumentException e) {
- // do nothing
- LOG.info("IllegalArgumentException exception is expected: {}",
- e.getMessage());
- }
-
- try {
- String[] args = createArgsWithPostFix(appNameCounter++,
- "--num_containers",
- "1",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls",
- "--master_resources"
- );
- client.init(args);
- Assert.fail("Exception is expected");
- } catch (MissingArgumentException e) {
- // do nothing
- LOG.info("MissingArgumentException exception is expected: {}",
- e.getMessage());
- }
- }
-
- @Test
- public void testDSTimelineClientWithConnectionRefuse() throws Exception {
- ApplicationMaster am = new ApplicationMaster();
-
- TimelineClientImpl client = new TimelineClientImpl() {
- @Override
- protected TimelineWriter createTimelineWriter(Configuration conf,
- UserGroupInformation authUgi, com.sun.jersey.api.client.Client client,
- URI resURI) throws IOException {
- TimelineWriter timelineWriter =
- new DirectTimelineWriter(authUgi, client, resURI);
- spyTimelineWriter = spy(timelineWriter);
- return spyTimelineWriter;
- }
- };
- client.init(conf);
- client.start();
- TestTimelineClient.mockEntityClientResponse(spyTimelineWriter, null,
- false, true);
- try {
- UserGroupInformation ugi = mock(UserGroupInformation.class);
- when(ugi.getShortUserName()).thenReturn("user1");
- // verify no ClientHandlerException get thrown out.
- am.publishContainerEndEvent(client, ContainerStatus.newInstance(
- BuilderUtils.newContainerId(1, 1, 1, 1), ContainerState.COMPLETE, "",
- 1), "domainId", ugi);
- } finally {
- client.stop();
- }
- }
-
- protected void waitForNMsToRegister() throws Exception {
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
- return (rmContext.getRMNodes().size() >= NUM_NMS);
- }
- }, 100, 60000);
- }
-
- @Test
- public void testContainerLaunchFailureHandling() throws Exception {
- String[] args = createArguments(
- "--num_containers",
- "2",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls",
- "--master_memory",
- "512",
- "--container_memory",
- "128"
- );
-
- LOG.info("Initializing DS Client");
- Client client = new Client(ContainerLaunchFailAppMaster.class.getName(),
- new Configuration(yarnCluster.getConfig()));
- boolean initSuccess = client.init(args);
- Assert.assertTrue(initSuccess);
- LOG.info("Running DS Client");
- try {
- boolean result = client.run();
- Assert.assertFalse(result);
- } finally {
- client.sendStopSignal();
- }
- }
-
- @Test
- public void testDebugFlag() throws Exception {
- String[] args = createArguments(
- "--num_containers",
- "2",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls",
- "--master_memory",
- "512",
- "--master_vcores",
- "2",
- "--container_memory",
- "128",
- "--container_vcores",
- "1",
- "--debug"
- );
-
- LOG.info("Initializing DS Client");
- Client client = new Client(new Configuration(yarnCluster.getConfig()));
- Assert.assertTrue(client.init(args));
- LOG.info("Running DS Client");
- Assert.assertTrue(client.run());
- }
-
- private int verifyContainerLog(int containerNum,
- List<String> expectedContent, boolean count, String expectedWord) {
- File logFolder =
- new File(yarnCluster.getNodeManager(0).getConfig()
- .get(YarnConfiguration.NM_LOG_DIRS,
- YarnConfiguration.DEFAULT_NM_LOG_DIRS));
-
- File[] listOfFiles = logFolder.listFiles();
- int currentContainerLogFileIndex = -1;
- for (int i = listOfFiles.length - 1; i >= 0; i--) {
- if (listOfFiles[i].listFiles().length == containerNum + 1) {
- currentContainerLogFileIndex = i;
- break;
- }
- }
- Assert.assertTrue(currentContainerLogFileIndex != -1);
- File[] containerFiles =
- listOfFiles[currentContainerLogFileIndex].listFiles();
-
- int numOfWords = 0;
- for (int i = 0; i < containerFiles.length; i++) {
- for (File output : containerFiles[i].listFiles()) {
- if (output.getName().trim().contains("stdout")) {
- BufferedReader br = null;
- List<String> stdOutContent = new ArrayList<>();
- try {
-
- String sCurrentLine;
- br = new BufferedReader(new FileReader(output));
- int numOfline = 0;
- while ((sCurrentLine = br.readLine()) != null) {
- if (count) {
- if (sCurrentLine.contains(expectedWord)) {
- numOfWords++;
- }
- } else if (output.getName().trim().equals("stdout")){
- if (! Shell.WINDOWS) {
- Assert.assertEquals("The current is" + sCurrentLine,
- expectedContent.get(numOfline), sCurrentLine.trim());
- numOfline++;
- } else {
- stdOutContent.add(sCurrentLine.trim());
- }
- }
- }
- /* By executing bat script using cmd /c,
- * it will output all contents from bat script first
- * It is hard for us to do check line by line
- * Simply check whether output from bat file contains
- * all the expected messages
- */
- if (Shell.WINDOWS && !count
- && output.getName().trim().equals("stdout")) {
- Assert.assertTrue(stdOutContent.containsAll(expectedContent));
- }
- } catch (IOException e) {
- LOG.error("Exception reading the buffer", e);
- } finally {
- try {
- if (br != null)
- br.close();
- } catch (IOException ex) {
- LOG.error("Exception closing the bufferReader", ex);
- }
- }
- }
- }
- }
- return numOfWords;
- }
-
- @Test
- public void testDistributedShellResourceProfiles() throws Exception {
- int appNameCounter = 0;
- String[][] args = {
- createArgsWithPostFix(appNameCounter++,
- "--num_containers", "1", "--shell_command",
- Shell.WINDOWS ? "dir" : "ls", "--container_resource_profile",
- "maximum"),
- createArgsWithPostFix(appNameCounter++,
- "--num_containers", "1", "--shell_command",
- Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile",
- "default"),
- createArgsWithPostFix(appNameCounter++,
- "--num_containers", "1", "--shell_command",
- Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile",
- "default", "--container_resource_profile", "maximum"),
- };
-
- for (int i = 0; i < args.length; ++i) {
- LOG.info("Initializing DS Client");
- Client client = new Client(new Configuration(yarnCluster.getConfig()));
- Assert.assertTrue(client.init(args[i]));
- LOG.info("Running DS Client");
- try {
- client.run();
- Assert.fail("Client run should throw error");
- } catch (Exception e) {
- continue;
- }
- }
- }
-
- @Test
- public void testDSShellWithOpportunisticContainers() throws Exception {
- Client client = new Client(new Configuration(yarnCluster.getConfig()));
- try {
- String[] args = createArguments(
- "--num_containers",
- "2",
- "--master_memory",
- "512",
- "--master_vcores",
- "2",
- "--container_memory",
- "128",
- "--container_vcores",
- "1",
- "--shell_command",
- "date",
- "--container_type",
- "OPPORTUNISTIC"
- );
- client.init(args);
- assertTrue(client.run());
- } catch (Exception e) {
- LOG.error("Job execution with opportunistic containers failed.", e);
- Assert.fail("Exception. " + e.getMessage());
- } finally {
- client.sendStopSignal();
- }
- }
-
- @Test
- @TimelineVersion(2.0f)
- public void testDSShellWithEnforceExecutionType() throws Exception {
- YarnClient yarnClient = null;
- Client client = new Client(new Configuration(yarnCluster.getConfig()));
- try {
- String[] args = createArguments(
- "--num_containers",
- "2",
- "--master_memory",
- "512",
- "--master_vcores",
- "2",
- "--container_memory",
- "128",
- "--container_vcores",
- "1",
- "--shell_command",
- "date",
- "--container_type",
- "OPPORTUNISTIC",
- "--enforce_execution_type"
- );
- client.init(args);
- final AtomicBoolean result = new AtomicBoolean(false);
- Thread t = new Thread() {
- public void run() {
- try {
- result.set(client.run());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
- t.start();
-
- yarnClient = YarnClient.createYarnClient();
- yarnClient.init(new Configuration(yarnCluster.getConfig()));
- yarnClient.start();
- waitForContainersLaunch(yarnClient, 2);
- List<ApplicationReport> apps = yarnClient.getApplications();
- ApplicationReport appReport = apps.get(0);
- ApplicationId appId = appReport.getApplicationId();
- List<ApplicationAttemptReport> appAttempts =
- yarnClient.getApplicationAttempts(appId);
- ApplicationAttemptReport appAttemptReport = appAttempts.get(0);
- ApplicationAttemptId appAttemptId =
- appAttemptReport.getApplicationAttemptId();
- List<ContainerReport> containers =
- yarnClient.getContainers(appAttemptId);
- // we should get two containers.
- Assert.assertEquals(2, containers.size());
- ContainerId amContainerId = appAttemptReport.getAMContainerId();
- for (ContainerReport container : containers) {
- if (!container.getContainerId().equals(amContainerId)) {
- Assert.assertEquals(container.getExecutionType(),
- ExecutionType.OPPORTUNISTIC);
- }
- }
- } catch (Exception e) {
- LOG.error("Job execution with enforce execution type failed.", e);
- Assert.fail("Exception. " + e.getMessage());
- } finally {
- client.sendStopSignal();
- if (yarnClient != null) {
- yarnClient.stop();
- }
- }
- }
-
- private void waitForContainersLaunch(YarnClient client,
- int nContainers) throws Exception {
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- public Boolean get() {
- try {
- List<ApplicationReport> apps = client.getApplications();
- if (apps == null || apps.isEmpty()) {
- return false;
- }
- ApplicationId appId = apps.get(0).getApplicationId();
- List<ApplicationAttemptReport> appAttempts =
- client.getApplicationAttempts(appId);
- if (appAttempts == null || appAttempts.isEmpty()) {
- return false;
- }
- ApplicationAttemptId attemptId =
- appAttempts.get(0).getApplicationAttemptId();
- List<ContainerReport> containers = client.getContainers(attemptId);
- return (containers.size() == nContainers);
- } catch (Exception e) {
- return false;
- }
- }
- }, 10, 60000);
- }
-
- @Test
- @TimelineVersion(2.0f)
- public void testDistributedShellWithResources() throws Exception {
- doTestDistributedShellWithResources(false);
- }
-
- @Test
- @TimelineVersion(2.0f)
- public void testDistributedShellWithResourcesWithLargeContainers()
- throws Exception {
- doTestDistributedShellWithResources(true);
- }
-
- public void doTestDistributedShellWithResources(boolean largeContainers)
- throws Exception {
- Resource clusterResource = yarnCluster.getResourceManager()
- .getResourceScheduler().getClusterResource();
- String masterMemoryString = "1 Gi";
- String containerMemoryString = "512 Mi";
- long[] memVars = {1024, 512};
-
- Assume.assumeTrue("The cluster doesn't have enough memory for this test",
- clusterResource.getMemorySize() >= memVars[0] + memVars[1]);
- Assume.assumeTrue("The cluster doesn't have enough cores for this test",
- clusterResource.getVirtualCores() >= 2);
- if (largeContainers) {
- memVars[0] = clusterResource.getMemorySize() * 2 / 3;
- memVars[0] = memVars[0] - memVars[0] % MIN_ALLOCATION_MB;
- masterMemoryString = memVars[0] + "Mi";
- memVars[1] = clusterResource.getMemorySize() / 3;
- memVars[1] = memVars[1] - memVars[1] % MIN_ALLOCATION_MB;
- containerMemoryString = String.valueOf(memVars[1]);
- }
-
- String[] args = createArguments(
- "--num_containers",
- "2",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls",
- "--master_resources",
- "memory=" + masterMemoryString + ",vcores=1",
- "--container_resources",
- "memory=" + containerMemoryString + ",vcores=1"
- );
-
- LOG.info("Initializing DS Client");
- Client client = new Client(new Configuration(yarnCluster.getConfig()));
- Assert.assertTrue(client.init(args));
- LOG.info("Running DS Client");
- final AtomicBoolean result = new AtomicBoolean(false);
- Thread t = new Thread() {
- public void run() {
- try {
- result.set(client.run());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- };
- t.start();
-
- YarnClient yarnClient = YarnClient.createYarnClient();
- yarnClient.init(new Configuration(yarnCluster.getConfig()));
- yarnClient.start();
-
- final AtomicBoolean testFailed = new AtomicBoolean(false);
- try {
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- if (testFailed.get()) {
- return true;
- }
- List<ContainerReport> containers;
- try {
- List<ApplicationReport> apps = yarnClient.getApplications();
- if (apps.isEmpty()) {
- return false;
- }
- ApplicationReport appReport = apps.get(0);
- ApplicationId appId = appReport.getApplicationId();
- List<ApplicationAttemptReport> appAttempts =
- yarnClient.getApplicationAttempts(appId);
- if (appAttempts.isEmpty()) {
- return false;
- }
- ApplicationAttemptReport appAttemptReport = appAttempts.get(0);
- ContainerId amContainerId = appAttemptReport.getAMContainerId();
- if (amContainerId == null) {
- return false;
- }
- ContainerReport report = yarnClient.getContainerReport(
- amContainerId);
- Resource masterResource = report.getAllocatedResource();
- Assert.assertEquals(memVars[0],
- masterResource.getMemorySize());
- Assert.assertEquals(1, masterResource.getVirtualCores());
- containers = yarnClient.getContainers(
- appAttemptReport.getApplicationAttemptId());
- if (containers.size() < 2) {
- return false;
- }
- for (ContainerReport container : containers) {
- if (!container.getContainerId().equals(amContainerId)) {
- Resource containerResource = container.getAllocatedResource();
- Assert.assertEquals(memVars[1],
- containerResource.getMemorySize());
- Assert.assertEquals(1, containerResource.getVirtualCores());
- }
- }
- return true;
- } catch (Exception ex) {
- LOG.error("Error waiting for expected results", ex);
- testFailed.set(true);
- }
- return false;
- }
- }, 10, TEST_TIME_WINDOW_EXPIRE);
- assertFalse(testFailed.get());
- } finally {
- LOG.info("Signaling Client to Stop");
- client.sendStopSignal();
- if (yarnClient != null) {
- LOG.info("Stopping yarnClient service");
- yarnClient.stop();
- }
- }
- }
-
- @Test(expected=ResourceNotFoundException.class)
- public void testDistributedShellAMResourcesWithUnknownResource()
- throws Exception {
- String[] args = createArguments(
- "--num_containers",
- "1",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls",
- "--master_resources",
- "unknown-resource=5"
- );
- Client client = new Client(new Configuration(yarnCluster.getConfig()));
- client.init(args);
- client.run();
- }
-
- @Test(expected=IllegalArgumentException.class)
- public void testDistributedShellNonExistentQueue()
- throws Exception {
- String[] args = createArguments(
- "--num_containers",
- "1",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls",
- "--queue",
- "non-existent-queue"
- );
- Client client = new Client(new Configuration(yarnCluster.getConfig()));
- client.init(args);
- client.run();
- }
-
- @Test
- public void testDistributedShellWithSingleFileLocalization()
- throws Exception {
- String[] args = createArguments(
- "--num_containers",
- "1",
- "--shell_command",
- Shell.WINDOWS ? "type" : "cat",
- "--localize_files",
- "./src/test/resources/a.txt",
- "--shell_args",
- "a.txt"
- );
-
- Client client = new Client(new Configuration(yarnCluster.getConfig()));
- client.init(args);
- assertTrue("Client exited with an error", client.run());
- }
-
- @Test
- public void testDistributedShellWithMultiFileLocalization()
- throws Exception {
- String[] args = createArguments(
- "--num_containers",
- "1",
- "--shell_command",
- Shell.WINDOWS ? "type" : "cat",
- "--localize_files",
- "./src/test/resources/a.txt,./src/test/resources/b.txt",
- "--shell_args",
- "a.txt b.txt"
- );
-
- Client client = new Client(new Configuration(yarnCluster.getConfig()));
- client.init(args);
- assertTrue("Client exited with an error", client.run());
- }
-
- @Test(expected=UncheckedIOException.class)
- public void testDistributedShellWithNonExistentFileLocalization()
- throws Exception {
- String[] args = createArguments(
- "--num_containers",
- "1",
- "--shell_command",
- Shell.WINDOWS ? "type" : "cat",
- "--localize_files",
- "/non/existing/path/file.txt",
- "--shell_args",
- "file.txt"
- );
-
- Client client = new Client(new Configuration(yarnCluster.getConfig()));
- client.init(args);
- assertTrue(client.run());
- }
-
-
- @Test
- public void testDistributedShellCleanup()
- throws Exception {
- String appName = "DistributedShellCleanup";
- String[] args = createArguments(
- "--num_containers",
- "1",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls"
- );
- Configuration config = new Configuration(yarnCluster.getConfig());
- Client client = new Client(config);
- try {
- client.init(args);
- client.run();
- ApplicationId appId = client.getAppId();
- String relativePath =
- ApplicationMaster.getRelativePath(appName, appId.toString(), "");
- FileSystem fs1 = FileSystem.get(config);
- Path path = new Path(fs1.getHomeDirectory(), relativePath);
-
- GenericTestUtils.waitFor(() -> {
- try {
- return !fs1.exists(path);
- } catch (IOException e) {
- return false;
- }
- }, 10, 60000);
-
- assertFalse("Distributed Shell Cleanup failed", fs1.exists(path));
- } finally {
- client.sendStopSignal();
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org