You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/13 14:32:37 UTC
[flink] branch master updated: [FLINK-12159]. Enable
YarnMiniCluster integration test under non-secure mode
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 0e953d1 [FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode
0e953d1 is described below
commit 0e953d10c2cdc61dc978a13c0c94320034bf0179
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Apr 10 22:53:25 2019 +0800
[FLINK-12159]. Enable YarnMiniCluster integration test under non-secure mode
Remove setting of yarn.minicluster.fixed.ports
copy yarn-site.xml to target/test-classes
This commit closes #8144.
---
.../java/org/apache/flink/yarn/YarnTestBase.java | 21 +++++++-----
.../flink/yarn/AbstractYarnClusterDescriptor.java | 38 +++++++++-------------
.../src/main/java/org/apache/flink/yarn/Utils.java | 12 ++++---
3 files changed, 35 insertions(+), 36 deletions(-)
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index bd46dc4..9aad148 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -154,6 +154,8 @@ public abstract class YarnTestBase extends TestLogger {
protected static File tempConfPathForSecureRun = null;
protected static File flinkShadedHadoopDir;
+ protected static File yarnSiteXML = null;
+
private YarnClient yarnClient = null;
private static org.apache.flink.configuration.Configuration globalConfiguration;
@@ -166,7 +168,6 @@ public abstract class YarnTestBase extends TestLogger {
YARN_CONFIGURATION = new YarnConfiguration();
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 32);
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 4096); // 4096 is the available memory anyways
- YARN_CONFIGURATION.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
YARN_CONFIGURATION.setBoolean(YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME, true);
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 2);
@@ -340,15 +341,14 @@ public abstract class YarnTestBase extends TestLogger {
}
}
- public static File writeYarnSiteConfigXML(Configuration yarnConf) throws IOException {
- tmp.create();
- File yarnSiteXML = new File(tmp.newFolder().getAbsolutePath() + "/yarn-site.xml");
-
+ // write yarn-site.xml to target/test-classes so that flink pick can pick up this when
+ // initializing YarnClient properly from classpath
+ public static void writeYarnSiteConfigXML(Configuration yarnConf, File targetFolder) throws IOException {
+ yarnSiteXML = new File(targetFolder, "/yarn-site.xml");
try (FileWriter writer = new FileWriter(yarnSiteXML)) {
yarnConf.writeXml(writer);
writer.flush();
}
- return yarnSiteXML;
}
/**
@@ -584,9 +584,10 @@ public abstract class YarnTestBase extends TestLogger {
map.put(ConfigConstants.ENV_FLINK_CONF_DIR, configDir);
- File yarnConfFile = writeYarnSiteConfigXML(conf);
- map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
+ File targetTestClassesFolder = new File("target/test-classes");
+ writeYarnSiteConfigXML(conf, targetTestClassesFolder);
map.put("IN_TESTS", "yes we are in tests"); // see YarnClusterDescriptor() for more infos
+ map.put("YARN_CONF_DIR", targetTestClassesFolder.getAbsolutePath());
TestBaseUtils.setEnv(map);
Assert.assertTrue(yarnCluster.getServiceState() == Service.STATE.STARTED);
@@ -890,6 +891,10 @@ public abstract class YarnTestBase extends TestLogger {
tempConfPathForSecureRun = null;
}
+ if (yarnSiteXML != null) {
+ yarnSiteXML.delete();
+ }
+
// When we are on travis, we copy the temp files of JUnit (containing the MiniYARNCluster log files)
// to <flinkRoot>/target/flink-yarn-tests-*.
// The files from there are picked up by the ./tools/travis_watchdog.sh script
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 23fddd5..20b5417 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -154,16 +154,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
boolean sharedYarnClient) {
this.yarnConfiguration = Preconditions.checkNotNull(yarnConfiguration);
-
- // for unit tests only
- if (System.getenv("IN_TESTS") != null) {
- try {
- yarnConfiguration.addResource(new File(System.getenv("YARN_CONF_DIR"), "yarn-site.xml").toURI().toURL());
- } catch (Throwable t) {
- throw new RuntimeException("Error", t);
- }
- }
-
this.yarnClient = Preconditions.checkNotNull(yarnClient);
this.sharedYarnClient = sharedYarnClient;
@@ -874,6 +864,18 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
Path remoteYarnSiteXmlPath = null;
boolean hasKrb5 = false;
if (System.getenv("IN_TESTS") != null) {
+ File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
+ LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
+ Path yarnSitePath = new Path(f.getAbsolutePath());
+ remoteYarnSiteXmlPath = setupSingleLocalResource(
+ Utils.YARN_SITE_FILE_NAME,
+ fs,
+ appId,
+ yarnSitePath,
+ localResources,
+ homeDir,
+ "");
+
String krb5Config = System.getProperty("java.security.krb5.conf");
if (krb5Config != null && krb5Config.length() != 0) {
File krb5 = new File(krb5Config);
@@ -887,18 +889,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
localResources,
homeDir,
"");
-
- File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
- LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
- Path yarnSitePath = new Path(f.getAbsolutePath());
- remoteYarnSiteXmlPath = setupSingleLocalResource(
- Utils.YARN_SITE_FILE_NAME,
- fs,
- appId,
- yarnSitePath,
- localResources,
- homeDir,
- "");
hasKrb5 = true;
}
}
@@ -963,8 +953,10 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
}
//To support Yarn Secure Integration Test Scenario
- if (remoteYarnSiteXmlPath != null && remoteKrb5Path != null) {
+ if (remoteYarnSiteXmlPath != null) {
appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
+ }
+ if (remoteKrb5Path != null) {
appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 20e02e1..261bc97 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -432,17 +432,18 @@ public final class Utils {
LocalResource yarnConfResource = null;
LocalResource krb5ConfResource = null;
boolean hasKrb5 = false;
- if (remoteYarnConfPath != null && remoteKrb5Path != null) {
+ if (remoteYarnConfPath != null) {
log.info("TM:Adding remoteYarnConfPath {} to the container local resource bucket", remoteYarnConfPath);
Path yarnConfPath = new Path(remoteYarnConfPath);
FileSystem fs = yarnConfPath.getFileSystem(yarnConfig);
yarnConfResource = registerLocalResource(fs, yarnConfPath);
+ }
+ if (remoteKrb5Path != null) {
log.info("TM:Adding remoteKrb5Path {} to the container local resource bucket", remoteKrb5Path);
Path krb5ConfPath = new Path(remoteKrb5Path);
- fs = krb5ConfPath.getFileSystem(yarnConfig);
+ FileSystem fs = krb5ConfPath.getFileSystem(yarnConfig);
krb5ConfResource = registerLocalResource(fs, krb5ConfPath);
-
hasKrb5 = true;
}
@@ -490,11 +491,12 @@ public final class Utils {
taskManagerLocalResources.put("flink-conf.yaml", flinkConf);
//To support Yarn Secure Integration Test Scenario
- if (yarnConfResource != null && krb5ConfResource != null) {
+ if (yarnConfResource != null) {
taskManagerLocalResources.put(YARN_SITE_FILE_NAME, yarnConfResource);
+ }
+ if (krb5ConfResource != null) {
taskManagerLocalResources.put(KRB5_FILE_NAME, krb5ConfResource);
}
-
if (keytabResource != null) {
taskManagerLocalResources.put(KEYTAB_FILE_NAME, keytabResource);
}