You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by go...@apache.org on 2016/03/16 21:58:16 UTC
[25/50] incubator-slider git commit: SLIDER-1077: improve
oozie/credential support. This isn't quite right yet
SLIDER-1077: improve oozie/credential support. This isn't quite right yet
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/5dfac853
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/5dfac853
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/5dfac853
Branch: refs/heads/feature/SLIDER-906_docker_support
Commit: 5dfac853256ef4a3cf8f0a5fec003cdc4848f7f9
Parents: 8d4da2d
Author: Steve Loughran <st...@apache.org>
Authored: Fri Jan 29 20:06:00 2016 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Fri Jan 29 20:06:00 2016 +0000
----------------------------------------------------------------------
.../org/apache/slider/client/SliderClient.java | 21 +-
.../slider/client/SliderYarnClientImpl.java | 2 +-
.../slider/core/launch/AbstractLauncher.java | 53 ++--
.../slider/core/launch/AppMasterLauncher.java | 68 +++--
.../slider/core/launch/ContainerLauncher.java | 8 +
.../slider/core/launch/CredentialUtils.java | 251 +++++++++++++++++++
.../server/appmaster/RoleLaunchService.java | 21 +-
.../server/appmaster/SliderAppMaster.java | 101 ++++----
.../core/launch/TestAppMasterLauncher.java | 6 +-
.../TestAppMasterLauncherWithAmReset.java | 4 +-
10 files changed, 415 insertions(+), 120 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
index aa19a3b..5f694e2 100644
--- a/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ b/slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.registry.client.types.Endpoint;
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.KerberosDiags;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.alias.CredentialProvider;
@@ -131,6 +132,7 @@ import org.apache.slider.core.exceptions.UsageException;
import org.apache.slider.core.exceptions.WaitTimeoutException;
import org.apache.slider.core.launch.AppMasterLauncher;
import org.apache.slider.core.launch.ClasspathConstructor;
+import org.apache.slider.core.launch.CredentialUtils;
import org.apache.slider.core.launch.JavaCommandLineBuilder;
import org.apache.slider.core.launch.LaunchedApplication;
import org.apache.slider.core.launch.RunningApplication;
@@ -1909,6 +1911,22 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
// add the tags if available
Set<String> applicationTags = provider.getApplicationTags(sliderFileSystem,
getApplicationDefinitionPath(appOperations));
+
+ Credentials credentials = null;
+ if (clusterSecure) {
+ // pick up oozie credentials
+ credentials = CredentialUtils.loadFromEnvironment(
+ System.getenv(), config);
+ if (credentials == null) {
+ // nothing from oozie, so build up directly
+ credentials = new Credentials(
+ UserGroupInformation.getCurrentUser().getCredentials());
+ CredentialUtils.addRMRenewableFSDelegationTokens(config,
+ sliderFileSystem.getFileSystem(),
+ credentials);
+ }
+ }
+
AppMasterLauncher amLauncher = new AppMasterLauncher(clustername,
SliderKeys.APP_TYPE,
config,
@@ -1917,7 +1935,8 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
clusterSecure,
sliderAMResourceComponent,
resourceGlobalOptions,
- applicationTags);
+ applicationTags,
+ credentials);
ApplicationId appId = amLauncher.getApplicationId();
// set the application name;
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java b/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java
index 85a582b..d471cdb 100644
--- a/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java
+++ b/slider-core/src/main/java/org/apache/slider/client/SliderYarnClientImpl.java
@@ -190,7 +190,7 @@ public class SliderYarnClientImpl extends YarnClientImpl {
}
/**
- * Force kill a yarn application by ID. No niceities here
+ * Force kill a yarn application by ID. No niceties here
* @param applicationId app Id. "all" means "kill all instances of the current user
*
*/
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java b/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
index 22bf328..f92ffb1 100644
--- a/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
+++ b/slider-core/src/main/java/org/apache/slider/core/launch/AbstractLauncher.java
@@ -24,7 +24,6 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -68,25 +67,39 @@ public abstract class AbstractLauncher extends Configured {
* Env vars; set up at final launch stage
*/
protected final Map<String, String> envVars = new HashMap<>();
+
protected final MapOperations env = new MapOperations("env", envVars);
protected final ContainerLaunchContext containerLaunchContext =
Records.newRecord(ContainerLaunchContext.class);
protected final List<String> commands = new ArrayList<>(20);
protected final Map<String, LocalResource> localResources = new HashMap<>();
private final Map<String, ByteBuffer> serviceData = new HashMap<>();
+
// security
- protected final Credentials credentials = new Credentials();
+ protected final Credentials credentials;
protected LogAggregationContext logAggregationContext;
+ /**
+ * Create instance
+ * @param conf configuration
+ * @param coreFileSystem filesystem
+ * @param credentials initial set of credentials -null is permitted
+ */
+ protected AbstractLauncher(Configuration conf,
+ CoreFileSystem coreFileSystem,
+ Credentials credentials) {
+ super(conf);
+ this.coreFileSystem = coreFileSystem;
+ this.credentials = credentials != null ? credentials: new Credentials();
+ }
protected AbstractLauncher(Configuration conf,
CoreFileSystem fs) {
- super(conf);
- this.coreFileSystem = fs;
+ this(conf, fs, null);
}
protected AbstractLauncher(CoreFileSystem fs) {
- this.coreFileSystem = fs;
+ this(null, fs, null);
}
/**
@@ -133,7 +146,6 @@ public abstract class AbstractLauncher extends Configured {
localResources.putAll(resourceMap);
}
-
public Map<String, ByteBuffer> getServiceData() {
return serviceData;
}
@@ -169,7 +181,7 @@ public abstract class AbstractLauncher extends Configured {
/**
* Get all commands as a string, separated by ";". This is for diagnostics
- * @return a string descriptionof the commands
+ * @return a string description of the commands
*/
public String getCommandsAsString() {
return SliderUtils.join(getCommands(), "; ");
@@ -194,7 +206,7 @@ public abstract class AbstractLauncher extends Configured {
}
}
containerLaunchContext.setEnvironment(env);
-
+
//service data
if (log.isDebugEnabled()) {
log.debug("Service Data size");
@@ -211,21 +223,8 @@ public abstract class AbstractLauncher extends Configured {
//tokens
log.debug("{} tokens", credentials.numberOfTokens());
- DataOutputBuffer dob = new DataOutputBuffer();
- String tokenFileName =
- this.getConf().get(MAPREDUCE_JOB_CREDENTIALS_BINARY);
- if (tokenFileName != null) {
- // use delegation tokens, i.e. from Oozie
- Credentials creds =
- Credentials.readTokenStorageFile(new File(tokenFileName), getConf());
- creds.writeTokenStorageToStream(dob);
- } else {
- // normal auth
- credentials.writeTokenStorageToStream(dob);
- }
-
- ByteBuffer tokenBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
- containerLaunchContext.setTokens(tokenBuffer);
+ containerLaunchContext.setTokens(CredentialUtils.marshallCredentials(
+ credentials));
return containerLaunchContext;
}
@@ -282,7 +281,7 @@ public abstract class AbstractLauncher extends Configured {
/**
* Extract the value for option
- * yarn.resourcemanager.am.retry-count-window-ms
+ * {@code yarn.resourcemanager.am.retry-count-window-ms}
* and set it on the ApplicationSubmissionContext. Use the default value
* if option is not set.
*
@@ -433,7 +432,7 @@ public abstract class AbstractLauncher extends Configured {
public String[] dumpEnvToString() {
- List<String> nodeEnv = new ArrayList<String>();
+ List<String> nodeEnv = new ArrayList<>();
for (Map.Entry<String, String> entry : env.entrySet()) {
String envElt = String.format("%s=\"%s\"",
@@ -453,8 +452,8 @@ public abstract class AbstractLauncher extends Configured {
* @param destRelativeDir relative path under destination local dir
* @throws IOException IO problems
*/
- public void submitDirectory(Path srcDir, String destRelativeDir) throws
- IOException {
+ public void submitDirectory(Path srcDir, String destRelativeDir)
+ throws IOException {
//add the configuration resources
Map<String, LocalResource> confResources;
confResources = coreFileSystem.submitDirectory(
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java b/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java
index c82affa..091b80e 100644
--- a/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java
+++ b/slider-core/src/main/java/org/apache/slider/core/launch/AppMasterLauncher.java
@@ -20,7 +20,7 @@ package org.apache.slider.core.launch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
@@ -34,12 +34,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Records;
import org.apache.slider.client.SliderYarnClientImpl;
import org.apache.slider.common.tools.CoreFileSystem;
-import org.apache.slider.common.tools.SliderUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.InetAddress;
import java.text.DateFormat;
import java.util.Date;
import java.util.Map;
@@ -76,20 +74,21 @@ public class AppMasterLauncher extends AbstractLauncher {
* @param options map of options. All values are extracted in this constructor only
* @param resourceGlobalOptions global options
* @param applicationTags any app tags
+ * @param credentials initial set of credentials
* @throws IOException
* @throws YarnException
*/
public AppMasterLauncher(String name,
- String type,
- Configuration conf,
- CoreFileSystem fs,
- SliderYarnClientImpl yarnClient,
- boolean secureCluster,
- Map<String, String> options,
- Map<String, String> resourceGlobalOptions,
- Set<String> applicationTags
- ) throws IOException, YarnException {
- super(conf, fs);
+ String type,
+ Configuration conf,
+ CoreFileSystem fs,
+ SliderYarnClientImpl yarnClient,
+ boolean secureCluster,
+ Map<String, String> options,
+ Map<String, String> resourceGlobalOptions,
+ Set<String> applicationTags,
+ Credentials credentials) throws IOException, YarnException {
+ super(conf, fs, credentials);
this.yarnClient = yarnClient;
this.application = yarnClient.createApplication();
this.name = name;
@@ -165,10 +164,8 @@ public class AppMasterLauncher extends AbstractLauncher {
* Complete the launch context (copy in env vars, etc).
* @return the container to launch
*/
- public ApplicationSubmissionContext completeAppMasterLaunch() throws
- IOException {
-
-
+ public ApplicationSubmissionContext completeAppMasterLaunch()
+ throws IOException {
//queue priority
Priority pri = Records.newRecord(Priority.class);
@@ -196,6 +193,7 @@ public class AppMasterLauncher extends AbstractLauncher {
}
if (secureCluster) {
+ //tokens
addSecurityTokens();
} else {
propagateUsernameInInsecureCluster();
@@ -211,42 +209,40 @@ public class AppMasterLauncher extends AbstractLauncher {
*/
private void addSecurityTokens() throws IOException {
- String tokenRenewer = SecurityUtil.getServerPrincipal(
- getConf().get(YarnConfiguration.RM_PRINCIPAL),
- InetAddress.getLocalHost().getCanonicalHostName());
- if (SliderUtils.isUnset(tokenRenewer)) {
- throw new IOException(
- "Can't get Master Kerberos principal for the RM to use as renewer: "
- + YarnConfiguration.RM_PRINCIPAL
- );
- }
+ CredentialUtils.addRMRenewableFSDelegationTokens(getConf(),
+ coreFileSystem.getFileSystem(), credentials);
+
+ String tokenRenewer = CredentialUtils.getRMPrincipal(getConf());
Token<? extends TokenIdentifier>[] tokens = null;
- boolean tokensProvided = getConf().get(MAPREDUCE_JOB_CREDENTIALS_BINARY) != null;
+ boolean tokensProvided = getConf().get(MAPREDUCE_JOB_CREDENTIALS_BINARY) !=
+ null;
if (!tokensProvided) {
- // For now, only getting tokens for the default file-system.
- FileSystem fs = coreFileSystem.getFileSystem();
- tokens = fs.addDelegationTokens(tokenRenewer, credentials);
+ // For now, only getting tokens for the default file-system.
+ FileSystem fs = coreFileSystem.getFileSystem();
+ tokens = fs.addDelegationTokens(tokenRenewer, credentials);
}
// obtain the token expiry from the first token - should be the same for all
// HDFS tokens
if (tokens != null && tokens.length > 0) {
AbstractDelegationTokenIdentifier id =
- (AbstractDelegationTokenIdentifier)tokens[0].decodeIdentifier();
+ (AbstractDelegationTokenIdentifier) tokens[0].decodeIdentifier();
Date d = new Date(id.getIssueDate() + 24 * 60 * 60 * 1000);
- log.info("HDFS delegation tokens for AM launch context require renewal by {}",
- DateFormat.getDateTimeInstance().format(d));
+ log.info(
+ "HDFS delegation tokens for AM launch context require renewal by {}",
+ DateFormat.getDateTimeInstance().format(d));
} else {
if (!tokensProvided) {
log.warn("No HDFS delegation tokens obtained for AM launch context");
} else {
- log.info("Tokens provided via "+ MAPREDUCE_JOB_CREDENTIALS_BINARY +" property "
- + "being used for AM launch");
+ log.info("Tokens provided via " + MAPREDUCE_JOB_CREDENTIALS_BINARY +
+ " property "
+ + "being used for AM launch");
}
}
- }
+ }
/**
* Submit the application.
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/main/java/org/apache/slider/core/launch/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/ContainerLauncher.java b/slider-core/src/main/java/org/apache/slider/core/launch/ContainerLauncher.java
index 69b937d..e586743 100644
--- a/slider-core/src/main/java/org/apache/slider/core/launch/ContainerLauncher.java
+++ b/slider-core/src/main/java/org/apache/slider/core/launch/ContainerLauncher.java
@@ -20,6 +20,7 @@ package org.apache.slider.core.launch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.Container;
@@ -41,6 +42,13 @@ public class ContainerLauncher extends AbstractLauncher {
public final Container container;
public ContainerLauncher(Configuration conf,
+ CoreFileSystem coreFileSystem,
+ Container container, Credentials credentials) {
+ super(conf, coreFileSystem, credentials);
+ this.container = container;
+ }
+
+ public ContainerLauncher(Configuration conf,
CoreFileSystem fs,
Container container) {
super(conf, fs);
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/main/java/org/apache/slider/core/launch/CredentialUtils.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/launch/CredentialUtils.java b/slider-core/src/main/java/org/apache/slider/core/launch/CredentialUtils.java
new file mode 100644
index 0000000..32068e2
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/core/launch/CredentialUtils.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.launch;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.text.DateFormat;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.*;
+
+/**
+ * Utils to work with credentials and tokens.
+ *
+ * Designed to be movable to Hadoop core
+ */
+public final class CredentialUtils {
+
+ private CredentialUtils() {
+   // utility class: static methods only, not to be instantiated
+ }
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CredentialUtils.class);
+
+ /**
+  * Save credentials to a byte buffer. Returns null if the credential
+  * set holds no tokens.
+  * @param credentials credential set
+  * @return a byte buffer of the serialized credentials, or null
+  * @throws IOException if the credentials could not be written to the stream
+  */
+ public static ByteBuffer marshallCredentials(Credentials credentials)
+     throws IOException {
+   if (credentials.getAllTokens().isEmpty()) {
+     return null;
+   }
+   try (DataOutputBuffer dob = new DataOutputBuffer()) {
+     credentials.writeTokenStorageToStream(dob);
+     return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+   }
+ }
+
+ /**
+  * Load the credentials from the environment. This looks at
+  * the value of {@link UserGroupInformation#HADOOP_TOKEN_FILE_LOCATION}
+  * and, if it is set, reads in the token file which it names.
+  * @param env environment to resolve the variable from
+  * @param conf configuration to use when reading the tokens
+  * @return a set of credentials, or null if the variable was not set
+  * @throws FileNotFoundException if the named file does not exist
+  * @throws IOException if the named path is not a readable file, or
+  * the credentials could not be loaded.
+  */
+ public static Credentials loadFromEnvironment(Map<String, String> env,
+     Configuration conf)
+     throws IOException {
+   String tokenFilename = env.get(HADOOP_TOKEN_FILE_LOCATION);
+   if (tokenFilename == null) {
+     // variable unset: nothing to load
+     return null;
+   }
+   // use delegation tokens, i.e. from Oozie
+   File file = new File(tokenFilename.trim());
+   String details = String.format("Token File %s from environment variable %s",
+       file,
+       HADOOP_TOKEN_FILE_LOCATION);
+   LOG.debug("Using {}", details);
+   if (!file.exists()) {
+     throw new FileNotFoundException("No " + details);
+   }
+   // reject the path if it is not a regular file or if it is unreadable
+   if (!file.isFile() || !file.canRead()) {
+     throw new IOException("Cannot read " + details);
+   }
+   return Credentials.readTokenStorageFile(file, conf);
+ }
+
+ /**
+  * Look up the resource manager's principal, with the
+  * <code>_HOST</code> placeholder replaced by the RM hostname. If RM HA is
+  * enabled and no RM ID is selected, the first declared RM ID is used.
+  *
+  * From: YARN-4629
+  * @param conf the {@link Configuration} from which to read the
+  * principal and the RM address
+  * @return the resource manager's principal string
+  * @throws IOException thrown if there's an error replacing the host name
+  * @throws IllegalStateException if the principal or the HA RM IDs are unset
+  */
+ public static String getRMPrincipal(Configuration conf) throws IOException {
+   String rmPrincipal = conf.get(RM_PRINCIPAL, "");
+   Preconditions.checkState(!rmPrincipal.isEmpty(),
+       "Not set: " + RM_PRINCIPAL);
+
+   // the configuration from which the RM address gets resolved
+   Configuration addressConf = conf;
+   if (HAUtil.isHAEnabled(conf)) {
+     YarnConfiguration haConf = new YarnConfiguration(conf);
+     if (haConf.get(RM_HA_ID) == null) {
+       // If RM_HA_ID is not configured, use the first of RM_HA_IDS.
+       // Any valid RM HA ID should work.
+       String[] haIds = haConf.getStrings(RM_HA_IDS);
+       boolean haIdsDeclared = haIds != null && haIds.length > 0;
+       Preconditions.checkState(haIdsDeclared, "Not set " + RM_HA_IDS);
+       haConf.set(RM_HA_ID, haIds[0]);
+     }
+     // resolve the address against the configuration carrying the HA ID
+     addressConf = haConf;
+   }
+
+   String rmHost = addressConf.getSocketAddr(
+       RM_ADDRESS,
+       DEFAULT_RM_ADDRESS,
+       DEFAULT_RM_PORT).getHostName();
+   return SecurityUtil.getServerPrincipal(rmPrincipal, rmHost);
+ }
+
+ /**
+  * Create and add any filesystem delegation tokens with
+  * the RM(s) configured to be able to renew them. Returns null
+  * on an insecure cluster (i.e. harmless)
+  * @param conf configuration
+  * @param fs filesystem
+  * @param credentials credentials to update
+  * @return a list of all added tokens, or null if security is disabled.
+  * @throws IOException if the RM principal or the tokens cannot be obtained
+  */
+ public static Token<?>[] addRMRenewableFSDelegationTokens(Configuration conf,
+     FileSystem fs,
+     Credentials credentials) throws IOException {
+   Preconditions.checkArgument(conf != null);
+   Preconditions.checkArgument(credentials != null);
+   if (!UserGroupInformation.isSecurityEnabled()) {
+     return null;
+   }
+   // the RM principal is named as the renewer of the tokens
+   String tokenRenewer = CredentialUtils.getRMPrincipal(conf);
+   return fs.addDelegationTokens(tokenRenewer, credentials);
+ }
+
+ /**
+  * Add FS delegation tokens for which the renewer is the short name
+  * of the login user.
+  * @param fs filesystem
+  * @param credentials credentials to update
+  * @throws IOException problems.
+  */
+ public static void addSelfRenewableFSDelegationTokens(FileSystem fs,
+     Credentials credentials) throws IOException {
+   Preconditions.checkArgument(fs != null);
+   Preconditions.checkArgument(credentials != null);
+   // the login user's short name is the renewer
+   String renewer = UserGroupInformation.getLoginUser().getShortUserName();
+   fs.addDelegationTokens(renewer, credentials);
+ }
+
+ /**
+  * Filter a list of tokens from a set of credentials
+  * @param credentials credential source (a new credential set is
+  * created from it, and it is that copy which gets filtered)
+  * @param filter List of token kinds to strip out
+  * @return a new, filtered, set of credentials
+  */
+ public static Credentials filterTokens(Credentials credentials,
+     List<Text> filter) {
+   Credentials filtered = new Credentials(credentials);
+   for (Iterator<Token<? extends TokenIdentifier>> it =
+       filtered.getAllTokens().iterator(); it.hasNext(); ) {
+     Text kind = it.next().getKind();
+     LOG.debug("Token {}", kind);
+     if (filter.contains(kind)) {
+       LOG.debug("Filtering token {}", kind);
+       it.remove();
+     }
+   }
+   return filtered;
+ }
+
+ /**
+  * Build a string listing every token in a set of credentials.
+  * Each token is described via {@link #toString(Token)} and is
+  * followed by the separator.
+  * @param credentials credentials whose tokens are to be listed
+  * @param separator text appended after each token description
+  * @return the token listing; empty if there are no tokens
+  */
+ public static String dumpTokens(Credentials credentials, String separator) {
+   Collection<Token<? extends TokenIdentifier>> allTokens
+       = credentials.getAllTokens();
+   StringBuilder buffer = new StringBuilder(allTokens.size() * 128);
+   for (Token<? extends TokenIdentifier> token : allTokens) {
+     buffer.append(toString(token)).append(separator);
+   }
+   return buffer.toString();
+ }
+
+ /**
+  * Describe a token: the token's own string value, followed by its
+  * decoded identifier and, for delegation tokens, the issue and max dates.
+  * If the identifier cannot be decoded the failure is logged at debug
+  * level and only the token's string value is returned.
+  * @param token token to describe
+  * @return a description of the token
+  */
+ public static String toString(Token<? extends TokenIdentifier> token) {
+   DateFormat dateFormat = DateFormat.getDateTimeInstance(
+       DateFormat.SHORT, DateFormat.SHORT);
+   StringBuilder text = new StringBuilder(128);
+   text.append(token.toString());
+   try {
+     TokenIdentifier identifier = token.decodeIdentifier();
+     text.append("; ").append(identifier);
+     if (identifier instanceof AbstractDelegationTokenIdentifier) {
+       AbstractDelegationTokenIdentifier delegation
+           = (AbstractDelegationTokenIdentifier) identifier;
+       String issued = dateFormat.format(new Date(delegation.getIssueDate()));
+       String maxDate = dateFormat.format(new Date(delegation.getMaxDate()));
+       text.append(" Issued: ").append(issued)
+           .append(" Max Date: ").append(maxDate);
+     }
+   } catch (IOException e) {
+     LOG.debug("Failed to decode {}: {}", token, e, e);
+   }
+   return text.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java
index d4f2fd5..7515c1a 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java
@@ -21,6 +21,7 @@ package org.apache.slider.server.appmaster;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.slider.common.SliderKeys;
@@ -63,7 +64,7 @@ public class RoleLaunchService
private final QueueAccess actionQueue;
/**
- * Provider bulding up the command
+ * Provider building up the command
*/
private final ProviderService provider;
@@ -122,23 +123,26 @@ public class RoleLaunchService
* @param container container target
* @param role role
* @param clusterSpec cluster spec to use for template
+ * @param credentials credentials to use
*/
public void launchRole(ContainerAssignment assignment,
- AggregateConf clusterSpec) {
+ AggregateConf clusterSpec,
+ Credentials credentials) {
RoleStatus role = assignment.role;
String roleName = role.getName();
// prelaunch safety check
Preconditions.checkArgument(provider.isSupportedRole(roleName));
RoleLaunchService.RoleLauncher launcher =
new RoleLaunchService.RoleLauncher(assignment,
- clusterSpec,
+ clusterSpec,
clusterSpec.getResourceOperations().getOrAddComponent(roleName),
- clusterSpec.getAppConfOperations().getOrAddComponent(roleName));
+ clusterSpec.getAppConfOperations().getOrAddComponent(roleName),
+ credentials);
execute(launcher);
}
/**
- * Thread that runs on the AM to launch a region server.
+ * Thread that runs on the AM to launch a container
*/
private class RoleLauncher implements Runnable {
@@ -150,13 +154,16 @@ public class RoleLaunchService
private final MapOperations appComponent;
private final AggregateConf instanceDefinition;
public final ProviderRole role;
+ private final Credentials credentials;
private Exception raisedException;
public RoleLauncher(ContainerAssignment assignment,
AggregateConf instanceDefinition,
MapOperations resourceComponent,
- MapOperations appComponent) {
+ MapOperations appComponent,
+ Credentials credentials) {
this.assignment = assignment;
+ this.credentials = credentials;
this.container = assignment.container;
RoleStatus roleStatus = assignment.role;
@@ -187,7 +194,7 @@ public class RoleLaunchService
public void run() {
try {
ContainerLauncher containerLauncher =
- new ContainerLauncher(getConfig(), fs, container);
+ new ContainerLauncher(getConfig(), fs, container, credentials);
containerLauncher.setupUGI();
containerLauncher.putEnv(envVars);
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 729d46e..82c9fb9 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -20,9 +20,6 @@ package org.apache.slider.server.appmaster;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.health.HealthCheckRegistry;
-import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
-import com.codahale.metrics.jvm.MemoryUsageGaugeSet;
-import com.codahale.metrics.jvm.ThreadStatesGaugeSet;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
@@ -35,7 +32,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.http.HttpConfig;
-import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
@@ -110,6 +106,7 @@ import org.apache.slider.core.exceptions.BadConfigException;
import org.apache.slider.core.exceptions.SliderException;
import org.apache.slider.core.exceptions.SliderInternalStateException;
import org.apache.slider.core.exceptions.TriggerClusterTeardownException;
+import org.apache.slider.core.launch.CredentialUtils;
import org.apache.slider.core.main.ExitCodeProvider;
import org.apache.slider.core.main.LauncherExitCodes;
import org.apache.slider.core.main.RunService;
@@ -257,7 +254,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
public NMClientAsync nmClientAsync;
/**
- * token blob
+ * Credentials for propagating down to launched containers
*/
private Credentials containerCredentials;
@@ -698,7 +695,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
/*
* Extract the container ID. This is then
- * turned into an (incompete) container
+ * turned into an (incomplete) container
*/
appMasterContainerID = ConverterUtils.toContainerId(
SliderUtils.mandatoryEnvVariable(ApplicationConstants.Environment.CONTAINER_ID.name()));
@@ -1117,35 +1114,29 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
}
}
- private void processAMCredentials(SecurityConfiguration securityConfiguration)
+ /**
+ * Process the initial user to obtain the set of user
+ * supplied credentials (tokens were passed in by client).
+ * Removes the AM/RM token.
+ * If a keytab has been provided, also strip the HDFS delegation token.
+ * @param securityConfig slider security config
+ * @throws IOException
+ */
+ private void processAMCredentials(SecurityConfiguration securityConfig)
throws IOException {
- // process the initial user to obtain the set of user
- // supplied credentials (tokens were passed in by client). Remove AMRM
- // token and HDFS delegation token, the latter because we will provide an
- // up to date token for container launches (getContainerCredentials()).
- UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
- Credentials credentials = currentUser.getCredentials();
- List<Text> filteredTokens = new ArrayList<>();
+
+ List<Text> filteredTokens = new ArrayList<>(2);
filteredTokens.add(AMRMTokenIdentifier.KIND_NAME);
- boolean keytabProvided = securityConfiguration.isKeytabProvided();
+ boolean keytabProvided = securityConfig.isKeytabProvided();
log.info("Slider AM Security Mode: {}", keytabProvided ? "KEYTAB" : "TOKEN");
if (keytabProvided) {
filteredTokens.add(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
}
- Iterator<Token<? extends TokenIdentifier>> iter =
- credentials.getAllTokens().iterator();
- while (iter.hasNext()) {
- Token<? extends TokenIdentifier> token = iter.next();
- log.info("Token {}", token.getKind());
- if (filteredTokens.contains(token.getKind())) {
- log.debug("Filtering token {} from AM tokens", token.getKind());
- iter.remove();
- }
- }
- // at this point this credentials map is probably clear, but leaving this
- // code to allow for future tokens...
- containerCredentials = credentials;
+ containerCredentials = CredentialUtils.filterTokens(
+ UserGroupInformation.getCurrentUser().getCredentials(),
+ filteredTokens);
+ log.info(CredentialUtils.dumpTokens(containerCredentials, "\n"));
}
/**
@@ -1698,9 +1689,18 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
//for each assignment: instantiate that role
for (ContainerAssignment assignment : assignments) {
- launchService.launchRole(assignment, getInstanceDefinition());
+ try {
+ launchService.launchRole(assignment, getInstanceDefinition(),
+ buildContainerCredentials());
+ } catch (IOException e) {
+ // Can be caused by failure to renew credentials with the remote
+ // service. If so, don't launch the application. Container is retained,
+ // though YARN will take it away after a timeout.
+ log.error("Failed to build credentials to launch container: {}", e, e);
+
+ }
}
-
+
//for all the operations, exec them
execute(operations);
log.info("Diagnostics: {}", getContainerDiagnosticInfo());
@@ -2211,34 +2211,49 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
// add current HDFS delegation token with an up to date token
ByteBuffer tokens = getContainerCredentials();
+ /*
+ ByteBuffer tokens = getContainerCredentials();
+
if (tokens != null) {
ctx.setTokens(tokens);
} else {
log.warn("No delegation tokens obtained and set for launch context");
}
+*/
appState.containerStartSubmitted(container, instance);
nmClientAsync.startContainerAsync(container, ctx);
}
+ /**
+ * Build the credentials needed for containers. This will include
+ * getting new delegation tokens for HDFS if the AM is running
+ * with a keytab.
+ * @return a buffer of credentials
+ * @throws IOException
+ */
private ByteBuffer getContainerCredentials() throws IOException {
// a delegation token can be retrieved from filesystem since
// the login is via a keytab (see above)
+ Credentials credentials = buildContainerCredentials();
+ return CredentialUtils.marshallCredentials(credentials);
+ }
+
+ /**
+ * Build the credentials needed for containers. This will include
+ * getting new delegation tokens for HDFS if the AM is running
+ * with a keytab.
+ * @return the credentials to use when launching containers
+ * @throws IOException
+ */
+
+ private Credentials buildContainerCredentials() throws IOException {
Credentials credentials = new Credentials(containerCredentials);
- ByteBuffer tokens = null;
if (securityConfiguration.isKeytabProvided()) {
- Token<? extends TokenIdentifier>[] hdfsTokens =
- getClusterFS().getFileSystem().addDelegationTokens(
- UserGroupInformation.getLoginUser().getShortUserName(),
- credentials);
+ CredentialUtils.addSelfRenewableFSDelegationTokens(
+ getClusterFS().getFileSystem(),
+ credentials);
}
- if (!credentials.getAllTokens().isEmpty()) {
- DataOutputBuffer dob = new DataOutputBuffer();
- credentials.writeTokenStorageToStream(dob);
- dob.close();
- tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
- }
-
- return tokens;
+ return credentials;
}
@Override // NMClientAsync.CallbackHandler
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncher.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncher.java b/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncher.java
index 60af770..eae9658 100644
--- a/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncher.java
+++ b/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncher.java
@@ -88,7 +88,7 @@ public class TestAppMasterLauncher {
EasyMock.replay(mockYarnClient, appSubmissionContext, yarnClientApp);
appMasterLauncher = new AppMasterLauncher("cl1", SliderKeys.APP_TYPE, null,
- null, mockYarnClient, false, null, options, tags);
+ null, mockYarnClient, false, null, options, tags, null);
// Verify the include/exclude patterns
String expectedInclude = "slider*.txt|agent.out";
@@ -109,7 +109,7 @@ public class TestAppMasterLauncher {
EasyMock.replay(mockYarnClient, appSubmissionContext, yarnClientApp);
appMasterLauncher = new AppMasterLauncher("cl1", SliderKeys.APP_TYPE, null,
- null, mockYarnClient, false, null, options, tags);
+ null, mockYarnClient, false, null, options, tags, null);
// Verify the include/exclude patterns
String expectedInclude = isOldApi ? "" : ".*";
@@ -128,7 +128,7 @@ public class TestAppMasterLauncher {
EasyMock.replay(mockYarnClient, appSubmissionContext, yarnClientApp);
appMasterLauncher = new AppMasterLauncher("cl1", SliderKeys.APP_TYPE, null,
- null, mockYarnClient, false, null, options, tags);
+ null, mockYarnClient, false, null, options, tags, null);
// Verify the include/exclude patterns
String expectedInclude = isOldApi ? "" : ".*";
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5dfac853/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java b/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java
index cc64cab..a8f6b26 100644
--- a/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java
+++ b/slider-core/src/test/java/org/apache/slider/core/launch/TestAppMasterLauncherWithAmReset.java
@@ -64,7 +64,7 @@ public class TestAppMasterLauncherWithAmReset {
EasyMock.replay(mockYarnClient, yarnClientApp);
appMasterLauncher = new AppMasterLauncher("am1", SliderKeys.APP_TYPE, null,
- null, mockYarnClient, false, null, options, tags);
+ null, mockYarnClient, false, null, options, tags, null);
ApplicationSubmissionContext ctx = appMasterLauncher.application
.getApplicationSubmissionContext();
@@ -80,7 +80,7 @@ public class TestAppMasterLauncherWithAmReset {
EasyMock.replay(mockYarnClient, yarnClientApp);
appMasterLauncher = new AppMasterLauncher("am1", SliderKeys.APP_TYPE, null,
- null, mockYarnClient, false, null, options, tags);
+ null, mockYarnClient, false, null, options, tags, null);
ApplicationSubmissionContext ctx = appMasterLauncher.application
.getApplicationSubmissionContext();