You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/27 10:36:16 UTC
[flink] branch release-1.7 updated: [FLINK-11126][YARN][security]
Filter out AMRMToken in the TaskManager credentials
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.7 by this push:
new 0569fcb [FLINK-11126][YARN][security] Filter out AMRMToken in the TaskManager credentials
0569fcb is described below
commit 0569fcb1d6f23ee083db686aa9db78f5a210282e
Author: Paul Lam <pa...@gmail.com>
AuthorDate: Wed Dec 12 11:39:35 2018 +0800
[FLINK-11126][YARN][security] Filter out AMRMToken in the TaskManager credentials
This closes #7895.
---
.../test/java/org/apache/flink/yarn/UtilsTest.java | 88 ++++++++++++++++++++++
.../flink/yarn/YARNSessionFIFOSecuredITCase.java | 19 +++++
.../java/org/apache/flink/yarn/YarnTestBase.java | 54 ++++++++++++-
.../src/main/java/org/apache/flink/yarn/Utils.java | 13 +++-
4 files changed, 171 insertions(+), 3 deletions(-)
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
index 1262096..3a3144e 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java
@@ -21,28 +21,50 @@ package org.apache.flink.yarn;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.util.TestLogger;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
/**
* Tests for various utilities.
*/
public class UtilsTest extends TestLogger {
private static final Logger LOG = LoggerFactory.getLogger(UtilsTest.class);
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
@Test
public void testUberjarLocator() {
File dir = YarnTestBase.findFile("..", new YarnTestBase.RootDirFilenameFilter());
@@ -136,6 +158,72 @@ public class UtilsTest extends TestLogger {
Assert.assertEquals(0, res.size());
}
+ @Test
+ public void testCreateTaskExecutorCredentials() throws Exception {
+ File root = temporaryFolder.getRoot();
+ File home = new File(root, "home");
+ boolean created = home.mkdir();
+ assertTrue(created);
+
+ Configuration flinkConf = new Configuration();
+ YarnConfiguration yarnConf = new YarnConfiguration();
+
+ Map<String, String> env = new HashMap<>();
+ env.put(YarnConfigKeys.ENV_APP_ID, "foo");
+ env.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
+ env.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, "");
+ env.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, "");
+ env.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, "foo");
+ env.put(YarnConfigKeys.FLINK_JAR_PATH, root.toURI().toString());
+ env = Collections.unmodifiableMap(env);
+
+ File credentialFile = temporaryFolder.newFile("container_tokens");
+ final Text amRmTokenKind = AMRMTokenIdentifier.KIND_NAME;
+ final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
+ final Text service = new Text("test-service");
+ Credentials amCredentials = new Credentials();
+ amCredentials.addToken(amRmTokenKind, new Token<>(new byte[4], new byte[4], amRmTokenKind, service));
+ amCredentials.addToken(hdfsDelegationTokenKind, new Token<>(new byte[4], new byte[4],
+ hdfsDelegationTokenKind, service));
+ amCredentials.writeTokenStorageFile(new org.apache.hadoop.fs.Path(credentialFile.getAbsolutePath()), yarnConf);
+
+ ContaineredTaskManagerParameters tmParams = new ContaineredTaskManagerParameters(64,
+ 64, 16, 1, new HashMap<>(1));
+ Configuration taskManagerConf = new Configuration();
+
+ String workingDirectory = root.getAbsolutePath();
+ Class<?> taskManagerMainClass = YarnTaskExecutorRunner.class;
+ ContainerLaunchContext ctx;
+
+ final Map<String, String> originalEnv = System.getenv();
+ try {
+ Map<String, String> systemEnv = new HashMap<>(originalEnv);
+ systemEnv.put("HADOOP_TOKEN_FILE_LOCATION", credentialFile.getAbsolutePath());
+ CommonTestUtils.setEnv(systemEnv);
+ ctx = Utils.createTaskExecutorContext(flinkConf, yarnConf, env, tmParams,
+ taskManagerConf, workingDirectory, taskManagerMainClass, LOG);
+ } finally {
+ CommonTestUtils.setEnv(originalEnv);
+ }
+
+ Credentials credentials = new Credentials();
+ try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(ctx.getTokens().array()))) {
+ credentials.readTokenStorageStream(dis);
+ }
+ Collection<Token<? extends TokenIdentifier>> tokens = credentials.getAllTokens();
+ boolean hasHdfsDelegationToken = false;
+ boolean hasAmRmToken = false;
+ for (Token<? extends TokenIdentifier> token : tokens) {
+ if (token.getKind().equals(amRmTokenKind)) {
+ hasAmRmToken = true;
+ } else if (token.getKind().equals(hdfsDelegationTokenKind)) {
+ hasHdfsDelegationToken = true;
+ }
+ }
+ assertTrue(hasHdfsDelegationToken);
+ assertFalse(hasAmRmToken);
+ }
+
//
// --------------- Tools to test if a certain string has been logged with Log4j. -------------
// See : http://stackoverflow.com/questions/3717402/how-to-test-w-junit-that-warning-was-logged-w-log4j
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
index 97f60fc..03d6bd9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOSecuredITCase.java
@@ -26,7 +26,10 @@ import org.apache.flink.runtime.security.modules.HadoopModule;
import org.apache.flink.test.util.SecureTestEnvironment;
import org.apache.flink.test.util.TestingSecurityContext;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.hamcrest.Matchers;
@@ -39,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.Callable;
/**
@@ -116,6 +120,21 @@ public class YARNSessionFIFOSecuredITCase extends YARNSessionFIFOITCase {
"The JobManager and the TaskManager should both run with Kerberos.",
jobManagerRunsWithKerberos && taskManagerRunsWithKerberos,
Matchers.is(true));
+
+ final List<String> amRMTokens = Lists.newArrayList(AMRMTokenIdentifier.KIND_NAME.toString());
+ final String jobmanagerContainerId = getContainerIdByLogName("jobmanager.log");
+ final String taskmanagerContainerId = getContainerIdByLogName("taskmanager.log");
+ final boolean jobmanagerWithAmRmToken = verifyTokenKindInContainerCredentials(amRMTokens, jobmanagerContainerId);
+ final boolean taskmanagerWithAmRmToken = verifyTokenKindInContainerCredentials(amRMTokens, taskmanagerContainerId);
+
+ Assert.assertThat(
+ "The JobManager should have AMRMToken.",
+ jobmanagerWithAmRmToken,
+ Matchers.is(true));
+ Assert.assertThat(
+ "The TaskManager should not have AMRMToken.",
+ taskmanagerWithAmRmToken,
+ Matchers.is(false));
}
/* For secure cluster testing, it is enough to run only one test and override below test methods
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 3763f65..cf18be9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -32,6 +32,9 @@ import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -70,6 +73,7 @@ import java.io.PrintStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -442,9 +446,8 @@ public abstract class YarnTestBase extends TestLogger {
}
File f = new File(dir.getAbsolutePath() + "/" + name);
LOG.info("Searching in {}", f.getAbsolutePath());
- try {
+ try (Scanner scanner = new Scanner(f)) {
Set<String> foundSet = new HashSet<>(mustHave.length);
- Scanner scanner = new Scanner(f);
while (scanner.hasNextLine()) {
final String lineFromFile = scanner.nextLine();
for (String str : mustHave) {
@@ -471,6 +474,53 @@ public abstract class YarnTestBase extends TestLogger {
}
}
+ public static boolean verifyTokenKindInContainerCredentials(final Collection<String> tokens, final String containerId)
+ throws IOException {
+ File cwd = new File("target/" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY));
+ if (!cwd.exists() || !cwd.isDirectory()) {
+ return false;
+ }
+
+ File containerTokens = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.equals(containerId + ".tokens");
+ }
+ });
+
+ if (containerTokens != null) {
+ LOG.info("Verifying tokens in {}", containerTokens.getAbsolutePath());
+
+ Credentials tmCredentials = Credentials.readTokenStorageFile(containerTokens, new Configuration());
+
+ Collection<Token<? extends TokenIdentifier>> userTokens = tmCredentials.getAllTokens();
+ Set<String> tokenKinds = new HashSet<>(4);
+ for (Token<? extends TokenIdentifier> token : userTokens) {
+ tokenKinds.add(token.getKind().toString());
+ }
+
+ return tokenKinds.containsAll(tokens);
+ } else {
+ LOG.warn("Unable to find credential file for container {}", containerId);
+ return false;
+ }
+ }
+
+ public static String getContainerIdByLogName(String logName) {
+ File cwd = new File("target/" + YARN_CONFIGURATION.get(TEST_CLUSTER_NAME_KEY));
+ File containerLog = findFile(cwd.getAbsolutePath(), new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.equals(logName);
+ }
+ });
+ if (containerLog != null) {
+ return containerLog.getParentFile().getName();
+ } else {
+ throw new IllegalStateException("No container has log named " + logName);
+ }
+ }
+
public static void sleep(int time) {
try {
Thread.sleep(time);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
index 20e02e1..9dee8ab 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
@@ -565,7 +566,17 @@ public final class Utils {
new File(fileLocation),
HadoopUtils.getHadoopConfiguration(flinkConfig));
- cred.writeTokenStorageToStream(dob);
+ // Filter out AMRMToken before setting the tokens to the TaskManager container context.
+ Credentials taskManagerCred = new Credentials();
+ Collection<Token<? extends TokenIdentifier>> userTokens = cred.getAllTokens();
+ for (Token<? extends TokenIdentifier> token : userTokens) {
+ if (!token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+ final Text id = new Text(token.getIdentifier());
+ taskManagerCred.addToken(id, token);
+ }
+ }
+
+ taskManagerCred.writeTokenStorageToStream(dob);
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
ctx.setTokens(securityTokens);
} catch (Throwable t) {