You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/31 15:18:14 UTC

[flink] 02/03: [FLINK-11106][tests] Port YarnTaskManagerRunnerFactoryTest to new code base

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ef5242405f7977f9c330eb99e1e9b86f3c7d252b
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Thu Jan 31 14:22:35 2019 +0100

    [FLINK-11106][tests] Port YarnTaskManagerRunnerFactoryTest to new code base
    
    The test YarnTaskManagerRunnerFactoryTest#testKerberosKeytabConfiguration is now covered by
    YarnTaskExecutorRunnerTest#testKerberosKeytabConfiguration. To make this behaviour testable,
    the YarnTaskExecutorRunner was slightly refactored.
---
 .../apache/flink/yarn/YarnTaskExecutorRunner.java  | 97 +++++++++++++---------
 .../test/java/org/apache/flink/yarn/UtilsTest.java |  4 +-
 .../flink/yarn/YarnTaskExecutorRunnerTest.java     | 72 ++++++++++++++++
 3 files changed, 131 insertions(+), 42 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
index 2058533..c403fbd 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunner.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -41,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -86,58 +88,18 @@ public class YarnTaskExecutorRunner {
 		try {
 			LOG.debug("All environment variables: {}", ENV);
 
-			final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
-			final String localDirs = ENV.get(Environment.LOCAL_DIRS.key());
-			LOG.info("Current working/local Directory: {}", localDirs);
-
 			final String currDir = ENV.get(Environment.PWD.key());
 			LOG.info("Current working Directory: {}", currDir);
 
-			final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH);
-			LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
-
-			final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-			LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
-
 			final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir);
 			FileSystem.initialize(configuration);
 
-			BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
-
-			// tell akka to die in case of an error
-			configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
-
-			String keytabPath = null;
-			if (remoteKeytabPath != null) {
-				File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
-				keytabPath = f.getAbsolutePath();
-				LOG.info("keytab path: {}", keytabPath);
-			}
-
-			UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
-
-			LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
-					currentUser.getShortUserName(), yarnClientUsername);
-
-			if (keytabPath != null && remoteKeytabPrincipal != null) {
-				configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
-				configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
-			}
-
-			SecurityConfiguration sc = new SecurityConfiguration(configuration);
+			setupConfigurationAndInstallSecurityContext(configuration, currDir, ENV);
 
 			final String containerId = ENV.get(YarnResourceManager.ENV_FLINK_CONTAINER_ID);
 			Preconditions.checkArgument(containerId != null,
 				"ContainerId variable %s not set", YarnResourceManager.ENV_FLINK_CONTAINER_ID);
 
-			// use the hostname passed by job manager
-			final String taskExecutorHostname = ENV.get(YarnResourceManager.ENV_FLINK_NODE_ID);
-			if (taskExecutorHostname != null) {
-				configuration.setString(TaskManagerOptions.HOST, taskExecutorHostname);
-			}
-
-			SecurityUtils.install(sc);
-
 			SecurityUtils.getInstalledContext().runSecured((Callable<Void>) () -> {
 				TaskManagerRunner.runTaskManager(configuration, new ResourceID(containerId));
 				return null;
@@ -150,4 +112,57 @@ public class YarnTaskExecutorRunner {
 			System.exit(INIT_ERROR_EXIT_CODE);
 		}
 	}
+
+	@VisibleForTesting
+	static void setupConfigurationAndInstallSecurityContext(Configuration configuration, String currDir, Map<String, String> variables) throws Exception {
+		final String localDirs = variables.get(Environment.LOCAL_DIRS.key());
+		LOG.info("Current working/local Directory: {}", localDirs);
+
+		BootstrapTools.updateTmpDirectoriesInConfiguration(configuration, localDirs);
+
+		setupConfigurationFromVariables(configuration, currDir, variables);
+
+		installSecurityContext(configuration);
+	}
+
+	private static void setupConfigurationFromVariables(Configuration configuration, String currDir, Map<String, String> variables) throws IOException {
+		final String yarnClientUsername = variables.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+
+		final String remoteKeytabPath = variables.get(YarnConfigKeys.KEYTAB_PATH);
+		LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath);
+
+		final String remoteKeytabPrincipal = variables.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+		LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal);
+
+		// tell akka to die in case of an error
+		configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
+
+		String keytabPath = null;
+		if (remoteKeytabPath != null) {
+			File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
+			keytabPath = f.getAbsolutePath();
+			LOG.info("keytab path: {}", keytabPath);
+		}
+
+		UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+
+		LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}",
+				currentUser.getShortUserName(), yarnClientUsername);
+
+		if (keytabPath != null && remoteKeytabPrincipal != null) {
+			configuration.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, keytabPath);
+			configuration.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, remoteKeytabPrincipal);
+		}
+
+		// use the hostname passed by job manager
+		final String taskExecutorHostname = variables.get(YarnResourceManager.ENV_FLINK_NODE_ID);
+		if (taskExecutorHostname != null) {
+			configuration.setString(TaskManagerOptions.HOST, taskExecutorHostname);
+		}
+	}
+
+	private static void installSecurityContext(Configuration configuration) throws Exception {
+		SecurityConfiguration sc = new SecurityConfiguration(configuration);
+		SecurityUtils.install(sc);
+	}
 }
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
index b272a21..a5ae14c 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.util.TestLogger;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -33,7 +35,7 @@ import static org.junit.Assert.assertThat;
 /**
  * Tests for {@link Utils}.
  */
-public class UtilsTest {
+public class UtilsTest extends TestLogger {
 
 	@Rule
 	public TemporaryFolder temporaryFolder = new TemporaryFolder();
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java
new file mode 100644
index 0000000..18726ad
--- /dev/null
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnTaskExecutorRunnerTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.SecurityOptions;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.modules.HadoopModule;
+import org.apache.flink.runtime.security.modules.SecurityModule;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link YarnTaskExecutorRunner}.
+ */
+public class YarnTaskExecutorRunnerTest extends TestLogger {
+
+	@Test
+	public void testKerberosKeytabConfiguration() throws Exception {
+		final String resourceDirPath = Paths.get("src", "test", "resources").toAbsolutePath().toString();
+
+		final Map<String, String> envs = new HashMap<>(2);
+		envs.put(YarnConfigKeys.KEYTAB_PRINCIPAL, "testuser1@domain");
+		envs.put(YarnConfigKeys.KEYTAB_PATH, resourceDirPath);
+
+		Configuration configuration = new Configuration();
+		YarnTaskExecutorRunner.setupConfigurationAndInstallSecurityContext(configuration, resourceDirPath, envs);
+
+		final List<SecurityModule> modules = SecurityUtils.getInstalledModules();
+		Optional<SecurityModule> moduleOpt = modules.stream().filter(module -> module instanceof HadoopModule).findFirst();
+
+		if (moduleOpt.isPresent()) {
+			HadoopModule hadoopModule = (HadoopModule) moduleOpt.get();
+			assertThat(hadoopModule.getSecurityConfig().getPrincipal(), is("testuser1@domain"));
+			assertThat(hadoopModule.getSecurityConfig().getKeytab(), is(new File(resourceDirPath, Utils.KEYTAB_FILE_NAME).getAbsolutePath()));
+		} else {
+			fail("Can not find HadoopModule!");
+		}
+
+		assertThat(configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB), is(new File(resourceDirPath, Utils.KEYTAB_FILE_NAME).getAbsolutePath()));
+		assertThat(configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL), is("testuser1@domain"));
+	}
+
+}