You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/31 15:18:14 UTC
[flink] 02/03: [FLINK-11106][tests] Port YarnTaskManagerRunnerFactoryTest to new code base
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ef5242405f7977f9c330eb99e1e9b86f3c7d252b
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 31 14:22:35 2019 +0100
[FLINK-11106][tests] Port YarnTaskManagerRunnerFactoryTest to new code base
The test YarnTaskManagerRunnerFactoryTest#testKerberosKeytabConfiguration is now covered by
YarnTaskExecutorRunnerTest#testKerberosKeytabConfiguration. To make this behaviour testable,
the YarnTaskExecutorRunner was slightly refactored.
---
.../apache/flink/yarn/YarnTaskExecutorRunner.java | 97 +++++++++++++---------
.../test/java/org/apache/flink/yarn/UtilsTest.java | 4 +-
.../flink/yarn/YarnTaskExecutorRunnerTest.java | 72 ++++++++++++++++
3 files changed, 131 insertions(+), 42 deletions(-)
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 2058533..c403fbd 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -18,6 +18,7 @@
package org.apache.flink.yarn;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
@@ -41,6 +42,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -86,58 +88,18 @@ public class YarnTaskExecutorRunner {
try {
LOG.debug("All environment variables: {}", ENV);
- final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
- final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
- LOG.info("Current working/local Directory: {}", localDirs);
-
final String currDir = ENV.get(Environment.PWD.key());
LOG.info("Current working Directory: {}", currDir);
- final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
- LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
-
- final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
- LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
-
final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
FileSystem.initialize(configuration);
- BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
-
- // tell akka to die in case of an error
- configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
-
- String keytabPath = null;
- if (remoteKeytabPath != null) {
- File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
- keytabPath = f.getAbsolutePath();
- LOG.info("keytab path: {}", keytabPath);
- }
-
- UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-
- LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
- currentUser.getShortUserName(), yarnClientUsername);
-
- if (keytabPath != null && remoteKeytabPrincipal != null) {
- configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
- configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
- }
-
- SecurityConfiguration sc = new SecurityConfiguration(configuration);
+ setupConfigurationAndInstallSecurityContext(configuration, currDir, ENV);
final String containerId = ENV.get(YarnResourceManager.ENV_FLINK_CONTAINER_ID);
Preconditions.checkArgument(containerId != null,
"ContainerId variable %s not set", YarnResourceManager.ENV_FLINK_CONTAINER_ID);
- // use the hostname passed by job manager
- final String taskExecutorHostname = ENV.get(YarnResourceManager.ENV_FLINK_NODE_ID);
- if (taskExecutorHostname != null) {
- configuration.setString(TaskManagerOptions.HOST, taskExecutorHostname);
- }
-
- SecurityUtils.install(sc);
-
SecurityUtils.getInstalledContext().runSecured((Callable<Void>) () -> {
TaskManagerRunner.runTaskManager(configuration, new ResourceID(containerId));
return null;
@@ -150,4 +112,57 @@ public class YarnTaskExecutorRunner {
System.exit(INIT_ERROR_EXIT_CODE);
}
}
+
+ @VisibleForTesting
+ static void setupConfigurationAndInstallSecurityContext(Configuration configuration, String currDir, Map<String, String> variables) throws Exception {
+ final String localDirs = variables.get(Environment.LOCAL_DIRS.key());
+ LOG.info("Current working/local Directory: {}", localDirs);
+
+ BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
+
+ setupConfigurationFromVariables(configuration, currDir, variables);
+
+ installSecurityContext(configuration);
+ }
+
+ private static void setupConfigurationFromVariables(Configuration configuration, String currDir, Map<String, String> variables) throws IOException {
+ final String yarnClientUsername = variables.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+ final String remoteKeytabPath = variables.get(YarnConfigKeys.KEYTAB_PATH);
+ LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
+
+ final String remoteKeytabPrincipal = variables.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+ LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
+
+ // tell akka to die in case of an error
+ configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
+
+ String keytabPath = null;
+ if (remoteKeytabPath != null) {
+ File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+ keytabPath = f.getAbsolutePath();
+ LOG.info("keytab path: {}", keytabPath);
+ }
+
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+ LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
+ currentUser.getShortUserName(), yarnClientUsername);
+
+ if (keytabPath != null && remoteKeytabPrincipal != null) {
+ configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+ configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
+ }
+
+ // use the hostname passed by job manager
+ final String taskExecutorHostname = variables.get(YarnResourceManager.ENV_FLINK_NODE_ID);
+ if (taskExecutorHostname != null) {
+ configuration.setString(TaskManagerOptions.HOST, taskExecutorHostname);
+ }
+ }
+
+ private static void installSecurityContext(Configuration configuration) throws Exception {
+ SecurityConfiguration sc = new SecurityConfiguration(configuration);
+ SecurityUtils.install(sc);
+ }
}
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index b272a21..a5ae14c 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.yarn;
+import org.apache.flink.util.TestLogger;
+
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -33,7 +35,7 @@ import static org.junit.Assert.assertThat;
/**
* Tests for {@link Utils}.
*/
-public class UtilsTest {
+public class UtilsTest extends TestLogger {
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java
new file mode 100644
index 0000000..18726ad
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.modules.HadoopModule;
+import org.apache.flink.runtime.security.modules.SecurityModule;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link YarnTaskExecutorRunner}.
+ */
+public class YarnTaskExecutorRunnerTest extends TestLogger {
+
+ @Test
+ public void testKerberosKeytabConfiguration() throws Exception {
+ final String resourceDirPath = Paths.get("src", "test", "resources").toAbsolutePath().toString();
+
+ final Map<String, String> envs = new HashMap<>(2);
+ envs.put(YarnConfigKeys.KEYTAB_PRINCIPAL, "testuser1@domain");
+ envs.put(YarnConfigKeys.KEYTAB_PATH, resourceDirPath);
+
+ Configuration configuration = new Configuration();
+ YarnTaskExecutorRunner.setupConfigurationAndInstallSecurityContext(configuration, resourceDirPath, envs);
+
+ final List<SecurityModule> modules = SecurityUtils.getInstalledModules();
+ Optional<SecurityModule> moduleOpt = modules.stream().filter(module -> module instanceof HadoopModule).findFirst();
+
+ if (moduleOpt.isPresent()) {
+ HadoopModule hadoopModule = (HadoopModule) moduleOpt.get();
+ assertThat(hadoopModule.getSecurityConfig().getPrincipal(), is("testuser1@domain"));
+ assertThat(hadoopModule.getSecurityConfig().getKeytab(), is(new File(resourceDirPath, Utils.KEYTAB_FILE_NAME).getAbsolutePath()));
+ } else {
+ fail("Can not find HadoopModule!");
+ }
+
+ assertThat(configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB), is(new File(resourceDirPath, Utils.KEYTAB_FILE_NAME).getAbsolutePath()));
+ assertThat(configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL), is("testuser1@domain"));
+ }
+
+}