You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/19 10:03:46 UTC

[GitHub] TisonKun closed pull request #6668: [hotfix] Clear YarnTestBase

TisonKun closed pull request #6668: [hotfix] Clear YarnTestBase
URL: https://github.com/apache/flink/pull/6668
 
 
   

This is a PR merged from a forked repository (a foreign pull request).
Because GitHub hides the original diff of such a pull request once it is
merged, the full diff is reproduced below for the sake of provenance:

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 9a8f5033f3f..95c66bc621f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -112,9 +112,9 @@ public void testMultipleAMKill() throws Exception {
 		final Configuration configuration = GlobalConfiguration.loadConfiguration();
 		TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor(
 			configuration,
-			getYarnConfiguration(),
+			YARN_CONFIGURATION,
 			confDirPath,
-			getYarnClient(),
+			yarnClient,
 			true);
 
 		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 758a09866d0..fae5bd69e7f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -30,7 +30,6 @@
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -55,11 +54,10 @@ public static void setup() {
 	public void testPerJobMode() throws Exception {
 		Configuration configuration = new Configuration();
 		configuration.setString(AkkaOptions.ASK_TIMEOUT, "30 s");
-		final YarnClient yarnClient = getYarnClient();
 
 		try (final YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
 			configuration,
-			getYarnConfiguration(),
+			YARN_CONFIGURATION,
 			System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR),
 			yarnClient,
 			true)) {
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 1009fbbf529..b5b59132c35 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -447,7 +447,7 @@ private void testDetachedPerJobYarnClusterInternal(String job) throws Exception
 		// get temporary folder for writing output of wordcount example
 		File tmpOutFolder = null;
 		try {
-			tmpOutFolder = tmp.newFolder();
+			tmpOutFolder = TEMPORARY_FOLDER.newFolder();
 		}
 		catch (IOException e) {
 			throw new RuntimeException(e);
@@ -456,7 +456,7 @@ private void testDetachedPerJobYarnClusterInternal(String job) throws Exception
 		// get temporary file for reading input data for wordcount example
 		File tmpInFile;
 		try {
-			tmpInFile = tmp.newFile();
+			tmpInFile = TEMPORARY_FOLDER.newFile();
 			FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT);
 		}
 		catch (IOException e) {
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index f027399be7f..00dbf9ca1c0 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -94,7 +94,7 @@ public void testDetachedMode() throws InterruptedException, IOException {
 
 		File exampleJarLocation = getTestJarPath("StreamingWordCount.jar");
 		// get temporary file for reading input data for wordcount example
-		File tmpInFile = tmp.newFile();
+		File tmpInFile = TEMPORARY_FOLDER.newFile();
 		FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT);
 
 		ArrayList<String> args = new ArrayList<>();
@@ -308,9 +308,9 @@ public void testJavaAPI() throws Exception {
 
 		try (final AbstractYarnClusterDescriptor clusterDescriptor = new LegacyYarnClusterDescriptor(
 			configuration,
-			getYarnConfiguration(),
+			YARN_CONFIGURATION,
 			confDirPath,
-			getYarnClient(),
+			yarnClient,
 			true)) {
 			Assert.assertNotNull("unable to get yarn client", clusterDescriptor);
 			clusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
index 97f60fce807..4add42e8bb8 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -26,6 +26,7 @@
 import org.apache.flink.test.util.SecureTestEnvironment;
 import org.apache.flink.test.util.TestingSecurityContext;
 
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
@@ -46,7 +47,7 @@
  */
 public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
 
-	protected static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOSecuredITCase.class);
+	private static final Logger LOG = LoggerFactory.getLogger(YARNSessionFIFOSecuredITCase.class);
 
 	@BeforeClass
 	public static void setup() {
@@ -58,10 +59,27 @@ public static void setup() {
 		YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
 		YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-fifo-secured");
 
-		SecureTestEnvironment.prepare(tmp);
+		SecureTestEnvironment.prepare(TEMPORARY_FOLDER);
 
-		populateYarnSecureConfigurations(YARN_CONFIGURATION, SecureTestEnvironment.getHadoopServicePrincipal(),
-				SecureTestEnvironment.getTestKeytab());
+		final String principal = SecureTestEnvironment.getHadoopServicePrincipal();
+		final String keytab = SecureTestEnvironment.getTestKeytab();
+
+		// Populate Yarn secure configurations.
+
+		YARN_CONFIGURATION.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+		YARN_CONFIGURATION.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+
+		YARN_CONFIGURATION.set(YarnConfiguration.RM_KEYTAB, keytab);
+		YARN_CONFIGURATION.set(YarnConfiguration.RM_PRINCIPAL, principal);
+		YARN_CONFIGURATION.set(YarnConfiguration.NM_KEYTAB, keytab);
+		YARN_CONFIGURATION.set(YarnConfiguration.NM_PRINCIPAL, principal);
+
+		YARN_CONFIGURATION.set(YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY, principal);
+		YARN_CONFIGURATION.set(YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY, keytab);
+		YARN_CONFIGURATION.set(YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY, principal);
+		YARN_CONFIGURATION.set(YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY, keytab);
+
+		YARN_CONFIGURATION.set("hadoop.security.auth_to_local", "RULE:[1:$1] RULE:[2:$1]");
 
 		Configuration flinkConfig = new Configuration();
 		flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB,
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
index 58710bc6962..69153d56362 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnConfigurationITCase.java
@@ -43,8 +43,6 @@
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -78,7 +76,6 @@
 	 */
 	@Test(timeout = 60000)
 	public void testFlinkContainerMemory() throws Exception {
-		final YarnClient yarnClient = getYarnClient();
 		final Configuration configuration = new Configuration(flinkConfiguration);
 
 		final int masterMemory = 64;
@@ -90,10 +87,9 @@ public void testFlinkContainerMemory() throws Exception {
 		configuration.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MIN, String.valueOf(1L << 20));
 		configuration.setString(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, String.valueOf(4L << 20));
 
-		final YarnConfiguration yarnConfiguration = getYarnConfiguration();
 		final YarnClusterDescriptor clusterDescriptor = new YarnClusterDescriptor(
 			configuration,
-			yarnConfiguration,
+			YARN_CONFIGURATION,
 			CliFrontend.getConfigurationDirectoryFromEnv(),
 			yarnClient,
 			true);
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 1a0520f68fe..2b002d2fc78 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -31,7 +31,6 @@
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -56,8 +55,6 @@
 import org.slf4j.Marker;
 import org.slf4j.MarkerFactory;
 
-import javax.annotation.Nullable;
-
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
@@ -126,7 +123,7 @@
 
 	// Temp directory which is deleted after the unit test.
 	@ClassRule
-	public static TemporaryFolder tmp = new TemporaryFolder();
+	public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
 
 	protected static MiniYARNCluster yarnCluster = null;
 
@@ -147,7 +144,7 @@
 	 */
 	protected static File tempConfPathForSecureRun = null;
 
-	private YarnClient yarnClient = null;
+	protected YarnClient yarnClient = null;
 
 	private static org.apache.flink.configuration.Configuration globalConfiguration;
 
@@ -171,29 +168,128 @@
 		YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN).
 	}
 
-	public static void populateYarnSecureConfigurations(Configuration conf, String principal, String keytab) {
+	private static boolean isApplicationRunning(ApplicationReport app) {
+		final YarnApplicationState yarnApplicationState = app.getYarnApplicationState();
+		return yarnApplicationState != YarnApplicationState.FINISHED
+			&& app.getYarnApplicationState() != YarnApplicationState.KILLED
+			&& app.getYarnApplicationState() != YarnApplicationState.FAILED;
+	}
+
+	public static int getRunningContainers() {
+		int count = 0;
+		for (int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
+			NodeManager nm = yarnCluster.getNodeManager(nmId);
+			ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
+			count += containers.size();
+		}
+		return count;
+	}
+
+	/**
+	 * Default @BeforeClass impl. Overwrite this for passing a different configuration
+	 */
+	@BeforeClass
+	public static void setup() {
+		startYARNWithConfig(YARN_CONFIGURATION);
+	}
+
+	public static void startYARNWithConfig(YarnConfiguration conf) {
+		start(conf, null, null);
+	}
+
+	public static void startYARNSecureMode(YarnConfiguration conf, String principal, String keytab) {
+		start(conf, principal, keytab);
+	}
+
+	private static void start(YarnConfiguration conf, String principal, String keytab) {
+		// set the home directory to a temp directory. Flink on YARN is using the home dir to distribute the file
+		File homeDir = null;
+		try {
+			homeDir = TEMPORARY_FOLDER.newFolder();
+		} catch (IOException e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+		System.setProperty("user.home", homeDir.getAbsolutePath());
+		String uberjarStartLoc = "..";
+		LOG.info("Trying to locate uberjar in {}", new File(uberjarStartLoc));
+		flinkUberjar = findFile(uberjarStartLoc, new RootDirFilenameFilter());
+		Assert.assertNotNull("Flink uberjar not found", flinkUberjar);
+		String flinkDistRootDir = flinkUberjar.getParentFile().getParent();
+		flinkLibFolder = flinkUberjar.getParentFile(); // the uberjar is located in lib/
+		Assert.assertNotNull("Flink flinkLibFolder not found", flinkLibFolder);
+		Assert.assertTrue("lib folder not found", flinkLibFolder.exists());
+		Assert.assertTrue("lib folder not found", flinkLibFolder.isDirectory());
+
+		if (!flinkUberjar.exists()) {
+			Assert.fail("Unable to locate yarn-uberjar.jar");
+		}
+
+		try {
+			LOG.info("Starting up MiniYARNCluster");
+			if (yarnCluster == null) {
+				final String testName = conf.get(YarnTestBase.TEST_CLUSTER_NAME_KEY);
+				yarnCluster = new MiniYARNCluster(
+					testName == null ? "YarnTest_" + UUID.randomUUID() : testName,
+					NUM_NODEMANAGERS,
+					1,
+					1);
+
+				yarnCluster.init(conf);
+				yarnCluster.start();
+			}
+
+			Map<String, String> map = new HashMap<>(System.getenv());
+
+			File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"}));
+			Assert.assertNotNull(flinkConfDirPath);
+
+			final String confDirPath = flinkConfDirPath.getParentFile().getAbsolutePath();
+			globalConfiguration = GlobalConfiguration.loadConfiguration(confDirPath);
+
+			// copy conf dir to test temporary workspace location
+			tempConfPathForSecureRun = TEMPORARY_FOLDER.newFolder("conf");
+
+			FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun);
+
+			globalConfiguration.setString(CoreOptions.MODE,
+				Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType()) ? CoreOptions.NEW_MODE : CoreOptions.LEGACY_MODE);
+
+			BootstrapTools.writeConfiguration(
+				globalConfiguration,
+				new File(tempConfPathForSecureRun, "flink-conf.yaml"));
+
+			String configDir = tempConfPathForSecureRun.getAbsolutePath();
+
+			LOG.info("Temporary Flink configuration directory to be used for secure test: {}", configDir);
+
+			Assert.assertNotNull(configDir);
 
-		conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-		conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
+			map.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir);
 
-		conf.set(YarnConfiguration.RM_KEYTAB, keytab);
-		conf.set(YarnConfiguration.RM_PRINCIPAL, principal);
-		conf.set(YarnConfiguration.NM_KEYTAB, keytab);
-		conf.set(YarnConfiguration.NM_PRINCIPAL, principal);
+			File yarnConfFile = writeYarnSiteConfigXML(conf);
+			map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
+			map.put("IN_TESTS", "yes we are in tests"); // see YarnClusterDescriptor() for more infos
+			TestBaseUtils.setEnv(map);
 
-		conf.set(YarnConfiguration.RM_WEBAPP_SPNEGO_USER_NAME_KEY, principal);
-		conf.set(YarnConfiguration.RM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY, keytab);
-		conf.set(YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY, principal);
-		conf.set(YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY, keytab);
+			Assert.assertSame(Service.STATE.STARTED, yarnCluster.getServiceState());
 
-		conf.set("hadoop.security.auth_to_local", "RULE:[1:$1] RULE:[2:$1]");
+			// wait for the nodeManagers to connect
+			while (!yarnCluster.waitForNodeManagersToConnect(500)) {
+				LOG.info("Waiting for Nodemanagers to connect");
+			}
+		} catch (Exception ex) {
+			ex.printStackTrace();
+			LOG.error("setup failure", ex);
+			Assert.fail();
+		}
 	}
 
 	@Before
 	public void checkClusterEmpty() {
 		if (yarnClient == null) {
 			yarnClient = YarnClient.createYarnClient();
-			yarnClient.init(getYarnConfiguration());
+			yarnClient.init(YARN_CONFIGURATION);
 			yarnClient.start();
 		}
 
@@ -202,9 +298,7 @@ public void checkClusterEmpty() {
 		isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType());
 	}
 
-	/**
-	 * Sleep a bit between the tests (we are re-using the YARN cluster for the tests).
-	 */
+	// Sleep a bit between the tests (we are re-using the YARN cluster for the tests).
 	@After
 	public void sleep() throws IOException, YarnException {
 		Deadline deadline = Deadline.now().plus(Duration.ofSeconds(10));
@@ -233,373 +327,134 @@ public void sleep() throws IOException, YarnException {
 		}
 	}
 
-	private static boolean isApplicationRunning(ApplicationReport app) {
-		final YarnApplicationState yarnApplicationState = app.getYarnApplicationState();
-		return yarnApplicationState != YarnApplicationState.FINISHED
-			&& app.getYarnApplicationState() != YarnApplicationState.KILLED
-			&& app.getYarnApplicationState() != YarnApplicationState.FAILED;
-	}
+	@AfterClass
+	public static void teardown() throws Exception {
 
-	@Nullable
-	protected YarnClient getYarnClient() {
-		return yarnClient;
-	}
+		LOG.info("Stopping MiniYarn Cluster");
+		yarnCluster.stop();
 
-	protected static YarnConfiguration getYarnConfiguration() {
-		return YARN_CONFIGURATION;
-	}
+		// Unset FLINK_CONF_DIR, as it might change the behavior of other tests
+		Map<String, String> map = new HashMap<>(System.getenv());
+		map.remove(ConfigConstants.ENV_FLINK_CONF_DIR);
+		map.remove("YARN_CONF_DIR");
+		map.remove("IN_TESTS");
+		TestBaseUtils.setEnv(map);
 
-	/**
-	 * Locate a file or directory.
-	 */
-	public static File findFile(String startAt, FilenameFilter fnf) {
-		File root = new File(startAt);
-		String[] files = root.list();
-		if (files == null) {
-			return null;
+		if (tempConfPathForSecureRun != null) {
+			FileUtil.fullyDelete(tempConfPathForSecureRun);
+			tempConfPathForSecureRun = null;
 		}
-		for (String file : files) {
-			File f = new File(startAt + File.separator + file);
-			if (f.isDirectory()) {
-				File r = findFile(f.getAbsolutePath(), fnf);
-				if (r != null) {
-					return r;
-				}
-			} else if (fnf.accept(f.getParentFile(), f.getName())) {
-				return f;
+
+		// When we are on travis, we copy the temp files of JUnit (containing the MiniYARNCluster log files)
+		// to <flinkRoot>/target/flink-yarn-tests-*.
+		// The files from there are picked up by the ./tools/travis_watchdog.sh script
+		// to upload them to Amazon S3.
+		if (isOnTravis()) {
+			File target = new File("../target" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY));
+			if (!target.mkdirs()) {
+				LOG.warn("Error creating dirs to {}", target);
+			}
+			File src = TEMPORARY_FOLDER.getRoot();
+			LOG.info("copying the final files from {} to {}", src.getAbsolutePath(), target.getAbsolutePath());
+			try {
+				FileUtils.copyDirectoryToDirectory(src, target);
+			} catch (IOException e) {
+				LOG.warn("Error copying the final files from {} to {}: msg: {}", src.getAbsolutePath(), target.getAbsolutePath(), e.getMessage(), e);
 			}
 		}
-		return null;
+
 	}
 
+	// -------------------------- Runner -------------------------- //
+
+	protected static ByteArrayOutputStream outContent;
+	protected static ByteArrayOutputStream errContent;
+
 	/**
-	 * Filter to find root dir of the flink-yarn dist.
+	 * Enumeration of RunType.
 	 */
-	public static class RootDirFilenameFilter implements FilenameFilter {
-		@Override
-		public boolean accept(File dir, String name) {
-			return name.startsWith("flink-dist") && name.endsWith(".jar") && dir.toString().contains("/lib");
-		}
+	protected enum RunTypes {
+		YARN_SESSION, CLI_FRONTEND
 	}
 
 	/**
-	 * A simple {@link FilenameFilter} that only accepts files if their name contains every string in the array passed
-	 * to the constructor.
+	 * Utility class to run yarn jobs.
 	 */
-	public static class ContainsName implements FilenameFilter {
-		private String[] names;
-		private String excludeInPath = null;
+	protected static class Runner extends Thread {
+		private final String[] args;
+		private final org.apache.flink.configuration.Configuration configuration;
+		private final String configurationDirectory;
+		private final int expectedReturnValue;
 
-		/**
-		 * @param names which have to be included in the filename.
-		 */
-		public ContainsName(String[] names) {
-			this.names = names;
-		}
+		private final PrintStream stdinPrintStream;
 
-		public ContainsName(String[] names, String excludeInPath) {
-			this.names = names;
-			this.excludeInPath = excludeInPath;
+		private RunTypes type;
+		private FlinkYarnSessionCli yarnSessionCli;
+		private Throwable runnerError;
+
+		public Runner(
+			String[] args,
+			org.apache.flink.configuration.Configuration configuration,
+			String configurationDirectory,
+			RunTypes type,
+			int expectedReturnValue,
+			PrintStream stdinPrintStream) {
+
+			this.args = args;
+			this.configuration = Preconditions.checkNotNull(configuration);
+			this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
+			this.type = type;
+			this.expectedReturnValue = expectedReturnValue;
+			this.stdinPrintStream = Preconditions.checkNotNull(stdinPrintStream);
 		}
 
 		@Override
-		public boolean accept(File dir, String name) {
-			if (excludeInPath == null) {
-				for (String n: names) {
-					if (!name.contains(n)) {
-						return false;
-					}
+		public void run() {
+			try {
+				int returnValue;
+				switch (type) {
+					case YARN_SESSION:
+						yarnSessionCli = new FlinkYarnSessionCli(
+							configuration,
+							configurationDirectory,
+							"",
+							"",
+							true);
+						returnValue = yarnSessionCli.run(args);
+						break;
+					case CLI_FRONTEND:
+						try {
+							CliFrontend cli = new CliFrontend(
+								configuration,
+								CliFrontend.loadCustomCommandLines(configuration, configurationDirectory));
+							returnValue = cli.parseParameters(args);
+						} catch (Exception e) {
+							throw new RuntimeException("Failed to execute the following args with CliFrontend: "
+								+ Arrays.toString(args), e);
+						}
+						break;
+					default:
+						throw new RuntimeException("Unknown type " + type);
 				}
-				return true;
-			} else {
-				for (String n: names) {
-					if (!name.contains(n)) {
-						return false;
-					}
+
+				if (returnValue != this.expectedReturnValue) {
+					Assert.fail("The YARN session returned with unexpected value=" + returnValue + " expected=" + expectedReturnValue);
 				}
-				return !dir.toString().contains(excludeInPath);
+			} catch (Throwable t) {
+				LOG.info("Runner stopped with exception", t);
+				// save error.
+				this.runnerError = t;
 			}
 		}
-	}
-
-	public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException {
-		tmp.create();
-		File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml");
 
-		try (FileWriter writer = new FileWriter(yarnSiteXML)) {
-			yarnConf.writeXml(writer);
-			writer.flush();
-		}
-		return yarnSiteXML;
-	}
-
-	/**
-	 * This method checks the written TaskManager and JobManager log files
-	 * for exceptions.
-	 *
-	 * <p>WARN: Please make sure the tool doesn't find old logfiles from previous test runs.
-	 * So always run "mvn clean" before running the tests here.
-	 *
-	 */
-	public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited, final String[] whitelisted) {
-		File cwd = new File("target/" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY));
-		Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to exist", cwd.exists());
-		Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to be a directory", cwd.isDirectory());
-
-		List<String> prohibitedExcerpts = new ArrayList<>();
-		File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
-			@Override
-			public boolean accept(File dir, String name) {
-			// scan each file for prohibited strings.
-			File f = new File(dir.getAbsolutePath() + "/" + name);
-			try {
-				BufferingScanner scanner = new BufferingScanner(new Scanner(f), 10);
-				while (scanner.hasNextLine()) {
-					final String lineFromFile = scanner.nextLine();
-					for (String aProhibited : prohibited) {
-						if (lineFromFile.contains(aProhibited)) {
-
-							boolean whitelistedFound = false;
-							for (String white : whitelisted) {
-								if (lineFromFile.contains(white)) {
-									whitelistedFound = true;
-									break;
-								}
-							}
-
-							if (!whitelistedFound) {
-								// logging in FATAL to see the actual message in TRAVIS tests.
-								Marker fatal = MarkerFactory.getMarker("FATAL");
-								LOG.error(fatal, "Prohibited String '{}' in line '{}'", aProhibited, lineFromFile);
-
-								StringBuilder logExcerpt = new StringBuilder();
-
-								logExcerpt.append(System.lineSeparator());
-
-								// include some previous lines in case of irregular formatting
-								for (String previousLine : scanner.getPreviousLines()) {
-									logExcerpt.append(previousLine);
-									logExcerpt.append(System.lineSeparator());
-								}
-
-								logExcerpt.append(lineFromFile);
-								logExcerpt.append(System.lineSeparator());
-								// extract potential stack trace from log
-								while (scanner.hasNextLine()) {
-									String line = scanner.nextLine();
-									logExcerpt.append(line);
-									logExcerpt.append(System.lineSeparator());
-									if (line.isEmpty() || (!Character.isWhitespace(line.charAt(0)) && !line.startsWith("Caused by"))) {
-										// the cause has been printed, now add a few more lines in case of irregular formatting
-										for (int x = 0; x < 10 && scanner.hasNextLine(); x++) {
-											logExcerpt.append(scanner.nextLine());
-											logExcerpt.append(System.lineSeparator());
-										}
-										break;
-									}
-								}
-								prohibitedExcerpts.add(logExcerpt.toString());
-
-								return true;
-							}
-						}
-					}
-
-				}
-			} catch (FileNotFoundException e) {
-				LOG.warn("Unable to locate file: " + e.getMessage() + " file: " + f.getAbsolutePath());
-			}
-
-			return false;
-			}
-		});
-		if (foundFile != null) {
-			Scanner scanner =  null;
-			try {
-				scanner = new Scanner(foundFile);
-			} catch (FileNotFoundException e) {
-				Assert.fail("Unable to locate file: " + e.getMessage() + " file: " + foundFile.getAbsolutePath());
-			}
-			LOG.warn("Found a file with a prohibited string. Printing contents:");
-			while (scanner.hasNextLine()) {
-				LOG.warn("LINE: " + scanner.nextLine());
-			}
-			Assert.fail(
-				"Found a file " + foundFile + " with a prohibited string (one of " + Arrays.toString(prohibited) + "). " +
-				"Excerpts:" + System.lineSeparator() + prohibitedExcerpts);
-		}
-	}
-
-	public static boolean verifyStringsInNamedLogFiles(
-			final String[] mustHave, final String fileName) {
-		List<String> mustHaveList = Arrays.asList(mustHave);
-		File cwd = new File("target/" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY));
-		if (!cwd.exists() || !cwd.isDirectory()) {
-			return false;
-		}
-
-		File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
-			@Override
-			public boolean accept(File dir, String name) {
-				if (fileName != null && !name.equals(fileName)) {
-					return false;
-				}
-				File f = new File(dir.getAbsolutePath() + "/" + name);
-				LOG.info("Searching in {}", f.getAbsolutePath());
-				try {
-					Set<String> foundSet = new HashSet<>(mustHave.length);
-					Scanner scanner = new Scanner(f);
-					while (scanner.hasNextLine()) {
-						final String lineFromFile = scanner.nextLine();
-						for (String str : mustHave) {
-							if (lineFromFile.contains(str)) {
-								foundSet.add(str);
-							}
-						}
-						if (foundSet.containsAll(mustHaveList)) {
-							return true;
-						}
-					}
-				} catch (FileNotFoundException e) {
-					LOG.warn("Unable to locate file: " + e.getMessage() + " file: " + f.getAbsolutePath());
-				}
-				return false;
-			}
-		});
-
-		if (foundFile != null) {
-			LOG.info("Found string {} in {}.", Arrays.toString(mustHave), foundFile.getAbsolutePath());
-			return true;
-		} else {
-			return false;
-		}
-	}
-
-	public static void sleep(int time) {
-		try {
-			Thread.sleep(time);
-		} catch (InterruptedException e) {
-			LOG.warn("Interruped", e);
-		}
-	}
-
-	public static int getRunningContainers() {
-		int count = 0;
-		for (int nmId = 0; nmId < NUM_NODEMANAGERS; nmId++) {
-			NodeManager nm = yarnCluster.getNodeManager(nmId);
-			ConcurrentMap<ContainerId, Container> containers = nm.getNMContext().getContainers();
-			count += containers.size();
-		}
-		return count;
-	}
-
-	public static void startYARNSecureMode(YarnConfiguration conf, String principal, String keytab) {
-		start(conf, principal, keytab);
-	}
-
-	public static void startYARNWithConfig(YarnConfiguration conf) {
-		start(conf, null, null);
-	}
-
-	private static void start(YarnConfiguration conf, String principal, String keytab) {
-		// set the home directory to a temp directory. Flink on YARN is using the home dir to distribute the file
-		File homeDir = null;
-		try {
-			homeDir = tmp.newFolder();
-		} catch (IOException e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-		System.setProperty("user.home", homeDir.getAbsolutePath());
-		String uberjarStartLoc = "..";
-		LOG.info("Trying to locate uberjar in {}", new File(uberjarStartLoc));
-		flinkUberjar = findFile(uberjarStartLoc, new RootDirFilenameFilter());
-		Assert.assertNotNull("Flink uberjar not found", flinkUberjar);
-		String flinkDistRootDir = flinkUberjar.getParentFile().getParent();
-		flinkLibFolder = flinkUberjar.getParentFile(); // the uberjar is located in lib/
-		Assert.assertNotNull("Flink flinkLibFolder not found", flinkLibFolder);
-		Assert.assertTrue("lib folder not found", flinkLibFolder.exists());
-		Assert.assertTrue("lib folder not found", flinkLibFolder.isDirectory());
-
-		if (!flinkUberjar.exists()) {
-			Assert.fail("Unable to locate yarn-uberjar.jar");
+		/** Stops the Yarn session. */
+		public void sendStop() {
+			stdinPrintStream.println("stop");
 		}
 
-		try {
-			LOG.info("Starting up MiniYARNCluster");
-			if (yarnCluster == null) {
-				final String testName = conf.get(YarnTestBase.TEST_CLUSTER_NAME_KEY);
-				yarnCluster = new MiniYARNCluster(
-					testName == null ? "YarnTest_" + UUID.randomUUID() : testName,
-					NUM_NODEMANAGERS,
-					1,
-					1);
-
-				yarnCluster.init(conf);
-				yarnCluster.start();
-			}
-
-			Map<String, String> map = new HashMap<String, String>(System.getenv());
-
-			File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"}));
-			Assert.assertNotNull(flinkConfDirPath);
-
-			final String confDirPath = flinkConfDirPath.getParentFile().getAbsolutePath();
-			globalConfiguration = GlobalConfiguration.loadConfiguration(confDirPath);
-
-			//copy conf dir to test temporary workspace location
-			tempConfPathForSecureRun = tmp.newFolder("conf");
-
-			FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun);
-
-			globalConfiguration.setString(CoreOptions.MODE,
-				Objects.equals(TestBaseUtils.CodebaseType.NEW, TestBaseUtils.getCodebaseType()) ? CoreOptions.NEW_MODE : CoreOptions.LEGACY_MODE);
-
-			BootstrapTools.writeConfiguration(
-				globalConfiguration,
-				new File(tempConfPathForSecureRun, "flink-conf.yaml"));
-
-			String configDir = tempConfPathForSecureRun.getAbsolutePath();
-
-			LOG.info("Temporary Flink configuration directory to be used for secure test: {}", configDir);
-
-			Assert.assertNotNull(configDir);
-
-			map.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir);
-
-			File yarnConfFile = writeYarnSiteConfigXML(conf);
-			map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
-			map.put("IN_TESTS", "yes we are in tests"); // see YarnClusterDescriptor() for more infos
-			TestBaseUtils.setEnv(map);
-
-			Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);
-
-			// wait for the nodeManagers to connect
-			while (!yarnCluster.waitForNodeManagersToConnect(500)) {
-				LOG.info("Waiting for Nodemanagers to connect");
-			}
-		} catch (Exception ex) {
-			ex.printStackTrace();
-			LOG.error("setup failure", ex);
-			Assert.fail();
+		public Throwable getRunnerError() {
+			return runnerError;
 		}
-
-	}
-
-	/**
-	 * Default @BeforeClass impl. Overwrite this for passing a different configuration
-	 */
-	@BeforeClass
-	public static void setup() {
-		startYARNWithConfig(YARN_CONFIGURATION);
-	}
-
-	// -------------------------- Runner -------------------------- //
-
-	protected static ByteArrayOutputStream outContent;
-	protected static ByteArrayOutputStream errContent;
-	enum RunTypes {
-		YARN_SESSION, CLI_FRONTEND
 	}
 
 	/**
@@ -630,7 +485,7 @@ protected Runner startWithArgs(String[] args, String startedAfterString, RunType
 		runner.setName("Frontend (CLI/YARN Client) runner thread (startWithArgs()).");
 		runner.start();
 
-		for (int second = 0; second <  startTimeoutSeconds; second++) {
+		for (int second = 0; second < startTimeoutSeconds; second++) {
 			sleep(1000);
 			// check output for correct TaskManager startup.
 			if (outContent.toString().contains(startedAfterString)
@@ -740,7 +595,7 @@ protected void runWithArgs(String[] args, String terminateAfterString, String[]
 				catch (InterruptedException e) {
 					LOG.warn("Interrupted while stopping runner", e);
 				}
-				LOG.warn("RunWithArgs runner stopped.");
+				LOG.info("RunWithArgs runner stopped.");
 			}
 			else {
 				// check if thread died
@@ -749,8 +604,7 @@ protected void runWithArgs(String[] args, String terminateAfterString, String[]
 					break;
 				}
 			}
-		}
-		while (runner.getRunnerError() == null && !expectedStringSeen && System.currentTimeMillis() < deadline);
+		} while (runner.getRunnerError() == null && !expectedStringSeen && System.currentTimeMillis() < deadline);
 
 		resetStreamsAndSendOutput();
 
@@ -773,128 +627,187 @@ protected static void resetStreamsAndSendOutput() {
 		LOG.info("Sending stderr content through logger: \n\n{}\n\n", errContent.toString());
 	}
 
+	// --------------------------- Utility Functions -------------------------
+
 	/**
-	 * Utility class to run yarn jobs.
+	 * Locate a file or directory.
 	 */
-	protected static class Runner extends Thread {
-		private final String[] args;
-		private final org.apache.flink.configuration.Configuration configuration;
-		private final String configurationDirectory;
-		private final int expectedReturnValue;
+	public static File findFile(String startAt, FilenameFilter fnf) {
+		File root = new File(startAt);
+		String[] files = root.list();
+		if (files == null) {
+			return null;
+		}
+		for (String file : files) {
+			File f = new File(startAt + File.separator + file);
+			if (f.isDirectory()) {
+				File r = findFile(f.getAbsolutePath(), fnf);
+				if (r != null) {
+					return r;
+				}
+			} else if (fnf.accept(f.getParentFile(), f.getName())) {
+				return f;
+			}
+		}
+		return null;
+	}
 
-		private final PrintStream stdinPrintStream;
+	/**
+	 * Filter to find root dir of the flink-yarn dist.
+	 */
+	public static class RootDirFilenameFilter implements FilenameFilter {
+		@Override
+		public boolean accept(File dir, String name) {
+			return name.startsWith("flink-dist") && name.endsWith(".jar") && dir.toString().contains("/lib");
+		}
+	}
 
-		private RunTypes type;
-		private FlinkYarnSessionCli yCli;
-		private Throwable runnerError;
+	/**
+	 * A simple {@link FilenameFilter} that only accepts files if their name contains every string in the array passed
+	 * to the constructor.
+	 */
+	public static class ContainsName implements FilenameFilter {
+		private String[] names;
+		private String excludeInPath = null;
 
-		public Runner(
-				String[] args,
-				org.apache.flink.configuration.Configuration configuration,
-				String configurationDirectory,
-				RunTypes type,
-				int expectedReturnValue,
-				PrintStream stdinPrintStream) {
+		/**
+		 * @param names which have to be included in the filename.
+		 */
+		public ContainsName(String[] names) {
+			this.names = names;
+		}
 
-			this.args = args;
-			this.configuration = Preconditions.checkNotNull(configuration);
-			this.configurationDirectory = Preconditions.checkNotNull(configurationDirectory);
-			this.type = type;
-			this.expectedReturnValue = expectedReturnValue;
-			this.stdinPrintStream = Preconditions.checkNotNull(stdinPrintStream);
+		public ContainsName(String[] names, String excludeInPath) {
+			this.names = names;
+			this.excludeInPath = excludeInPath;
 		}
 
 		@Override
-		public void run() {
-			try {
-				int returnValue;
-				switch (type) {
-					case YARN_SESSION:
-						yCli = new FlinkYarnSessionCli(
-							configuration,
-							configurationDirectory,
-							"",
-							"",
-							true);
-						returnValue = yCli.run(args);
-						break;
-					case CLI_FRONTEND:
-						try {
-							CliFrontend cli = new CliFrontend(
-								configuration,
-								CliFrontend.loadCustomCommandLines(configuration, configurationDirectory));
-							returnValue = cli.parseParameters(args);
-						} catch (Exception e) {
-							throw new RuntimeException("Failed to execute the following args with CliFrontend: "
-								+ Arrays.toString(args), e);
-						}
-						break;
-					default:
-						throw new RuntimeException("Unknown type " + type);
+		public boolean accept(File dir, String name) {
+			if (excludeInPath == null) {
+				for (String n: names) {
+					if (!name.contains(n)) {
+						return false;
+					}
 				}
-
-				if (returnValue != this.expectedReturnValue) {
-					Assert.fail("The YARN session returned with unexpected value=" + returnValue + " expected=" + expectedReturnValue);
+				return true;
+			} else {
+				for (String n: names) {
+					if (!name.contains(n)) {
+						return false;
+					}
 				}
-			} catch (Throwable t) {
-				LOG.info("Runner stopped with exception", t);
-				// save error.
-				this.runnerError = t;
+				return !dir.toString().contains(excludeInPath);
 			}
 		}
+	}
 
-		/** Stops the Yarn session. */
-		public void sendStop() {
-			stdinPrintStream.println("stop");
-		}
+	public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException {
+		TEMPORARY_FOLDER.create();
+		File yarnSiteXML = new File(TEMPORARY_FOLDER.newFolder().getAbsolutePath() + "/yarn-site.xml");
 
-		public Throwable getRunnerError() {
-			return runnerError;
+		try (FileWriter writer = new FileWriter(yarnSiteXML)) {
+			yarnConf.writeXml(writer);
+			writer.flush();
 		}
+		return yarnSiteXML;
 	}
 
-	// -------------------------- Tear down -------------------------- //
+	/**
+	 * This method checks the written TaskManager and JobManager log files
+	 * for exceptions.
+	 *
+	 * <p>WARN: Please make sure the tool doesn't find old logfiles from previous test runs.
+	 * So always run "mvn clean" before running the tests here.
+	 *
+	 */
+	public static void ensureNoProhibitedStringInLogFiles(final String[] prohibited, final String[] whitelisted) {
+		File cwd = new File("target/" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY));
+		Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to exist", cwd.exists());
+		Assert.assertTrue("Expecting directory " + cwd.getAbsolutePath() + " to be a directory", cwd.isDirectory());
 
-	@AfterClass
-	public static void teardown() throws Exception {
+		List<String> prohibitedExcerpts = new ArrayList<>();
+		File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
+			@Override
+			public boolean accept(File dir, String name) {
+				// scan each file for prohibited strings.
+				File f = new File(dir.getAbsolutePath() + "/" + name);
+				try {
+					BufferingScanner scanner = new BufferingScanner(new Scanner(f), 10);
+					while (scanner.hasNextLine()) {
+						final String lineFromFile = scanner.nextLine();
+						for (String aProhibited : prohibited) {
+							if (lineFromFile.contains(aProhibited)) {
 
-		LOG.info("Stopping MiniYarn Cluster");
-		yarnCluster.stop();
+								boolean whitelistedFound = false;
+								for (String white : whitelisted) {
+									if (lineFromFile.contains(white)) {
+										whitelistedFound = true;
+										break;
+									}
+								}
 
-		// Unset FLINK_CONF_DIR, as it might change the behavior of other tests
-		Map<String, String> map = new HashMap<>(System.getenv());
-		map.remove(ConfigConstants.ENV_FLINK_CONF_DIR);
-		map.remove("YARN_CONF_DIR");
-		map.remove("IN_TESTS");
-		TestBaseUtils.setEnv(map);
+								if (!whitelistedFound) {
+									// logging in FATAL to see the actual message in TRAVIS tests.
+									Marker fatal = MarkerFactory.getMarker("FATAL");
+									LOG.error(fatal, "Prohibited String '{}' in line '{}'", aProhibited, lineFromFile);
 
-		if (tempConfPathForSecureRun != null) {
-			FileUtil.fullyDelete(tempConfPathForSecureRun);
-			tempConfPathForSecureRun = null;
-		}
+									StringBuilder logExcerpt = new StringBuilder();
 
-		// When we are on travis, we copy the temp files of JUnit (containing the MiniYARNCluster log files)
-		// to <flinkRoot>/target/flink-yarn-tests-*.
-		// The files from there are picked up by the ./tools/travis_watchdog.sh script
-		// to upload them to Amazon S3.
-		if (isOnTravis()) {
-			File target = new File("../target" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY));
-			if (!target.mkdirs()) {
-				LOG.warn("Error creating dirs to {}", target);
+									logExcerpt.append(System.lineSeparator());
+
+									// include some previous lines in case of irregular formatting
+									for (String previousLine : scanner.getPreviousLines()) {
+										logExcerpt.append(previousLine);
+										logExcerpt.append(System.lineSeparator());
+									}
+
+									logExcerpt.append(lineFromFile);
+									logExcerpt.append(System.lineSeparator());
+									// extract potential stack trace from log
+									while (scanner.hasNextLine()) {
+										String line = scanner.nextLine();
+										logExcerpt.append(line);
+										logExcerpt.append(System.lineSeparator());
+										if (line.isEmpty() || (!Character.isWhitespace(line.charAt(0)) && !line.startsWith("Caused by"))) {
+											// the cause has been printed, now add a few more lines in case of irregular formatting
+											for (int x = 0; x < 10 && scanner.hasNextLine(); x++) {
+												logExcerpt.append(scanner.nextLine());
+												logExcerpt.append(System.lineSeparator());
+											}
+											break;
+										}
+									}
+									prohibitedExcerpts.add(logExcerpt.toString());
+
+									return true;
+								}
+							}
+						}
+
+					}
+				} catch (FileNotFoundException e) {
+					LOG.warn("Unable to locate file: " + e.getMessage() + " file: " + f.getAbsolutePath());
+				}
+
+				return false;
 			}
-			File src = tmp.getRoot();
-			LOG.info("copying the final files from {} to {}", src.getAbsolutePath(), target.getAbsolutePath());
+		});
+		if (foundFile != null) {
+			Scanner scanner =  null;
 			try {
-				FileUtils.copyDirectoryToDirectory(src, target);
-			} catch (IOException e) {
-				LOG.warn("Error copying the final files from {} to {}: msg: {}", src.getAbsolutePath(), target.getAbsolutePath(), e.getMessage(), e);
+				scanner = new Scanner(foundFile);
+			} catch (FileNotFoundException e) {
+				Assert.fail("Unable to locate file: " + e.getMessage() + " file: " + foundFile.getAbsolutePath());
 			}
+			LOG.warn("Found a file with a prohibited string. Printing contents:");
+			while (scanner.hasNextLine()) {
+				LOG.warn("LINE: " + scanner.nextLine());
+			}
+			Assert.fail(
+				"Found a file " + foundFile + " with a prohibited string (one of " + Arrays.toString(prohibited) + "). " +
+					"Excerpts:" + System.lineSeparator() + prohibitedExcerpts);
 		}
-
-	}
-
-	public static boolean isOnTravis() {
-		return System.getenv("TRAVIS") != null && System.getenv("TRAVIS").equals("true");
 	}
 
 	/**
@@ -929,4 +842,61 @@ public String nextLine() {
 			return new ArrayList<>(bufferedLines);
 		}
 	}
+
+	public static boolean verifyStringsInNamedLogFiles(
+		final String[] mustHave, final String fileName) {
+		List<String> mustHaveList = Arrays.asList(mustHave);
+		File cwd = new File("target/" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY));
+		if (!cwd.exists() || !cwd.isDirectory()) {
+			return false;
+		}
+
+		File foundFile = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
+			@Override
+			public boolean accept(File dir, String name) {
+				if (fileName != null && !name.equals(fileName)) {
+					return false;
+				}
+				File f = new File(dir.getAbsolutePath() + "/" + name);
+				LOG.info("Searching in {}", f.getAbsolutePath());
+				try {
+					Set<String> foundSet = new HashSet<>(mustHave.length);
+					Scanner scanner = new Scanner(f);
+					while (scanner.hasNextLine()) {
+						final String lineFromFile = scanner.nextLine();
+						for (String str : mustHave) {
+							if (lineFromFile.contains(str)) {
+								foundSet.add(str);
+							}
+						}
+						if (foundSet.containsAll(mustHaveList)) {
+							return true;
+						}
+					}
+				} catch (FileNotFoundException e) {
+					LOG.warn("Unable to locate file: " + e.getMessage() + " file: " + f.getAbsolutePath());
+				}
+				return false;
+			}
+		});
+
+		if (foundFile != null) {
+			LOG.info("Found string {} in {}.", Arrays.toString(mustHave), foundFile.getAbsolutePath());
+			return true;
+		} else {
+			return false;
+		}
+	}
+
+	public static void sleep(int time) {
+		try {
+			Thread.sleep(time);
+		} catch (InterruptedException e) {
+			LOG.warn("Interrupted", e);
+		}
+	}
+
+	public static boolean isOnTravis() {
+		return System.getenv("TRAVIS") != null && System.getenv("TRAVIS").equals("true");
+	}
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services