You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/13 14:32:37 UTC

[flink] branch master updated: [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e953d1  [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode
0e953d1 is described below

commit 0e953d10c2cdc61dc978a13c0c94320034bf0179
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Apr 10 22:53:25 2019 +0800

    [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode
    
    Remove setting of yarn.minicluster.fixed.ports
    
    copy yarn-site.xml to target/test-classes
    
    This commit closes #8144.
---
 .../java/org/apache/flink/yarn/YarnTestBase.java   | 21 +++++++-----
 .../flink/yarn/AbstractYarnClusterDescriptor.java  | 38 +++++++++-------------
 .../src/main/java/org/apache/flink/yarn/Utils.java | 12 ++++---
 3 files changed, 35 insertions(+), 36 deletions(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index bd46dc4..9aad148 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -154,6 +154,8 @@ public abstract class YarnTestBase extends TestLogger {
 	protected static File tempConfPathForSecureRun = null;
 	protected static File flinkShadedHadoopDir;
 
+	protected static File yarnSiteXML = null;
+
 	private YarnClient yarnClient = null;
 
 	private static org.apache.flink.configuration.Configuration globalConfiguration;
@@ -166,7 +168,6 @@ public abstract class YarnTestBase extends TestLogger {
 		YARN_CONFIGURATION = new YarnConfiguration();
 		YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 32);
 		YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096); // 4096 is the available memory anyways
-		YARN_CONFIGURATION.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
 		YARN_CONFIGURATION.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
 		YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
 		YARN_CONFIGURATION.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
@@ -340,15 +341,14 @@ public abstract class YarnTestBase extends TestLogger {
 		}
 	}
 
-	public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException {
-		tmp.create();
-		File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml");
-
+	// write yarn-site.xml to target/test-classes so that flink pick can pick up this when
+	// initializing YarnClient properly from classpath
+	public static void writeYarnSiteConfigXML(Configuration yarnConf, File targetFolder) throws IOException {
+		yarnSiteXML = new File(targetFolder, "/yarn-site.xml");
 		try (FileWriter writer = new FileWriter(yarnSiteXML)) {
 			yarnConf.writeXml(writer);
 			writer.flush();
 		}
-		return yarnSiteXML;
 	}
 
 	/**
@@ -584,9 +584,10 @@ public abstract class YarnTestBase extends TestLogger {
 
 			map.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir);
 
-			File yarnConfFile = writeYarnSiteConfigXML(conf);
-			map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
+			File targetTestClassesFolder = new File("target/test-classes");
+			writeYarnSiteConfigXML(conf, targetTestClassesFolder);
 			map.put("IN_TESTS", "yes we are in tests"); // see YarnClusterDescriptor() for more infos
+			map.put("YARN_CONF_DIR", targetTestClassesFolder.getAbsolutePath());
 			TestBaseUtils.setEnv(map);
 
 			Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);
@@ -890,6 +891,10 @@ public abstract class YarnTestBase extends TestLogger {
 			tempConfPathForSecureRun = null;
 		}
 
+		if (yarnSiteXML != null) {
+			yarnSiteXML.delete();
+		}
+
 		// When we are on travis, we copy the temp files of JUnit (containing the MiniYARNCluster log files)
 		// to <flinkRoot>/target/flink-yarn-tests-*.
 		// The files from there are picked up by the ./tools/travis_watchdog.sh script
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 23fddd5..20b5417 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -154,16 +154,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 			boolean sharedYarnClient) {
 
 		this.yarnConfiguration = Preconditions.checkNotNull(yarnConfiguration);
-
-		// for unit tests only
-		if (System.getenv("IN_TESTS") != null) {
-			try {
-				yarnConfiguration.addResource(new File(System.getenv("YARN_CONF_DIR"), "yarn-site.xml").toURI().toURL());
-			} catch (Throwable t) {
-				throw new RuntimeException("Error", t);
-			}
-		}
-
 		this.yarnClient = Preconditions.checkNotNull(yarnClient);
 		this.sharedYarnClient = sharedYarnClient;
 
@@ -874,6 +864,18 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		Path remoteYarnSiteXmlPath = null;
 		boolean hasKrb5 = false;
 		if (System.getenv("IN_TESTS") != null) {
+			File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
+			LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
+			Path yarnSitePath = new Path(f.getAbsolutePath());
+			remoteYarnSiteXmlPath = setupSingleLocalResource(
+				Utils.YARN_SITE_FILE_NAME,
+				fs,
+				appId,
+				yarnSitePath,
+				localResources,
+				homeDir,
+				"");
+
 			String krb5Config = System.getProperty("java.security.krb5.conf");
 			if (krb5Config != null && krb5Config.length() != 0) {
 				File krb5 = new File(krb5Config);
@@ -887,18 +889,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 					localResources,
 					homeDir,
 					"");
-
-				File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
-				LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
-				Path yarnSitePath = new Path(f.getAbsolutePath());
-				remoteYarnSiteXmlPath = setupSingleLocalResource(
-					Utils.YARN_SITE_FILE_NAME,
-					fs,
-					appId,
-					yarnSitePath,
-					localResources,
-					homeDir,
-					"");
 				hasKrb5 = true;
 			}
 		}
@@ -963,8 +953,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		}
 
 		//To support Yarn Secure Integration Test Scenario
-		if (remoteYarnSiteXmlPath != null && remoteKrb5Path != null) {
+		if (remoteYarnSiteXmlPath != null) {
 			appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
+		}
+		if (remoteKrb5Path != null) {
 			appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
 		}
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 20e02e1..261bc97 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -432,17 +432,18 @@ public final class Utils {
 		LocalResource yarnConfResource = null;
 		LocalResource krb5ConfResource = null;
 		boolean hasKrb5 = false;
-		if (remoteYarnConfPath != null && remoteKrb5Path != null) {
+		if (remoteYarnConfPath != null) {
 			log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
 			Path yarnConfPath = new Path(remoteYarnConfPath);
 			FileSystem fs = yarnConfPath.getFileSystem(yarnConfig);
 			yarnConfResource = registerLocalResource(fs, yarnConfPath);
+		}
 
+		if (remoteKrb5Path != null) {
 			log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
 			Path krb5ConfPath = new Path(remoteKrb5Path);
-			fs = krb5ConfPath.getFileSystem(yarnConfig);
+			FileSystem fs = krb5ConfPath.getFileSystem(yarnConfig);
 			krb5ConfResource = registerLocalResource(fs, krb5ConfPath);
-
 			hasKrb5 = true;
 		}
 
@@ -490,11 +491,12 @@ public final class Utils {
 		taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
 
 		//To support Yarn Secure Integration Test Scenario
-		if (yarnConfResource != null && krb5ConfResource != null) {
+		if (yarnConfResource != null) {
 			taskManagerLocalResources.put(YARN_SITE_FILE_NAME, yarnConfResource);
+		}
+		if (krb5ConfResource != null) {
 			taskManagerLocalResources.put(KRB5_FILE_NAME, krb5ConfResource);
 		}
-
 		if (keytabResource != null) {
 			taskManagerLocalResources.put(KEYTAB_FILE_NAME, keytabResource);
 		}