You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2020/12/28 19:09:38 UTC
[hadoop] branch trunk updated: YARN-10334. Close clients in
TestDistributedShell (#2571)
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 513f199 YARN-10334. Close clients in TestDistributedShell (#2571)
513f199 is described below
commit 513f1995adc9b73f9c7f4c7beb89725b51b313ac
Author: Ahmed Hussein <50...@users.noreply.github.com>
AuthorDate: Mon Dec 28 14:09:10 2020 -0500
YARN-10334. Close clients in TestDistributedShell (#2571)
---
.../yarn/applications/distributedshell/Client.java | 49 +-
.../distributedshell/TestDistributedShell.java | 784 ++++++++++-----------
2 files changed, 434 insertions(+), 399 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
index d7114d0..5da4384 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
@@ -32,6 +32,7 @@ import java.util.Set;
import java.util.Vector;
import java.util.Arrays;
import java.util.Base64;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
@@ -253,6 +254,10 @@ public class Client {
// Command line options
private Options opts;
+ private final AtomicBoolean stopSignalReceived;
+ private final AtomicBoolean isRunning;
+ private final Object objectLock = new Object();
+
private static final String shellCommandPath = "shellCommands";
private static final String shellArgsPath = "shellArgs";
private static final String appMasterJarPath = "AppMaster.jar";
@@ -413,6 +418,8 @@ public class Client {
opts.addOption("application_tags", true, "Application tags.");
opts.addOption("localize_files", true, "List of files, separated by comma"
+ " to be localized for the command");
+ stopSignalReceived = new AtomicBoolean(false);
+ isRunning = new AtomicBoolean(false);
}
/**
@@ -670,8 +677,8 @@ public class Client {
* @throws YarnException
*/
public boolean run() throws IOException, YarnException {
-
LOG.info("Running Client");
+ isRunning.set(true);
yarnClient.start();
// set the client start time.
clientStartTime = System.currentTimeMillis();
@@ -1116,15 +1123,22 @@ public class Client {
boolean res = false;
boolean needForceKill = false;
- while (true) {
+ while (isRunning.get()) {
// Check app status every 1 second.
try {
- Thread.sleep(APP_MONITOR_INTERVAL);
+ synchronized (objectLock) {
+ objectLock.wait(APP_MONITOR_INTERVAL);
+ }
+ needForceKill = stopSignalReceived.get();
} catch (InterruptedException e) {
LOG.warn("Thread sleep in monitoring loop interrupted");
// if the application is to be killed when client times out;
// then set needForceKill to true
break;
+ } finally {
+ if (needForceKill) {
+ break;
+ }
}
// Get application report for the appId we are interested in
@@ -1177,6 +1191,8 @@ public class Client {
forceKillApplication(appId);
}
+ isRunning.set(false);
+
return res;
}
@@ -1388,4 +1404,31 @@ public class Client {
}
return resources;
}
+
+ @VisibleForTesting
+ protected void sendStopSignal() {
+ LOG.info("Sending stop Signal to Client");
+ stopSignalReceived.set(true);
+ synchronized (objectLock) {
+ objectLock.notifyAll();
+ }
+ int waitCount = 0;
+ LOG.info("Waiting for Client to exit loop");
+ while (!isRunning.get()) {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException ie) {
+ // do nothing
+ } finally {
+ waitCount++;
+ if (isRunning.get() || waitCount > 2000) {
+ break;
+ }
+ }
+ }
+ LOG.info("Stopping yarnClient within the Client");
+ yarnClient.stop();
+ yarnClient.waitForServiceToStop(clientTimeout);
+ LOG.info("done stopping Client");
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
index 438b12b..009ef3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
@@ -38,6 +39,7 @@ import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
@@ -88,6 +90,7 @@ import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
import org.apache.hadoop.yarn.server.timeline.TimelineVersion;
@@ -128,6 +131,9 @@ public class TestDistributedShell {
private static final float DEFAULT_TIMELINE_VERSION = 1.0f;
private static final String TIMELINE_AUX_SERVICE_NAME = "timeline_collector";
private static final int MIN_ALLOCATION_MB = 128;
+ private static final int TEST_TIME_OUT = 150000;
+ // set the timeout of the yarnClient to be 90% of the globalTimeout.
+ private static final int TEST_TIME_WINDOW_EXPIRE = (TEST_TIME_OUT * 90) / 100;
protected final static String APPMASTER_JAR =
JarFinder.getJar(ApplicationMaster.class);
@@ -135,17 +141,29 @@ public class TestDistributedShell {
@Rule
public TimelineVersionWatcher timelineVersionWatcher
= new TimelineVersionWatcher();
+
@Rule
- public Timeout globalTimeout = new Timeout(90000);
+ public Timeout globalTimeout = new Timeout(TEST_TIME_OUT,
+ TimeUnit.MILLISECONDS);
+
@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();
@Rule
public TestName name = new TestName();
- private String generateAppName() {
- return name.getMethodName().replaceFirst("test", "");
- }
+ // set the timeout of the yarnClient to be 90% of the globalTimeout.
+ private final String yarnClientTimeout =
+ String.valueOf(TEST_TIME_WINDOW_EXPIRE);
+
+ private final String[] commonArgs = {
+ "--jar",
+ APPMASTER_JAR,
+ "--timeout",
+ yarnClientTimeout,
+ "--appname",
+ ""
+ };
@Before
public void setup() throws Exception {
@@ -168,6 +186,7 @@ public class TestDistributedShell {
MIN_ALLOCATION_MB);
// reduce the teardown waiting time
conf.setLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 1000);
+ conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 500);
conf.set("yarn.log.dir", "target");
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
// mark if we need to launch the v1 timeline server
@@ -201,11 +220,10 @@ public class TestDistributedShell {
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT);
} else if (timelineVersion == 1.5f) {
- if (hdfsCluster == null) {
- HdfsConfiguration hdfsConfig = new HdfsConfiguration();
- hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
- .numDataNodes(1).build();
- }
+ HdfsConfiguration hdfsConfig = new HdfsConfiguration();
+ hdfsCluster = new MiniDFSCluster.Builder(hdfsConfig)
+ .numDataNodes(1).build();
+ hdfsCluster.waitActive();
fs = hdfsCluster.getFileSystem();
PluginStoreTestUtils.prepareFileSystemForPluginStore(fs);
PluginStoreTestUtils.prepareConfiguration(conf, hdfsCluster);
@@ -231,39 +249,39 @@ public class TestDistributedShell {
} else {
Assert.fail("Wrong timeline version number: " + timelineVersion);
}
-
- if (yarnCluster == null) {
- yarnCluster =
- new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
- numNodeManager, 1, 1);
- yarnCluster.init(conf);
-
- yarnCluster.start();
-
- conf.set(
- YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
- MiniYARNCluster.getHostname() + ":"
- + yarnCluster.getApplicationHistoryServer().getPort());
-
- waitForNMsToRegister();
-
- URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
- if (url == null) {
- throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
- }
- Configuration yarnClusterConfig = yarnCluster.getConfig();
- yarnClusterConfig.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
- new File(url.getPath()).getParent());
- //write the document to a buffer (not directly to the file, as that
- //can cause the file being written to get read -which will then fail.
- ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
- yarnClusterConfig.writeXml(bytesOut);
- bytesOut.close();
- //write the bytes to the file in the classpath
- OutputStream os = new FileOutputStream(new File(url.getPath()));
- os.write(bytesOut.toByteArray());
- os.close();
+
+ yarnCluster =
+ new MiniYARNCluster(TestDistributedShell.class.getSimpleName(), 1,
+ numNodeManager, 1, 1);
+ yarnCluster.init(conf);
+ yarnCluster.start();
+
+ conf.set(
+ YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+ MiniYARNCluster.getHostname() + ":"
+ + yarnCluster.getApplicationHistoryServer().getPort());
+
+ waitForNMsToRegister();
+
+ URL url = Thread.currentThread().getContextClassLoader().getResource(
+ "yarn-site.xml");
+ if (url == null) {
+ throw new RuntimeException(
+ "Could not find 'yarn-site.xml' dummy file in classpath");
}
+ Configuration yarnClusterConfig = yarnCluster.getConfig();
+ yarnClusterConfig.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ new File(url.getPath()).getParent());
+ //write the document to a buffer (not directly to the file, as that
+ //can cause the file being written to get read -which will then fail.
+ ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+ yarnClusterConfig.writeXml(bytesOut);
+ bytesOut.close();
+ //write the bytes to the file in the classpath
+ OutputStream os = new FileOutputStream(url.getPath());
+ os.write(bytesOut.toByteArray());
+ os.close();
+
FileContext fsContext = FileContext.getLocalFSFileContext();
fsContext
.delete(
@@ -278,6 +296,11 @@ public class TestDistributedShell {
@After
public void tearDown() throws IOException {
+ FileContext fsContext = FileContext.getLocalFSFileContext();
+ fsContext
+ .delete(
+ new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)),
+ true);
if (yarnCluster != null) {
try {
yarnCluster.stop();
@@ -292,11 +315,6 @@ public class TestDistributedShell {
hdfsCluster = null;
}
}
- FileContext fsContext = FileContext.getLocalFSFileContext();
- fsContext
- .delete(
- new Path(conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH)),
- true);
}
@Test
@@ -345,9 +363,7 @@ public class TestDistributedShell {
public void testDSShell(boolean haveDomain, boolean defaultFlow)
throws Exception {
- String[] args = {
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"2",
"--shell_command",
@@ -359,8 +375,8 @@ public class TestDistributedShell {
"--container_memory",
"128",
"--container_vcores",
- "1"
- };
+ "1");
+
if (haveDomain) {
String[] domainArgs = {
"--domain",
@@ -391,6 +407,7 @@ public class TestDistributedShell {
}
LOG.info("Initializing DS Client");
+ YarnClient yarnClient;
final Client client = new Client(new Configuration(yarnCluster.getConfig()));
boolean initSuccess = client.init(args);
Assert.assertTrue(initSuccess);
@@ -407,7 +424,7 @@ public class TestDistributedShell {
};
t.start();
- YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient = YarnClient.createYarnClient();
yarnClient.init(new Configuration(yarnCluster.getConfig()));
yarnClient.start();
@@ -415,15 +432,15 @@ public class TestDistributedShell {
String errorMessage = "";
ApplicationId appId = null;
ApplicationReport appReport = null;
- while(!verified) {
+ while (!verified) {
List<ApplicationReport> apps = yarnClient.getApplications();
- if (apps.size() == 0 ) {
+ if (apps.size() == 0) {
Thread.sleep(10);
continue;
}
appReport = apps.get(0);
appId = appReport.getApplicationId();
- if(appReport.getHost().equals("N/A")) {
+ if (appReport.getHost().equals("N/A")) {
Thread.sleep(10);
continue;
}
@@ -436,7 +453,7 @@ public class TestDistributedShell {
if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED
&& appReport.getFinalApplicationStatus() !=
- FinalApplicationStatus.UNDEFINED) {
+ FinalApplicationStatus.UNDEFINED) {
break;
}
}
@@ -463,11 +480,10 @@ public class TestDistributedShell {
}
}
- TimelineDomain domain = null;
if (!isTestingTimelineV2) {
checkTimelineV1(haveDomain);
} else {
- checkTimelineV2(haveDomain, appId, defaultFlow, appReport);
+ checkTimelineV2(appId, defaultFlow, appReport);
}
}
@@ -489,8 +505,8 @@ public class TestDistributedShell {
Assert.assertEquals(1, entitiesAttempts.getEntities().size());
Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents()
.size());
- Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType()
- .toString(), ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString());
+ Assert.assertEquals(entitiesAttempts.getEntities().get(0).getEntityType(),
+ ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString());
if (haveDomain) {
Assert.assertEquals(domain.getId(),
entitiesAttempts.getEntities().get(0).getDomainId());
@@ -512,8 +528,8 @@ public class TestDistributedShell {
null, null, null, null, primaryFilter, null, null, null);
Assert.assertNotNull(entities);
Assert.assertEquals(2, entities.getEntities().size());
- Assert.assertEquals(entities.getEntities().get(0).getEntityType()
- .toString(), ApplicationMaster.DSEntity.DS_CONTAINER.toString());
+ Assert.assertEquals(entities.getEntities().get(0).getEntityType(),
+ ApplicationMaster.DSEntity.DS_CONTAINER.toString());
String entityId = entities.getEntities().get(0).getEntityId();
org.apache.hadoop.yarn.api.records.timeline.TimelineEntity entity =
@@ -532,7 +548,7 @@ public class TestDistributedShell {
}
}
- private void checkTimelineV2(boolean haveDomain, ApplicationId appId,
+ private void checkTimelineV2(ApplicationId appId,
boolean defaultFlow, ApplicationReport appReport) throws Exception {
LOG.info("Started checkTimelineV2 ");
// For PoC check using the file-based timeline writer (YARN-3264)
@@ -635,7 +651,13 @@ public class TestDistributedShell {
verifyEntityForTimelineV2(appAttemptEntityFile,
AppAttemptMetricsConstants.FINISHED_EVENT_TYPE, 1, 1, 0, true);
} finally {
- FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
+ try {
+ FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
+ } catch (FileNotFoundException ex) {
+ // the recursive delete can throw an exception when one of the files
+ // does not exist.
+ LOG.warn("Exception deleting a file/subDirectory: {}", ex.getMessage());
+ }
}
}
@@ -673,7 +695,7 @@ public class TestDistributedShell {
long actualCount = 0;
for (int i = 0; i < checkTimes; i++) {
BufferedReader reader = null;
- String strLine = null;
+ String strLine;
actualCount = 0;
try {
reader = new BufferedReader(new FileReader(entityFile));
@@ -710,7 +732,9 @@ public class TestDistributedShell {
}
}
} finally {
- reader.close();
+ if (reader != null) {
+ reader.close();
+ }
}
if (numOfExpectedEvent == actualCount) {
break;
@@ -727,14 +751,35 @@ public class TestDistributedShell {
* Utility function to merge two String arrays to form a new String array for
* our arguments.
*
- * @param args
- * @param newArgs
+ * @param args the first set of the arguments.
+ * @param newArgs the second set of the arguments.
* @return a String array consisting of {args, newArgs}
*/
private String[] mergeArgs(String[] args, String[] newArgs) {
- List<String> argsList = new ArrayList<String>(Arrays.asList(args));
- argsList.addAll(Arrays.asList(newArgs));
- return argsList.toArray(new String[argsList.size()]);
+ int length = args.length + newArgs.length;
+ String[] result = new String[length];
+ System.arraycopy(args, 0, result, 0, args.length);
+ System.arraycopy(newArgs, 0, result, args.length, newArgs.length);
+ return result;
+ }
+
+ private String generateAppName(String postFix) {
+ return name.getMethodName().replaceFirst("test", "")
+ .concat(postFix == null? "" : "-" + postFix);
+ }
+
+ private String[] createArguments(String... args) {
+ String[] res = mergeArgs(commonArgs, args);
+ // set the application name so we can track down which command is running.
+ res[commonArgs.length - 1] = generateAppName(null);
+ return res;
+ }
+
+ private String[] createArgsWithPostFix(int index, String... args) {
+ String[] res = mergeArgs(commonArgs, args);
+ // set the application name so we can track down which command is running.
+ res[commonArgs.length - 1] = generateAppName(String.valueOf(index));
+ return res;
}
protected String getSleepCommand(int sec) {
@@ -745,11 +790,7 @@ public class TestDistributedShell {
@Test
public void testDSRestartWithPreviousRunningContainers() throws Exception {
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"1",
"--shell_command",
@@ -759,20 +800,20 @@ public class TestDistributedShell {
"--container_memory",
"128",
"--keep_containers_across_application_attempts"
- };
+ );
- LOG.info("Initializing DS Client");
- Client client = new Client(TestDSFailedAppMaster.class.getName(),
+ LOG.info("Initializing DS Client");
+ Client client = new Client(TestDSFailedAppMaster.class.getName(),
new Configuration(yarnCluster.getConfig()));
- client.init(args);
- LOG.info("Running DS Client");
- boolean result = client.run();
+ client.init(args);
- LOG.info("Client run completed. Result=" + result);
- // application should succeed
- Assert.assertTrue(result);
- }
+ LOG.info("Running DS Client");
+ boolean result = client.run();
+ LOG.info("Client run completed. Result=" + result);
+ // application should succeed
+ Assert.assertTrue(result);
+ }
/*
* The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
@@ -782,11 +823,7 @@ public class TestDistributedShell {
*/
@Test
public void testDSAttemptFailuresValidityIntervalSucess() throws Exception {
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"1",
"--shell_command",
@@ -797,22 +834,23 @@ public class TestDistributedShell {
"128",
"--attempt_failures_validity_interval",
"2500"
- };
+ );
- LOG.info("Initializing DS Client");
- Configuration conf = yarnCluster.getConfig();
- conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
- Client client = new Client(TestDSSleepingAppMaster.class.getName(),
- new Configuration(conf));
+ LOG.info("Initializing DS Client");
+ Configuration config = yarnCluster.getConfig();
+ config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+ Client client = new Client(TestDSSleepingAppMaster.class.getName(),
+ new Configuration(config));
- client.init(args);
- LOG.info("Running DS Client");
- boolean result = client.run();
+ client.init(args);
- LOG.info("Client run completed. Result=" + result);
- // application should succeed
- Assert.assertTrue(result);
- }
+ LOG.info("Running DS Client");
+ boolean result = client.run();
+
+ LOG.info("Client run completed. Result=" + result);
+ // application should succeed
+ Assert.assertTrue(result);
+ }
/*
* The sleeping period in TestDSSleepingAppMaster is set as 5 seconds.
@@ -822,11 +860,7 @@ public class TestDistributedShell {
*/
@Test
public void testDSAttemptFailuresValidityIntervalFailed() throws Exception {
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"1",
"--shell_command",
@@ -837,22 +871,23 @@ public class TestDistributedShell {
"128",
"--attempt_failures_validity_interval",
"15000"
- };
+ );
- LOG.info("Initializing DS Client");
- Configuration conf = yarnCluster.getConfig();
- conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
- Client client = new Client(TestDSSleepingAppMaster.class.getName(),
- new Configuration(conf));
+ LOG.info("Initializing DS Client");
+ Configuration config = yarnCluster.getConfig();
+ config.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
+ Client client = new Client(TestDSSleepingAppMaster.class.getName(),
+ new Configuration(config));
- client.init(args);
- LOG.info("Running DS Client");
- boolean result = client.run();
+ client.init(args);
- LOG.info("Client run completed. Result=" + result);
- // application should be failed
- Assert.assertFalse(result);
- }
+ LOG.info("Running DS Client");
+ boolean result = client.run();
+
+ LOG.info("Client run completed. Result=" + result);
+ // application should be failed
+ Assert.assertFalse(result);
+ }
@Test
public void testDSShellWithCustomLogPropertyFile() throws Exception {
@@ -871,11 +906,7 @@ public class TestDistributedShell {
// set the output to DEBUG level
fileWriter.write("log4j.rootLogger=debug,stdout");
fileWriter.close();
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"3",
"--shell_command",
@@ -892,7 +923,7 @@ public class TestDistributedShell {
"128",
"--container_vcores",
"1"
- };
+ );
//Before run the DS, the default the log level is INFO
final Logger LOG_Client =
@@ -908,6 +939,7 @@ public class TestDistributedShell {
new Client(new Configuration(yarnCluster.getConfig()));
boolean initSuccess = client.init(args);
Assert.assertTrue(initSuccess);
+
LOG.info("Running DS Client");
boolean result = client.run();
LOG.info("Client run completed. Result=" + result);
@@ -922,16 +954,12 @@ public class TestDistributedShell {
@Test
public void testSpecifyingLogAggregationContext() throws Exception {
String regex = ".*(foo|bar)\\d";
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--shell_command",
"echo",
"--rolling_log_pattern",
regex
- };
+ );
final Client client =
new Client(new Configuration(yarnCluster.getConfig()));
Assert.assertTrue(client.init(args));
@@ -946,11 +974,7 @@ public class TestDistributedShell {
public void testDSShellWithCommands() throws Exception {
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"2",
"--shell_command",
@@ -963,7 +987,7 @@ public class TestDistributedShell {
"128",
"--container_vcores",
"1"
- };
+ );
LOG.info("Initializing DS Client");
final Client client =
@@ -971,20 +995,20 @@ public class TestDistributedShell {
boolean initSuccess = client.init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running DS Client");
- boolean result = client.run();
- LOG.info("Client run completed. Result=" + result);
- List<String> expectedContent = new ArrayList<String>();
- expectedContent.add("output_expected");
- verifyContainerLog(2, expectedContent, false, "");
+ try {
+ boolean result = client.run();
+ LOG.info("Client run completed. Result=" + result);
+ List<String> expectedContent = new ArrayList<>();
+ expectedContent.add("output_expected");
+ verifyContainerLog(2, expectedContent, false, "");
+ } finally {
+ client.sendStopSignal();
+ }
}
@Test
public void testDSShellWithMultipleArgs() throws Exception {
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"4",
"--shell_command",
@@ -999,7 +1023,7 @@ public class TestDistributedShell {
"128",
"--container_vcores",
"1"
- };
+ );
LOG.info("Initializing DS Client");
final Client client =
@@ -1007,9 +1031,10 @@ public class TestDistributedShell {
boolean initSuccess = client.init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running DS Client");
+
boolean result = client.run();
LOG.info("Client run completed. Result=" + result);
- List<String> expectedContent = new ArrayList<String>();
+ List<String> expectedContent = new ArrayList<>();
expectedContent.add("HADOOP YARN MAPREDUCE HDFS");
verifyContainerLog(4, expectedContent, false, "");
}
@@ -1031,12 +1056,8 @@ public class TestDistributedShell {
// set the output to DEBUG level
fileWriter.write("echo testDSShellWithShellScript");
fileWriter.close();
- System.out.println(customShellScript.getAbsolutePath());
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ LOG.info(customShellScript.getAbsolutePath());
+ String[] args = createArguments(
"--num_containers",
"1",
"--shell_script",
@@ -1049,7 +1070,7 @@ public class TestDistributedShell {
"128",
"--container_vcores",
"1"
- };
+ );
LOG.info("Initializing DS Client");
final Client client =
@@ -1059,7 +1080,7 @@ public class TestDistributedShell {
LOG.info("Running DS Client");
boolean result = client.run();
LOG.info("Client run completed. Result=" + result);
- List<String> expectedContent = new ArrayList<String>();
+ List<String> expectedContent = new ArrayList<>();
expectedContent.add("testDSShellWithShellScript");
verifyContainerLog(1, expectedContent, false, "");
}
@@ -1067,7 +1088,7 @@ public class TestDistributedShell {
@Test
public void testDSShellWithInvalidArgs() throws Exception {
Client client = new Client(new Configuration(yarnCluster.getConfig()));
-
+ int appNameCounter = 0;
LOG.info("Initializing DS Client with no args");
try {
client.init(new String[]{});
@@ -1079,9 +1100,7 @@ public class TestDistributedShell {
LOG.info("Initializing DS Client with no jar file");
try {
- String[] args = {
- "--appname",
- generateAppName(),
+ String[] args = createArgsWithPostFix(appNameCounter++,
"--num_containers",
"2",
"--shell_command",
@@ -1090,8 +1109,9 @@ public class TestDistributedShell {
"512",
"--container_memory",
"128"
- };
- client.init(args);
+ );
+ String[] argsNoJar = Arrays.copyOfRange(args, 2, args.length);
+ client.init(argsNoJar);
Assert.fail("Exception is expected");
} catch (IllegalArgumentException e) {
Assert.assertTrue("The throw exception is not expected",
@@ -1100,16 +1120,14 @@ public class TestDistributedShell {
LOG.info("Initializing DS Client with no shell command");
try {
- String[] args = {
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArgsWithPostFix(appNameCounter++,
"--num_containers",
"2",
"--master_memory",
"512",
"--container_memory",
"128"
- };
+ );
client.init(args);
Assert.fail("Exception is expected");
} catch (IllegalArgumentException e) {
@@ -1119,9 +1137,7 @@ public class TestDistributedShell {
LOG.info("Initializing DS Client with invalid no. of containers");
try {
- String[] args = {
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArgsWithPostFix(appNameCounter++,
"--num_containers",
"-1",
"--shell_command",
@@ -1130,7 +1146,7 @@ public class TestDistributedShell {
"512",
"--container_memory",
"128"
- };
+ );
client.init(args);
Assert.fail("Exception is expected");
} catch (IllegalArgumentException e) {
@@ -1140,9 +1156,7 @@ public class TestDistributedShell {
LOG.info("Initializing DS Client with invalid no. of vcores");
try {
- String[] args = {
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArgsWithPostFix(appNameCounter++,
"--num_containers",
"2",
"--shell_command",
@@ -1155,7 +1169,7 @@ public class TestDistributedShell {
"128",
"--container_vcores",
"1"
- };
+ );
client.init(args);
client.run();
Assert.fail("Exception is expected");
@@ -1166,9 +1180,7 @@ public class TestDistributedShell {
LOG.info("Initializing DS Client with --shell_command and --shell_script");
try {
- String[] args = {
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArgsWithPostFix(appNameCounter++,
"--num_containers",
"2",
"--shell_command",
@@ -1183,7 +1195,7 @@ public class TestDistributedShell {
"1",
"--shell_script",
"test.sh"
- };
+ );
client.init(args);
Assert.fail("Exception is expected");
} catch (IllegalArgumentException e) {
@@ -1194,9 +1206,7 @@ public class TestDistributedShell {
LOG.info("Initializing DS Client without --shell_command and --shell_script");
try {
- String[] args = {
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArgsWithPostFix(appNameCounter++,
"--num_containers",
"2",
"--master_memory",
@@ -1207,7 +1217,7 @@ public class TestDistributedShell {
"128",
"--container_vcores",
"1"
- };
+ );
client.init(args);
Assert.fail("Exception is expected");
} catch (IllegalArgumentException e) {
@@ -1218,9 +1228,7 @@ public class TestDistributedShell {
LOG.info("Initializing DS Client with invalid container_type argument");
try {
- String[] args = {
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArgsWithPostFix(appNameCounter++,
"--num_containers",
"2",
"--master_memory",
@@ -1235,13 +1243,46 @@ public class TestDistributedShell {
"date",
"--container_type",
"UNSUPPORTED_TYPE"
- };
+ );
client.init(args);
Assert.fail("Exception is expected");
} catch (IllegalArgumentException e) {
Assert.assertTrue("The throw exception is not expected",
e.getMessage().contains("Invalid container_type: UNSUPPORTED_TYPE"));
}
+
+ try {
+ String[] args = createArgsWithPostFix(appNameCounter++,
+ "--num_containers",
+ "1",
+ "--shell_command",
+ Shell.WINDOWS ? "dir" : "ls",
+ "--master_resources",
+ "memory-mb=invalid"
+ );
+ client.init(args);
+ Assert.fail("Exception is expected");
+ } catch (IllegalArgumentException e) {
+ // expected failure: only log the exception message
+ LOG.info("IllegalArgumentException exception is expected: {}",
+ e.getMessage());
+ }
+
+ try {
+ String[] args = createArgsWithPostFix(appNameCounter++,
+ "--num_containers",
+ "1",
+ "--shell_command",
+ Shell.WINDOWS ? "dir" : "ls",
+ "--master_resources"
+ );
+ client.init(args);
+ Assert.fail("Exception is expected");
+ } catch (MissingArgumentException e) {
+ // expected failure: only log the exception message
+ LOG.info("MissingArgumentException exception is expected: {}",
+ e.getMessage());
+ }
}
@Test
@@ -1276,54 +1317,45 @@ public class TestDistributedShell {
}
protected void waitForNMsToRegister() throws Exception {
- int sec = 60;
- while (sec >= 0) {
- if (yarnCluster.getResourceManager().getRMContext().getRMNodes().size()
- >= NUM_NMS) {
- break;
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
+ return (rmContext.getRMNodes().size() >= NUM_NMS);
}
- Thread.sleep(1000);
- sec--;
- }
+ }, 100, 60000);
}
@Test
public void testContainerLaunchFailureHandling() throws Exception {
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
- "--num_containers",
- "2",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls",
- "--master_memory",
- "512",
- "--container_memory",
- "128"
- };
+ String[] args = createArguments(
+ "--num_containers",
+ "2",
+ "--shell_command",
+ Shell.WINDOWS ? "dir" : "ls",
+ "--master_memory",
+ "512",
+ "--container_memory",
+ "128"
+ );
LOG.info("Initializing DS Client");
Client client = new Client(ContainerLaunchFailAppMaster.class.getName(),
- new Configuration(yarnCluster.getConfig()));
+ new Configuration(yarnCluster.getConfig()));
boolean initSuccess = client.init(args);
Assert.assertTrue(initSuccess);
LOG.info("Running DS Client");
- boolean result = client.run();
-
- LOG.info("Client run completed. Result=" + result);
- Assert.assertFalse(result);
-
+ try {
+ boolean result = client.run();
+ Assert.assertFalse(result);
+ } finally {
+ client.sendStopSignal();
+ }
}
@Test
public void testDebugFlag() throws Exception {
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"2",
"--shell_command",
@@ -1337,7 +1369,7 @@ public class TestDistributedShell {
"--container_vcores",
"1",
"--debug"
- };
+ );
LOG.info("Initializing DS Client");
Client client = new Client(new Configuration(yarnCluster.getConfig()));
@@ -1370,7 +1402,7 @@ public class TestDistributedShell {
for (File output : containerFiles[i].listFiles()) {
if (output.getName().trim().contains("stdout")) {
BufferedReader br = null;
- List<String> stdOutContent = new ArrayList<String>();
+ List<String> stdOutContent = new ArrayList<>();
try {
String sCurrentLine;
@@ -1402,13 +1434,13 @@ public class TestDistributedShell {
Assert.assertTrue(stdOutContent.containsAll(expectedContent));
}
} catch (IOException e) {
- e.printStackTrace();
+ LOG.error("Exception reading the buffer", e);
} finally {
try {
if (br != null)
br.close();
} catch (IOException ex) {
- ex.printStackTrace();
+ LOG.error("Exception closing the bufferReader", ex);
}
}
}
@@ -1419,21 +1451,21 @@ public class TestDistributedShell {
@Test
public void testDistributedShellResourceProfiles() throws Exception {
- String appName = generateAppName();
+ int appNameCounter = 0;
String[][] args = {
- {"--appname", appName + "-0", "--jar", APPMASTER_JAR,
+ createArgsWithPostFix(appNameCounter++,
"--num_containers", "1", "--shell_command",
Shell.WINDOWS ? "dir" : "ls", "--container_resource_profile",
- "maximum" },
- {"--appname", appName + "-1", "--jar", APPMASTER_JAR,
+ "maximum"),
+ createArgsWithPostFix(appNameCounter++,
"--num_containers", "1", "--shell_command",
Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile",
- "default" },
- {"--appname", appName + "-2", "--jar", APPMASTER_JAR,
+ "default"),
+ createArgsWithPostFix(appNameCounter++,
"--num_containers", "1", "--shell_command",
Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile",
- "default", "--container_resource_profile", "maximum" }
- };
+ "default", "--container_resource_profile", "maximum"),
+ };
for (int i = 0; i < args.length; ++i) {
LOG.info("Initializing DS Client");
@@ -1453,11 +1485,7 @@ public class TestDistributedShell {
public void testDSShellWithOpportunisticContainers() throws Exception {
Client client = new Client(new Configuration(yarnCluster.getConfig()));
try {
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"2",
"--master_memory",
@@ -1472,24 +1500,24 @@ public class TestDistributedShell {
"date",
"--container_type",
"OPPORTUNISTIC"
- };
+ );
client.init(args);
- client.run();
+ assertTrue(client.run());
} catch (Exception e) {
- Assert.fail("Job execution with opportunistic containers failed.");
+ LOG.error("Job execution with opportunistic containers failed.", e);
+ Assert.fail("Exception. " + e.getMessage());
+ } finally {
+ client.sendStopSignal();
}
}
@Test
@TimelineVersion(2.0f)
public void testDSShellWithEnforceExecutionType() throws Exception {
+ YarnClient yarnClient = null;
Client client = new Client(new Configuration(yarnCluster.getConfig()));
try {
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"2",
"--master_memory",
@@ -1505,7 +1533,7 @@ public class TestDistributedShell {
"--container_type",
"OPPORTUNISTIC",
"--enforce_execution_type"
- };
+ );
client.init(args);
final AtomicBoolean result = new AtomicBoolean(false);
Thread t = new Thread() {
@@ -1519,7 +1547,7 @@ public class TestDistributedShell {
};
t.start();
- YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient = YarnClient.createYarnClient();
yarnClient.init(new Configuration(yarnCluster.getConfig()));
yarnClient.start();
waitForContainersLaunch(yarnClient, 2);
@@ -1543,7 +1571,13 @@ public class TestDistributedShell {
}
}
} catch (Exception e) {
- Assert.fail("Job execution with enforce execution type failed.");
+ LOG.error("Job execution with enforce execution type failed.", e);
+ Assert.fail("Exception. " + e.getMessage());
+ } finally {
+ client.sendStopSignal();
+ if (yarnClient != null) {
+ yarnClient.stop();
+ }
}
}
@@ -1592,26 +1626,22 @@ public class TestDistributedShell {
.getResourceScheduler().getClusterResource();
String masterMemoryString = "1 Gi";
String containerMemoryString = "512 Mi";
- long masterMemory = 1024;
- long containerMemory = 512;
+ long[] memVars = {1024, 512};
+
Assume.assumeTrue("The cluster doesn't have enough memory for this test",
- clusterResource.getMemorySize() >= masterMemory + containerMemory);
+ clusterResource.getMemorySize() >= memVars[0] + memVars[1]);
Assume.assumeTrue("The cluster doesn't have enough cores for this test",
clusterResource.getVirtualCores() >= 2);
if (largeContainers) {
- masterMemory = clusterResource.getMemorySize() * 2 / 3;
- masterMemory = masterMemory - masterMemory % MIN_ALLOCATION_MB;
- masterMemoryString = masterMemory + "Mi";
- containerMemory = clusterResource.getMemorySize() / 3;
- containerMemory = containerMemory - containerMemory % MIN_ALLOCATION_MB;
- containerMemoryString = String.valueOf(containerMemory);
+ memVars[0] = clusterResource.getMemorySize() * 2 / 3;
+ memVars[0] = memVars[0] - memVars[0] % MIN_ALLOCATION_MB;
+ masterMemoryString = memVars[0] + "Mi";
+ memVars[1] = clusterResource.getMemorySize() / 3;
+ memVars[1] = memVars[1] - memVars[1] % MIN_ALLOCATION_MB;
+ containerMemoryString = String.valueOf(memVars[1]);
}
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"2",
"--shell_command",
@@ -1619,8 +1649,8 @@ public class TestDistributedShell {
"--master_resources",
"memory=" + masterMemoryString + ",vcores=1",
"--container_resources",
- "memory=" + containerMemoryString + ",vcores=1",
- };
+ "memory=" + containerMemoryString + ",vcores=1"
+ );
LOG.info("Initializing DS Client");
Client client = new Client(new Configuration(yarnCluster.getConfig()));
@@ -1642,103 +1672,81 @@ public class TestDistributedShell {
yarnClient.init(new Configuration(yarnCluster.getConfig()));
yarnClient.start();
- while (true) {
- List<ApplicationReport> apps = yarnClient.getApplications();
- if (apps.isEmpty()) {
- Thread.sleep(10);
- continue;
- }
- ApplicationReport appReport = apps.get(0);
- ApplicationId appId = appReport.getApplicationId();
- List<ApplicationAttemptReport> appAttempts =
- yarnClient.getApplicationAttempts(appId);
- if (appAttempts.isEmpty()) {
- Thread.sleep(10);
- continue;
- }
- ApplicationAttemptReport appAttemptReport = appAttempts.get(0);
- ContainerId amContainerId = appAttemptReport.getAMContainerId();
-
- if (amContainerId == null) {
- Thread.sleep(10);
- continue;
- }
- ContainerReport report = yarnClient.getContainerReport(amContainerId);
- Resource masterResource = report.getAllocatedResource();
- Assert.assertEquals(masterMemory, masterResource.getMemorySize());
- Assert.assertEquals(1, masterResource.getVirtualCores());
-
- List<ContainerReport> containers =
- yarnClient.getContainers(appAttemptReport.getApplicationAttemptId());
- if (containers.size() < 2) {
- Thread.sleep(10);
- continue;
- }
- for (ContainerReport container : containers) {
- if (!container.getContainerId().equals(amContainerId)) {
- Resource containerResource = container.getAllocatedResource();
- Assert.assertEquals(containerMemory,
- containerResource.getMemorySize());
- Assert.assertEquals(1, containerResource.getVirtualCores());
+ final AtomicBoolean testFailed = new AtomicBoolean(false);
+ try {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ if (testFailed.get()) {
+ return true;
+ }
+ List<ContainerReport> containers;
+ try {
+ List<ApplicationReport> apps = yarnClient.getApplications();
+ if (apps.isEmpty()) {
+ return false;
+ }
+ ApplicationReport appReport = apps.get(0);
+ ApplicationId appId = appReport.getApplicationId();
+ List<ApplicationAttemptReport> appAttempts =
+ yarnClient.getApplicationAttempts(appId);
+ if (appAttempts.isEmpty()) {
+ return false;
+ }
+ ApplicationAttemptReport appAttemptReport = appAttempts.get(0);
+ ContainerId amContainerId = appAttemptReport.getAMContainerId();
+ if (amContainerId == null) {
+ return false;
+ }
+ ContainerReport report = yarnClient.getContainerReport(
+ amContainerId);
+ Resource masterResource = report.getAllocatedResource();
+ Assert.assertEquals(memVars[0],
+ masterResource.getMemorySize());
+ Assert.assertEquals(1, masterResource.getVirtualCores());
+ containers = yarnClient.getContainers(
+ appAttemptReport.getApplicationAttemptId());
+ if (containers.size() < 2) {
+ return false;
+ }
+ for (ContainerReport container : containers) {
+ if (!container.getContainerId().equals(amContainerId)) {
+ Resource containerResource = container.getAllocatedResource();
+ Assert.assertEquals(memVars[1],
+ containerResource.getMemorySize());
+ Assert.assertEquals(1, containerResource.getVirtualCores());
+ }
+ }
+ return true;
+ } catch (Exception ex) {
+ LOG.error("Error waiting for expected results", ex);
+ testFailed.set(true);
+ }
+ return false;
}
+ }, 10, TEST_TIME_WINDOW_EXPIRE);
+ assertFalse(testFailed.get());
+ } finally {
+ LOG.info("Signaling Client to Stop");
+ client.sendStopSignal();
+ if (yarnClient != null) {
+ LOG.info("Stopping yarnClient service");
+ yarnClient.stop();
}
-
- return;
}
}
- @Test(expected=IllegalArgumentException.class)
- public void testDistributedShellAMResourcesWithIllegalArguments()
- throws Exception {
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
- "--num_containers",
- "1",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls",
- "--master_resources",
- "memory-mb=invalid"
- };
- Client client = new Client(new Configuration(yarnCluster.getConfig()));
- client.init(args);
- }
-
- @Test(expected=MissingArgumentException.class)
- public void testDistributedShellAMResourcesWithMissingArgumentValue()
- throws Exception {
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
- "--num_containers",
- "1",
- "--shell_command",
- Shell.WINDOWS ? "dir" : "ls",
- "--master_resources"
- };
- Client client = new Client(new Configuration(yarnCluster.getConfig()));
- client.init(args);
- }
-
@Test(expected=ResourceNotFoundException.class)
public void testDistributedShellAMResourcesWithUnknownResource()
throws Exception {
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"1",
"--shell_command",
Shell.WINDOWS ? "dir" : "ls",
"--master_resources",
"unknown-resource=5"
- };
+ );
Client client = new Client(new Configuration(yarnCluster.getConfig()));
client.init(args);
client.run();
@@ -1747,18 +1755,14 @@ public class TestDistributedShell {
@Test(expected=IllegalArgumentException.class)
public void testDistributedShellNonExistentQueue()
throws Exception {
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"1",
"--shell_command",
Shell.WINDOWS ? "dir" : "ls",
"--queue",
"non-existent-queue"
- };
+ );
Client client = new Client(new Configuration(yarnCluster.getConfig()));
client.init(args);
client.run();
@@ -1767,11 +1771,7 @@ public class TestDistributedShell {
@Test
public void testDistributedShellWithSingleFileLocalization()
throws Exception {
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"1",
"--shell_command",
@@ -1780,7 +1780,7 @@ public class TestDistributedShell {
"./src/test/resources/a.txt",
"--shell_args",
"a.txt"
- };
+ );
Client client = new Client(new Configuration(yarnCluster.getConfig()));
client.init(args);
@@ -1790,11 +1790,7 @@ public class TestDistributedShell {
@Test
public void testDistributedShellWithMultiFileLocalization()
throws Exception {
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"1",
"--shell_command",
@@ -1803,7 +1799,7 @@ public class TestDistributedShell {
"./src/test/resources/a.txt,./src/test/resources/b.txt",
"--shell_args",
"a.txt b.txt"
- };
+ );
Client client = new Client(new Configuration(yarnCluster.getConfig()));
client.init(args);
@@ -1813,11 +1809,7 @@ public class TestDistributedShell {
@Test(expected=UncheckedIOException.class)
public void testDistributedShellWithNonExistentFileLocalization()
throws Exception {
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"1",
"--shell_command",
@@ -1826,11 +1818,11 @@ public class TestDistributedShell {
"/non/existing/path/file.txt",
"--shell_args",
"file.txt"
- };
+ );
Client client = new Client(new Configuration(yarnCluster.getConfig()));
client.init(args);
- client.run();
+ assertTrue(client.run());
}
@@ -1838,34 +1830,34 @@ public class TestDistributedShell {
public void testDistributedShellCleanup()
throws Exception {
String appName = "DistributedShellCleanup";
- String[] args = {
- "--appname",
- generateAppName(),
- "--jar",
- APPMASTER_JAR,
+ String[] args = createArguments(
"--num_containers",
"1",
"--shell_command",
Shell.WINDOWS ? "dir" : "ls"
- };
+ );
Configuration config = new Configuration(yarnCluster.getConfig());
Client client = new Client(config);
- client.init(args);
- client.run();
- ApplicationId appId = client.getAppId();
- String relativePath =
- ApplicationMaster.getRelativePath(appName, appId.toString(), "");
- FileSystem fs1 = FileSystem.get(config);
- Path path = new Path(fs1.getHomeDirectory(), relativePath);
+ try {
+ client.init(args);
+ client.run();
+ ApplicationId appId = client.getAppId();
+ String relativePath =
+ ApplicationMaster.getRelativePath(appName, appId.toString(), "");
+ FileSystem fs1 = FileSystem.get(config);
+ Path path = new Path(fs1.getHomeDirectory(), relativePath);
- GenericTestUtils.waitFor(() -> {
- try {
- return !fs1.exists(path);
- } catch (IOException e) {
- return false;
- }
- }, 10, 60000);
+ GenericTestUtils.waitFor(() -> {
+ try {
+ return !fs1.exists(path);
+ } catch (IOException e) {
+ return false;
+ }
+ }, 10, 60000);
- assertFalse("Distributed Shell Cleanup failed", fs1.exists(path));
+ assertFalse("Distributed Shell Cleanup failed", fs1.exists(path));
+ } finally {
+ client.sendStopSignal();
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org