Posted to commits@flink.apache.org by al...@apache.org on 2018/05/08 09:52:26 UTC

[1/2] flink git commit: Remove special-case krb5.conf code from YARN runners

Repository: flink
Updated Branches:
  refs/heads/master 53610c31e -> 5249981dc


Remove special-case krb5.conf code from YARN runners
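
Each of the four runners touched below previously probed its working directory for a krb5.conf file and, if one was readable, built a Hadoop Configuration with kerberos authentication and authorization switched on and fed it through a dedicated HadoopModule; otherwise it fell back to the plain constructor. After this commit they all share the plain bootstrap. A minimal sketch of that shared path, distilled from the diffs below (hypothetical class name; surrounding runner setup and error handling omitted):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.security.SecurityConfiguration;
    import org.apache.flink.runtime.security.SecurityUtils;

    final class SecurityBootstrapSketch {
        static void installSecurity(Configuration flinkConfig) throws Exception {
            // No krb5.conf special-casing any more: the SecurityConfiguration is
            // built directly from the Flink configuration and installed as-is.
            SecurityConfiguration sc = new SecurityConfiguration(flinkConfig);
            SecurityUtils.install(sc);
        }
    }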


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8101c0d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8101c0d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8101c0d6

Branch: refs/heads/master
Commit: 8101c0d69859a8d6e2580466f57f7c0d6898bfab
Parents: 53610c3
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu May 3 16:23:29 2018 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue May 8 11:51:30 2018 +0200

----------------------------------------------------------------------
 .../flink/yarn/YarnApplicationMasterRunner.java | 19 +---------------
 .../flink/yarn/YarnTaskExecutorRunner.java      | 21 +-----------------
 .../yarn/YarnTaskManagerRunnerFactory.java      | 23 +-------------------
 .../yarn/entrypoint/YarnEntrypointUtils.java    | 18 +--------------
 4 files changed, 4 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8101c0d6/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index ed1aad3..eb977bf 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
-import org.apache.flink.runtime.security.modules.HadoopModule;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
@@ -59,7 +58,6 @@ import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Props;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -68,7 +66,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -171,21 +168,7 @@ public class YarnApplicationMasterRunner {
 				flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
 			}
 
-			SecurityConfiguration sc;
-
-			//To support Yarn Secure Integration Test Scenario
-			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
-			if (krb5Conf.exists() && krb5Conf.canRead()) {
-				String krb5Path = krb5Conf.getAbsolutePath();
-				LOG.info("KRB5 Conf: {}", krb5Path);
-				org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
-				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-				sc = new SecurityConfiguration(flinkConfig,
-					Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, hadoopConfiguration)));
-			} else {
-				sc = new SecurityConfiguration(flinkConfig);
-			}
+			SecurityConfiguration sc = new SecurityConfiguration(flinkConfig);
 
 			SecurityUtils.install(sc);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8101c0d6/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 8b54d87..0c676e7 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -28,21 +28,18 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
-import org.apache.flink.runtime.security.modules.HadoopModule;
 import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
@@ -128,23 +125,7 @@ public class YarnTaskExecutorRunner {
 			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
 					currentUser.getShortUserName(), yarnClientUsername);
 
-			SecurityConfiguration sc;
-
-			//To support Yarn Secure Integration Test Scenario
-			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
-			if (krb5Conf.exists() && krb5Conf.canRead()) {
-				String krb5Path = krb5Conf.getAbsolutePath();
-				LOG.info("KRB5 Conf: {}", krb5Path);
-				org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
-				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-
-				sc = new SecurityConfiguration(configuration,
-					Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, hadoopConfiguration)));
-
-			} else {
-				sc = new SecurityConfiguration(configuration);
-			}
+			SecurityConfiguration sc = new SecurityConfiguration(configuration);
 
 			if (keytabPath != null && remoteKeytabPrincipal != null) {
 				configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);

http://git-wip-us.apache.org/repos/asf/flink/blob/8101c0d6/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
index d14248c..d6f2364 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunnerFactory.java
@@ -26,14 +26,12 @@ import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
-import org.apache.flink.runtime.security.modules.HadoopModule;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.slf4j.Logger;
@@ -41,7 +39,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
@@ -155,25 +152,7 @@ public class YarnTaskManagerRunnerFactory {
 		}
 
 		try {
-			SecurityConfiguration sc;
-
-			//To support Yarn Secure Integration Test Scenario
-			File krb5Conf = new File(currDir, Utils.KRB5_FILE_NAME);
-			if (krb5Conf.exists() && krb5Conf.canRead()) {
-				String krb5Path = krb5Conf.getAbsolutePath();
-				LOG.info("KRB5 Conf: {}", krb5Path);
-				org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
-				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-				hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-
-				sc = new SecurityConfiguration(configuration,
-					Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, hadoopConfiguration)));
-
-			} else {
-				sc = new SecurityConfiguration(configuration);
-
-			}
-
+			SecurityConfiguration sc = new SecurityConfiguration(configuration);
 			SecurityUtils.install(sc);
 
 			return new Runner(configuration, resourceId, taskManager);

http://git-wip-us.apache.org/repos/asf/flink/blob/8101c0d6/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
index 25d138d..49957e1 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnEntrypointUtils.java
@@ -32,20 +32,17 @@ import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.security.SecurityConfiguration;
 import org.apache.flink.runtime.security.SecurityContext;
 import org.apache.flink.runtime.security.SecurityUtils;
-import org.apache.flink.runtime.security.modules.HadoopModule;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.Utils;
 import org.apache.flink.yarn.YarnConfigKeys;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.slf4j.Logger;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -58,20 +55,7 @@ public class YarnEntrypointUtils {
 			Configuration configuration,
 			String workingDirectory) throws Exception {
 
-		SecurityConfiguration sc;
-
-		//To support Yarn Secure Integration Test Scenario
-		File krb5Conf = new File(workingDirectory, Utils.KRB5_FILE_NAME);
-		if (krb5Conf.exists() && krb5Conf.canRead()) {
-			org.apache.hadoop.conf.Configuration hadoopConfiguration = new org.apache.hadoop.conf.Configuration();
-			hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-			hadoopConfiguration.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");
-
-			sc = new SecurityConfiguration(configuration,
-				Collections.singletonList(securityConfig -> new HadoopModule(securityConfig, hadoopConfiguration)));
-		} else {
-			sc = new SecurityConfiguration(configuration);
-		}
+		SecurityConfiguration sc = new SecurityConfiguration(configuration);
 
 		SecurityUtils.install(sc);
 


[2/2] flink git commit: [FLINK-9306] Execute YARN ITCases in both legacy and new mode

Posted by al...@apache.org.
[FLINK-9306] Execute YARN ITCases in both legacy and new mode

Before, always setting the mode to LEGACY_MODE when security settings were
present meant the tests never ran against the new code. Now we use the
system property to determine whether to run in legacy or new mode.

For this, we also need to actually execute an example job; otherwise, no
TaskExecutors would be brought up.
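
Concretely, YarnTestBase no longer derives the mode from CoreOptions.MODE in the loaded Flink configuration; it checks the same system property that MiniClusterResource uses. A minimal sketch of that check, using the constants imported in the diff below (hypothetical helper class, not part of the commit):

    import java.util.Objects;

    import static org.apache.flink.test.util.MiniClusterResource.CODEBASE_KEY;
    import static org.apache.flink.test.util.MiniClusterResource.NEW_CODEBASE;

    final class CodebaseCheckSketch {
        // True when the test JVM was started with the "new codebase" system
        // property, i.e. the new code path should be exercised.
        static boolean isNewCodebase() {
            return Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY));
        }
    }

The same decision is also written into CoreOptions.MODE of the secure test configuration, so the cluster started by the test runs in the mode the build selected.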


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5249981d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5249981d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5249981d

Branch: refs/heads/master
Commit: 5249981dc51401b041b380635b19ca1b01141119
Parents: 8101c0d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu May 3 16:27:40 2018 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue May 8 11:51:53 2018 +0200

----------------------------------------------------------------------
 .../YARNSessionCapacitySchedulerITCase.java     |   1 +
 .../flink/yarn/YARNSessionFIFOITCase.java       | 103 ++++++++++++++++---
 .../yarn/YARNSessionFIFOSecuredITCase.java      |   2 +
 .../org/apache/flink/yarn/YarnTestBase.java     |  39 +++----
 4 files changed, 105 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5249981d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 4bbd500..f0a5175 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -400,6 +400,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 				"-yj", flinkUberjar.getAbsolutePath(),
 				"-yt", flinkLibFolder.getAbsolutePath(),
 				"-yn", "1",
+				"-ys", "2",
 				"-yjm", "768",
 				"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
 				/* test succeeded after this string */

http://git-wip-us.apache.org/repos/asf/flink/blob/5249981d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index d9b02fb..bdaf2c6 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -23,10 +23,14 @@ import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
+import org.apache.flink.test.testdata.WordCountData;
+import org.apache.flink.test.util.SecureTestEnvironment;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
@@ -46,6 +50,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
@@ -53,6 +58,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.yarn.UtilsTest.addTestAppender;
 import static org.apache.flink.yarn.UtilsTest.checkForLogString;
+import static org.apache.flink.yarn.util.YarnTestUtils.getTestJarPath;
 
 /**
  * This test starts a MiniYARNCluster with a FIFO scheduler.
@@ -85,21 +91,81 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	public void testDetachedMode() throws InterruptedException, IOException {
 		LOG.info("Starting testDetachedMode()");
 		addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
-		Runner runner =
-			startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
-						"-t", flinkLibFolder.getAbsolutePath(),
-						"-n", "1",
-						"-jm", "768",
-						"-tm", "1024",
-						"--name", "MyCustomName", // test setting a custom name
-						"--detached"},
+
+		File exampleJarLocation = getTestJarPath("StreamingWordCount.jar");
+		// get temporary file for reading input data for wordcount example
+		File tmpInFile = tmp.newFile();
+		FileUtils.writeStringToFile(tmpInFile, WordCountData.TEXT);
+
+		ArrayList<String> args = new ArrayList<>();
+		args.add("-j");
+		args.add(flinkUberjar.getAbsolutePath());
+
+		args.add("-t");
+		args.add(flinkLibFolder.getAbsolutePath());
+
+		args.add("-n");
+		args.add("1");
+
+		args.add("-jm");
+		args.add("768");
+
+		args.add("-tm");
+		args.add("1024");
+
+		if (SecureTestEnvironment.getTestKeytab() != null) {
+			args.add("-D" + SecurityOptions.KERBEROS_LOGIN_KEYTAB.key() + "=" + SecureTestEnvironment.getTestKeytab());
+		}
+		if (SecureTestEnvironment.getHadoopServicePrincipal() != null) {
+			args.add("-D" + SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key() + "=" + SecureTestEnvironment.getHadoopServicePrincipal());
+		}
+		args.add("--name");
+		args.add("MyCustomName");
+
+		args.add("--detached");
+
+		Runner clusterRunner =
+			startWithArgs(
+				args.toArray(new String[args.size()]),
 				"Flink JobManager is now running on", RunTypes.YARN_SESSION);
 
 		// before checking any strings outputted by the CLI, first give it time to return
-		runner.join();
-		checkForLogString("The Flink YARN client has been started in detached mode");
+		clusterRunner.join();
 
 		if (!isNewMode) {
+			checkForLogString("The Flink YARN client has been started in detached mode");
+
+			// in legacy mode we have to wait until the TMs are up until we can submit the job
+			LOG.info("Waiting until two containers are running");
+			// wait until two containers are running
+			while (getRunningContainers() < 2) {
+				sleep(500);
+			}
+
+			// additional sleep for the JM/TM to start and establish connection
+			long startTime = System.nanoTime();
+			while (System.nanoTime() - startTime < TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS) &&
+				!(verifyStringsInNamedLogFiles(
+					new String[]{"YARN Application Master started"}, "jobmanager.log") &&
+					verifyStringsInNamedLogFiles(
+						new String[]{"Starting TaskManager actor"}, "taskmanager.log"))) {
+				LOG.info("Still waiting for JM/TM to initialize...");
+				sleep(500);
+			}
+		}
+
+		// actually run a program, otherwise we wouldn't necessarily see any TaskManagers
+		// be brought up
+		Runner jobRunner = startWithArgs(new String[]{"run",
+				"--detached", exampleJarLocation.getAbsolutePath(),
+				"--input", tmpInFile.getAbsoluteFile().toString()},
+			"Job has been submitted with JobID", RunTypes.CLI_FRONTEND);
+
+		jobRunner.join();
+
+		if (isNewMode) {
+			// in "new" mode we can only wait after the job is submitted, because TMs
+			// are spun up lazily
 			LOG.info("Waiting until two containers are running");
 			// wait until two containers are running
 			while (getRunningContainers() < 2) {
@@ -107,16 +173,19 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			}
 		}
 
-		// additional sleep for the JM/TM to start and establish connection
+		// make sure we have two TMs running in either mode
 		long startTime = System.nanoTime();
 		while (System.nanoTime() - startTime < TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS) &&
-				!(verifyStringsInNamedLogFiles(
-						new String[]{"YARN Application Master started"}, "jobmanager.log") &&
-						verifyStringsInNamedLogFiles(
-								new String[]{"Starting TaskManager actor"}, "taskmanager.log"))) {
+			!(verifyStringsInNamedLogFiles(
+				new String[]{isNewMode ? "JobManager successfully registered at ResourceManager"
+					: "YARN Application Master started"}, "jobmanager.log") &&
+				verifyStringsInNamedLogFiles(
+					new String[]{isNewMode ? "Successful registration at job manager"
+						: "Starting TaskManager actor"}, "taskmanager.log"))) {
 			LOG.info("Still waiting for JM/TM to initialize...");
 			sleep(500);
 		}
+
 		LOG.info("Two containers are running. Killing the application");
 
 		// kill application "externally".
@@ -132,7 +201,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			ApplicationId id = app.getApplicationId();
 			yc.killApplication(id);
 
-			while (yc.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0) {
+			while (yc.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0 &&
+					yc.getApplications(EnumSet.of(YarnApplicationState.FINISHED)).size() == 0) {
 				sleep(500);
 			}
 		} catch (Throwable t) {
@@ -158,7 +228,6 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			} catch (Exception e) {
 				LOG.warn("testDetachedPerJobYarnClusterInternal: Exception while deleting the JobManager address file", e);
 			}
-
 		}
 
 		LOG.info("Finished testDetachedMode()");
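
The reworked test waits for the expected JobManager and TaskManager log lines with a bounded poll: 500 ms sleeps inside a 10-second System.nanoTime() budget. As an aside, the same pattern could be factored into a small helper; a hypothetical sketch, not part of the commit:

    import java.util.concurrent.TimeUnit;
    import java.util.function.BooleanSupplier;

    final class PollSketch {
        // Re-evaluates 'condition' every 500 ms until it holds or the timeout
        // elapses; returns whether it became true within the budget.
        static boolean pollUntil(BooleanSupplier condition, long timeout, TimeUnit unit)
                throws InterruptedException {
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            while (System.nanoTime() < deadline) {
                if (condition.getAsBoolean()) {
                    return true;
                }
                Thread.sleep(500);
            }
            return condition.getAsBoolean();
        }
    }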

http://git-wip-us.apache.org/repos/asf/flink/blob/5249981d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
index 46a37a0..97f60fc 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
@@ -33,6 +33,7 @@ import org.hamcrest.Matchers;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,6 +101,7 @@ public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
 		SecureTestEnvironment.cleanup();
 	}
 
+	@Test(timeout = 60000) // timeout after a minute.
 	@Override
 	public void testDetachedMode() throws InterruptedException, IOException {
 		super.testDetachedMode();

http://git-wip-us.apache.org/repos/asf/flink/blob/5249981d/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 421e4c0..c4e498e 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -22,7 +22,6 @@ import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.Preconditions;
@@ -30,7 +29,6 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileUtil;
@@ -75,13 +73,15 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Scanner;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
 
-import static org.apache.flink.configuration.CoreOptions.LEGACY_MODE;
+import static org.apache.flink.test.util.MiniClusterResource.CODEBASE_KEY;
+import static org.apache.flink.test.util.MiniClusterResource.NEW_CODEBASE;
 
 /**
  * This base class allows to use the MiniYARNCluster.
@@ -220,7 +220,7 @@ public abstract class YarnTestBase extends TestLogger {
 		}
 
 		flinkConfiguration = new org.apache.flink.configuration.Configuration(globalConfiguration);
-		isNewMode = CoreOptions.NEW_MODE.equalsIgnoreCase(flinkConfiguration.getString(CoreOptions.MODE));
+		isNewMode = Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY));
 	}
 
 	@Nullable
@@ -519,32 +519,25 @@ public abstract class YarnTestBase extends TestLogger {
 			final String confDirPath = flinkConfDirPath.getParentFile().getAbsolutePath();
 			globalConfiguration = GlobalConfiguration.loadConfiguration(confDirPath);
 
-			if (!StringUtils.isBlank(principal) && !StringUtils.isBlank(keytab)) {
+			//copy conf dir to test temporary workspace location
+			tempConfPathForSecureRun = tmp.newFolder("conf");
 
-				//copy conf dir to test temporary workspace location
-				tempConfPathForSecureRun = tmp.newFolder("conf");
+			FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun);
 
-				FileUtils.copyDirectory(new File(confDirPath), tempConfPathForSecureRun);
+			globalConfiguration.setString(CoreOptions.MODE,
+				Objects.equals(NEW_CODEBASE, System.getProperty(CODEBASE_KEY)) ? CoreOptions.NEW_MODE : CoreOptions.LEGACY_MODE);
 
-				globalConfiguration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB.key(), keytab);
-				globalConfiguration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key(), principal);
-				globalConfiguration.setString(CoreOptions.MODE.key(), LEGACY_MODE);
+			BootstrapTools.writeConfiguration(
+				globalConfiguration,
+				new File(tempConfPathForSecureRun, "flink-conf.yaml"));
 
-				BootstrapTools.writeConfiguration(
-					globalConfiguration,
-					new File(tempConfPathForSecureRun, "flink-conf.yaml"));
+			String configDir = tempConfPathForSecureRun.getAbsolutePath();
 
-				String configDir = tempConfPathForSecureRun.getAbsolutePath();
+			LOG.info("Temporary Flink configuration directory to be used for secure test: {}", configDir);
 
-				LOG.info("Temporary Flink configuration directory to be used for secure test: {}", configDir);
+			Assert.assertNotNull(configDir);
 
-				Assert.assertNotNull(configDir);
-
-				map.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir);
-
-			} else {
-				map.put(ConfigConstants.ENV_FLINK_CONF_DIR, flinkConfDirPath.getParent());
-			}
+			map.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir);
 
 			File yarnConfFile = writeYarnSiteConfigXML(conf);
 			map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());