You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/12/07 00:54:18 UTC
[33/50] [abbrv] hadoop git commit: YARN-6669. Implemented Kerberos
security for YARN service framework. (Contributed by Jian He)
YARN-6669. Implemented Kerberos security for YARN service framework. (Contributed by Jian He)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d30d5782
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d30d5782
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d30d5782
Branch: refs/heads/HDFS-7240
Commit: d30d57828fddaa8667de49af879cde999907c7f6
Parents: 404eab4
Author: Eric Yang <ey...@apache.org>
Authored: Mon Dec 4 15:11:00 2017 -0500
Committer: Eric Yang <ey...@apache.org>
Committed: Mon Dec 4 15:11:00 2017 -0500
----------------------------------------------------------------------
.../hadoop/yarn/service/webapp/ApiServer.java | 1 +
...RN-Simplified-V1-API-Layer-For-Services.yaml | 16 +
.../dev-support/findbugs-exclude.xml | 5 +-
.../yarn/service/ClientAMPolicyProvider.java | 39 ++
.../yarn/service/ClientAMSecurityInfo.java | 62 ++++
.../hadoop/yarn/service/ClientAMService.java | 9 +
.../hadoop/yarn/service/ServiceContext.java | 8 +
.../hadoop/yarn/service/ServiceMaster.java | 140 ++++++-
.../hadoop/yarn/service/ServiceScheduler.java | 30 +-
.../service/api/records/KerberosPrincipal.java | 146 ++++++++
.../yarn/service/api/records/Service.java | 23 ++
.../yarn/service/client/ServiceClient.java | 174 ++++++---
.../yarn/service/component/Component.java | 2 +-
.../yarn/service/conf/YarnServiceConf.java | 7 -
.../yarn/service/conf/YarnServiceConstants.java | 3 +
.../containerlaunch/AbstractLauncher.java | 39 +-
.../containerlaunch/ContainerLaunchService.java | 10 +-
.../containerlaunch/CredentialUtils.java | 319 ----------------
.../hadoop/yarn/service/package-info.java | 24 ++
.../yarn/service/provider/ProviderUtils.java | 53 +--
.../yarn/service/utils/ServiceApiUtil.java | 15 +
.../hadoop/yarn/service/utils/ServiceUtils.java | 31 +-
.../org.apache.hadoop.security.SecurityInfo | 14 +
.../client/api/RegistryOperationsFactory.java | 21 ++
.../registry/client/impl/zk/CuratorService.java | 8 +-
.../client/impl/zk/RegistrySecurity.java | 96 ++++-
.../hadoop/registry/server/dns/RegistryDNS.java | 4 +
.../RMRegistryOperationsService.java | 246 -------------
.../services/DeleteCompletionCallback.java | 3 +-
.../hadoop/registry/AbstractRegistryTest.java | 15 +-
.../integration/TestRegistryRMOperations.java | 369 -------------------
.../secure/TestSecureRMRegistryOperations.java | 348 -----------------
.../site/markdown/yarn-service/QuickStart.md | 20 +-
.../markdown/yarn-service/YarnServiceAPI.md | 11 +-
34 files changed, 844 insertions(+), 1467 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
index 1bb6c93..34ab8f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
@@ -173,6 +173,7 @@ public class ApiServer {
return Response.status(Status.BAD_REQUEST).entity(serviceStatus)
.build();
} catch (Exception e) {
+ LOG.error("Fail to stop service:", e);
ServiceStatus serviceStatus = new ServiceStatus();
serviceStatus.setDiagnostics(e.getMessage());
return Response.status(Status.INTERNAL_SERVER_ERROR)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
index 088b50c..979883c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
@@ -244,6 +244,10 @@ definitions:
queue:
type: string
description: The YARN queue that this service should be submitted to.
+ kerberos_principal:
+ description: The Kerberos Principal of the service
+ $ref: '#/definitions/KerberosPrincipal'
+
Resource:
description:
Resource determines the amount of resources (vcores, memory, network, etc.) usable by a container. This field determines the resource to be applied for all the containers of a component or service. The resource specified at the service (or global) level can be overriden at the component level. Only one of profile OR cpu & memory are expected. It raises a validation exception otherwise.
@@ -469,3 +473,15 @@ definitions:
type: integer
format: int32
description: An error code specific to a scenario which service owners should be able to use to understand the failure in addition to the diagnostic information.
+ KerberosPrincipal:
+ description: The kerberos principal info of the user who launches the service.
+ properties:
+ principal_name:
+ type: string
+ description: The principal name of the user who launches the service.
+ keytab:
+ type: string
+ description: |
+ The URI of the kerberos keytab. It supports two modes:
+ URI starts with "hdfs://": A path on hdfs where the keytab is stored. The keytab will be localized by YARN to each host.
+ URI starts with "file://": A path on the local host where the keytab is stored. It is assumed that the keytabs are pre-installed by admins before AM launches.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/dev-support/findbugs-exclude.xml
index 2814cca..80c04c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/dev-support/findbugs-exclude.xml
@@ -44,5 +44,8 @@
<Field name="registryClient" />
<Bug pattern="IS2_INCONSISTENT_SYNC"/>
</Match>
-
+ <Match>
+ <Class name="org.apache.hadoop.yarn.service.ClientAMPolicyProvider"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMPolicyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMPolicyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMPolicyProvider.java
new file mode 100644
index 0000000..365df0f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMPolicyProvider.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+
+/**
+ * PolicyProvider for Client to Service AM protocol.
+ */
+public class ClientAMPolicyProvider extends PolicyProvider {
+
+ private static final Service[] CLIENT_AM_SERVICE =
+ new Service[]{
+ new Service(
+ "security.yarn-service.client-am-protocol.acl",
+ ClientAMProtocol.class)};
+
+ @Override
+ public Service[] getServices() {
+ return CLIENT_AM_SERVICE;
+ };
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMSecurityInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMSecurityInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMSecurityInfo.java
new file mode 100644
index 0000000..e19284b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMSecurityInfo.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
+import org.apache.hadoop.yarn.service.impl.pb.service.ClientAMProtocolPB;
+
+import java.lang.annotation.Annotation;
+
+/**
+ * Security Info for Client to Service AM protocol.
+ */
+public class ClientAMSecurityInfo extends SecurityInfo{
+ @Override
+ public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
+ if (!protocol.equals(ClientAMProtocolPB.class)) {
+ return null;
+ }
+ return new KerberosInfo() {
+
+ @Override
+ public Class<? extends Annotation> annotationType() {
+ return null;
+ }
+
+ @Override
+ public String serverPrincipal() {
+ return YarnServiceConstants.PRINCIPAL;
+ }
+
+ @Override
+ public String clientPrincipal() {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
index 8e4c34d..94dd8d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.service.AbstractService;
@@ -65,6 +66,14 @@ public class ClientAMService extends AbstractService
InetSocketAddress address = new InetSocketAddress(0);
server = rpc.getServer(ClientAMProtocol.class, this, address, conf,
context.secretManager, 1);
+
+ // Enable service authorization?
+ if (conf.getBoolean(
+ CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
+ false)) {
+ this.server.refreshServiceAcl(getConfig(), new ClientAMPolicyProvider());
+ }
+
server.start();
String nodeHostString =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
index 94dbc6e..cd41ab7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
+import java.nio.ByteBuffer;
+
public class ServiceContext {
public Service service = null;
public SliderFileSystem fs;
@@ -34,6 +36,12 @@ public class ServiceContext {
public ServiceScheduler scheduler;
public ClientToAMTokenSecretManager secretManager;
public ClientAMService clientAMService;
+ // tokens used for container launch
+ public ByteBuffer tokens;
+ // AM keytab principal
+ public String principal;
+ // AM keytab location
+ public String keytab;
public ServiceContext() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java
index b0b4f06..1283604 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java
@@ -20,33 +20,49 @@ package org.apache.hadoop.yarn.service;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
+import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
import org.apache.hadoop.yarn.service.monitor.ServiceMonitor;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
-import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
-import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
+import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.Iterator;
import java.util.Map;
+import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants.KEYTAB_LOCATION;
+
public class ServiceMaster extends CompositeService {
private static final Logger LOG =
@@ -63,13 +79,7 @@ public class ServiceMaster extends CompositeService {
@Override
protected void serviceInit(Configuration conf) throws Exception {
- //TODO Deprecate slider conf, make sure works with yarn conf
printSystemEnv();
- if (UserGroupInformation.isSecurityEnabled()) {
- UserGroupInformation.setConfiguration(conf);
- }
- LOG.info("Login user is {}", UserGroupInformation.getLoginUser());
-
context = new ServiceContext();
Path appDir = getAppDir();
context.serviceHdfsDir = appDir.toString();
@@ -78,6 +88,10 @@ public class ServiceMaster extends CompositeService {
fs.setAppDir(appDir);
loadApplicationJson(context, fs);
+ if (UserGroupInformation.isSecurityEnabled()) {
+ context.tokens = recordTokensForContainers();
+ doSecureLogin();
+ }
// Take yarn config from YarnFile and merge them into YarnConfiguration
for (Map.Entry<String, String> entry : context.service
.getConfiguration().getProperties().entrySet()) {
@@ -111,6 +125,100 @@ public class ServiceMaster extends CompositeService {
super.serviceInit(conf);
}
+ // Record the tokens and use them for launching containers.
+ // e.g. localization requires the hdfs delegation tokens
+ private ByteBuffer recordTokensForContainers() throws IOException {
+ Credentials copy = new Credentials(UserGroupInformation.getCurrentUser()
+ .getCredentials());
+ DataOutputBuffer dob = new DataOutputBuffer();
+ try {
+ copy.writeTokenStorageToStream(dob);
+ } finally {
+ dob.close();
+ }
+ // Now remove the AM->RM token so that task containers cannot access it.
+ Iterator<Token<?>> iter = copy.getAllTokens().iterator();
+ while (iter.hasNext()) {
+ Token<?> token = iter.next();
+ LOG.info(token.toString());
+ if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+ iter.remove();
+ }
+ }
+ return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ }
+
+ // 1. First try to use user specified keytabs
+ // 2. If not specified, then try to use pre-installed keytab at localhost
+ // 3. strip off hdfs delegation tokens to ensure use keytab to talk to hdfs
+ private void doSecureLogin()
+ throws IOException, URISyntaxException {
+ // read the localized keytab specified by user
+ File keytab = new File(String.format(KEYTAB_LOCATION,
+ context.service.getName()));
+ if (!keytab.exists()) {
+ LOG.info("No keytab localized at " + keytab);
+ // Check if there exists a pre-installed keytab at host
+ String preInstalledKeytab = context.service.getKerberosPrincipal()
+ .getKeytab();
+ if (!StringUtils.isEmpty(preInstalledKeytab)) {
+ URI uri = new URI(preInstalledKeytab);
+ if (uri.getScheme().equals("file")) {
+ keytab = new File(uri);
+ LOG.info("Using pre-installed keytab from localhost: " +
+ preInstalledKeytab);
+ }
+ }
+ }
+ if (!keytab.exists()) {
+ LOG.info("No keytab exists: " + keytab);
+ return;
+ }
+ String principal = context.service.getKerberosPrincipal()
+ .getPrincipalName();
+ if (StringUtils.isEmpty((principal))) {
+ principal = UserGroupInformation.getLoginUser().getShortUserName();
+ LOG.info("No principal name specified. Will use AM " +
+ "login identity {} to attempt keytab-based login", principal);
+ }
+
+ Credentials credentials = UserGroupInformation.getCurrentUser()
+ .getCredentials();
+ LOG.info("User before logged in is: " + UserGroupInformation
+ .getCurrentUser());
+ String principalName = SecurityUtil.getServerPrincipal(principal,
+ ServiceUtils.getLocalHostName(getConfig()));
+ UserGroupInformation.loginUserFromKeytab(principalName,
+ keytab.getAbsolutePath());
+ // add back the credentials
+ UserGroupInformation.getCurrentUser().addCredentials(credentials);
+ LOG.info("User after logged in is: " + UserGroupInformation
+ .getCurrentUser());
+ context.principal = principalName;
+ context.keytab = keytab.getAbsolutePath();
+ removeHdfsDelegationToken(UserGroupInformation.getLoginUser());
+ }
+
+ // Remove HDFS delegation token from login user and ensure AM to use keytab
+ // to talk to hdfs
+ private static void removeHdfsDelegationToken(UserGroupInformation user) {
+ if (!user.isFromKeytab()) {
+ LOG.error("AM is not holding on a keytab in a secure deployment:" +
+ " service will fail when tokens expire");
+ }
+ Credentials credentials = user.getCredentials();
+ Iterator<Token<? extends TokenIdentifier>> iter =
+ credentials.getAllTokens().iterator();
+ while (iter.hasNext()) {
+ Token<? extends TokenIdentifier> token = iter.next();
+ if (token.getKind().equals(
+ DelegationTokenIdentifier.HDFS_DELEGATION_KIND)) {
+ LOG.info("Remove HDFS delegation token {}.", token);
+ iter.remove();
+ }
+ }
+ }
+
protected ContainerId getAMContainerId() throws BadClusterStateException {
return ContainerId.fromString(ServiceUtils.mandatoryEnvVariable(
ApplicationConstants.Environment.CONTAINER_ID.name()));
@@ -133,6 +241,17 @@ public class ServiceMaster extends CompositeService {
}
@Override
+ protected void serviceStart() throws Exception {
+ LOG.info("Starting service as user " + UserGroupInformation
+ .getCurrentUser());
+ UserGroupInformation.getLoginUser().doAs(
+ (PrivilegedExceptionAction<Void>) () -> {
+ super.serviceStart();
+ return null;
+ }
+ );
+ }
+ @Override
protected void serviceStop() throws Exception {
LOG.info("Stopping app master");
super.serviceStop();
@@ -146,7 +265,8 @@ public class ServiceMaster extends CompositeService {
public static void main(String[] args) throws Exception {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
- StringUtils.startupShutdownMessage(ServiceMaster.class, args, LOG);
+ org.apache.hadoop.util.StringUtils
+ .startupShutdownMessage(ServiceMaster.class, args, LOG);
try {
ServiceMaster serviceMaster = new ServiceMaster("Service Master");
ShutdownHookManager.get()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index 6bc5673..bea31cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -22,6 +22,7 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -34,6 +35,7 @@ import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.security.HadoopKerberosName;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -142,11 +144,29 @@ public class ServiceScheduler extends CompositeService {
}
public void buildInstance(ServiceContext context, Configuration configuration)
- throws YarnException {
+ throws YarnException, IOException {
app = context.service;
executorService = Executors.newScheduledThreadPool(10);
- RegistryOperations registryClient = RegistryOperationsFactory
- .createInstance("ServiceScheduler", configuration);
+ RegistryOperations registryClient = null;
+ if (UserGroupInformation.isSecurityEnabled() &&
+ !StringUtils.isEmpty(context.principal)
+ && !StringUtils.isEmpty(context.keytab)) {
+ Configuration conf = getConfig();
+ // Only take the first section of the principal
+ // e.g. hdfs-demo@EXAMPLE.COM will take hdfs-demo
+ // This is because somehow zookeeper client only uses the first section
+ // for acl validations.
+ String username = new HadoopKerberosName(context.principal.trim())
+ .getServiceName();
+ LOG.info("Set registry user accounts: sasl:" + username);
+ conf.set(KEY_REGISTRY_USER_ACCOUNTS, "sasl:" + username);
+ registryClient = RegistryOperationsFactory
+ .createKerberosInstance(conf,
+ "Client", context.principal, context.keytab);
+ } else {
+ registryClient = RegistryOperationsFactory
+ .createInstance("ServiceScheduler", configuration);
+ }
addIfService(registryClient);
yarnRegistryOperations =
createYarnRegistryOperations(context, registryClient);
@@ -171,7 +191,7 @@ public class ServiceScheduler extends CompositeService {
dispatcher.setDrainEventsOnStop();
addIfService(dispatcher);
- containerLaunchService = new ContainerLaunchService(context.fs);
+ containerLaunchService = new ContainerLaunchService(context);
addService(containerLaunchService);
if (YarnConfiguration.timelineServiceV2Enabled(configuration)) {
@@ -408,7 +428,7 @@ public class ServiceScheduler extends CompositeService {
}
} catch (IOException e) {
LOG.error(
- "Failed to register app " + app.getName() + " in registry");
+ "Failed to register app " + app.getName() + " in registry", e);
}
}
});
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/KerberosPrincipal.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/KerberosPrincipal.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/KerberosPrincipal.java
new file mode 100644
index 0000000..e38fdb5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/KerberosPrincipal.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.api.records;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.xml.bind.annotation.XmlElement;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * The kerberos principal of the service.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+@ApiModel(description = "The kerberos principal of the service.")
+@javax.annotation.Generated(value = "io.swagger.codegen.languages" +
+ ".JavaClientCodegen", date = "2017-11-20T11:29:11.785-08:00")
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class KerberosPrincipal implements Serializable {
+ private static final long serialVersionUID = -6431667195287650037L;
+
+ @JsonProperty("principal_name")
+ @XmlElement(name = "principal_name")
+ private String principalName = null;
+
+ @JsonProperty("keytab")
+ @XmlElement(name = "keytab")
+ private String keytab = null;
+
+ public KerberosPrincipal principalName(String principalName) {
+ this.principalName = principalName;
+ return this;
+ }
+
+ /**
+ * The principal name of the service.
+ *
+ * @return principalName
+ **/
+ @ApiModelProperty(value = "The principal name of the service.")
+ public String getPrincipalName() {
+ return principalName;
+ }
+
+ public void setPrincipalName(String principalName) {
+ this.principalName = principalName;
+ }
+
+ public KerberosPrincipal keytab(String keytab) {
+ this.keytab = keytab;
+ return this;
+ }
+
+ /**
+ * The URI of the kerberos keytab. It supports two schemes "hdfs" and
+ * "file". If the URI starts with the "hdfs://" scheme, it indicates the
+ * path on hdfs where the keytab is stored. The keytab will be localized by
+ * YARN and made available to the AM in its local directory. If the URI
+ * starts with the "file://" scheme, it indicates a path on the local host
+ * presumably installed by admins upfront.
+ *
+ * @return keytab
+ **/
+ @ApiModelProperty(value = "The URI of the kerberos keytab. It supports two " +
+ "schemes \"hdfs\" and \"file\". If the URI starts with \"hdfs://\" " +
+ "scheme, it indicates the path on hdfs where the keytab is stored. The " +
+ "keytab will be localized by YARN and made available to AM in its local" +
+ " directory. If the URI starts with \"file://\" scheme, it indicates a " +
+ "path on the local host where the keytab is presumably installed by " +
+ "admins upfront. ")
+ public String getKeytab() {
+ return keytab;
+ }
+
+ public void setKeytab(String keytab) {
+ this.keytab = keytab;
+ }
+
+
+ @Override
+ public boolean equals(java.lang.Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KerberosPrincipal kerberosPrincipal = (KerberosPrincipal) o;
+ return Objects.equals(this.principalName, kerberosPrincipal
+ .principalName) &&
+ Objects.equals(this.keytab, kerberosPrincipal.keytab);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(principalName, keytab);
+ }
+
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("class KerberosPrincipal {\n");
+
+ sb.append(" principalName: ").append(toIndentedString(principalName))
+ .append("\n");
+ sb.append(" keytab: ").append(toIndentedString(keytab)).append("\n");
+ sb.append("}");
+ return sb.toString();
+ }
+
+ /**
+ * Convert the given object to string with each line indented by 4 spaces
+ * (except the first line).
+ */
+ private String toIndentedString(java.lang.Object o) {
+ if (o == null) {
+ return "null";
+ }
+ return o.toString().replace("\n", "\n ");
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
index 8045822..392b71e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
@@ -71,6 +71,9 @@ public class Service extends BaseResource {
private ServiceState state = null;
private Map<String, String> quicklinks = new HashMap<>();
private String queue = null;
+ @JsonProperty("kerberos_principal")
+ @XmlElement(name = "kerberos_principal")
+ private KerberosPrincipal kerberosPrincipal = new KerberosPrincipal();
/**
* A unique service name.
@@ -335,6 +338,24 @@ public class Service extends BaseResource {
this.queue = queue;
}
+ public Service kerberosPrincipal(KerberosPrincipal kerberosPrincipal) {
+ this.kerberosPrincipal = kerberosPrincipal;
+ return this;
+ }
+
+ /**
+ * The Kerberos Principal of the service.
+ * @return kerberosPrincipal
+ **/
+ @ApiModelProperty(value = "The Kerberos Principal of the service")
+ public KerberosPrincipal getKerberosPrincipal() {
+ return kerberosPrincipal;
+ }
+
+ public void setKerberosPrincipal(KerberosPrincipal kerberosPrincipal) {
+ this.kerberosPrincipal = kerberosPrincipal;
+ }
+
@Override
public boolean equals(java.lang.Object o) {
if (this == o) {
@@ -376,6 +397,8 @@ public class Service extends BaseResource {
sb.append(" quicklinks: ").append(toIndentedString(quicklinks))
.append("\n");
sb.append(" queue: ").append(toIndentedString(queue)).append("\n");
+ sb.append(" kerberosPrincipal: ")
+ .append(toIndentedString(kerberosPrincipal)).append("\n");
sb.append("}");
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
index d1b6026..81c56d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
@@ -28,12 +28,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@@ -43,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -79,6 +83,9 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@@ -98,7 +105,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
//TODO disable retry so that client / rest API doesn't block?
protected YarnClient yarnClient;
// Avoid looking up applicationId from fs all the time.
- private Map<String, ApplicationId> cachedAppIds = new ConcurrentHashMap<>();
+ private Map<String, AppInfo> cachedAppInfo = new ConcurrentHashMap<>();
private RegistryOperations registryClient;
private CuratorFramework curatorClient;
@@ -210,7 +217,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
// Write the definition first and then submit - AM will read the definition
createDirAndPersistApp(appDir, service);
ApplicationId appId = submitApp(service);
- cachedAppIds.put(serviceName, appId);
+ cachedAppInfo.put(serviceName, new AppInfo(appId, service
+ .getKerberosPrincipal().getPrincipalName()));
service.setId(appId.toString());
// update app definition with appId
persistAppDef(appDir, service);
@@ -224,8 +232,9 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
Service persistedService =
ServiceApiUtil.loadService(fs, serviceName);
if (!StringUtils.isEmpty(persistedService.getId())) {
- cachedAppIds.put(persistedService.getName(),
- ApplicationId.fromString(persistedService.getId()));
+ cachedAppInfo.put(persistedService.getName(), new AppInfo(
+ ApplicationId.fromString(persistedService.getId()),
+ persistedService.getKerberosPrincipal().getPrincipalName()));
} else {
throw new YarnException(persistedService.getName()
+ " appId is null, may be not submitted to YARN yet");
@@ -278,8 +287,9 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
throw new YarnException(
serviceName + " appId is null, may be not submitted to YARN yet");
}
- cachedAppIds.put(persistedService.getName(),
- ApplicationId.fromString(persistedService.getId()));
+ cachedAppInfo.put(persistedService.getName(), new AppInfo(
+ ApplicationId.fromString(persistedService.getId()), persistedService
+ .getKerberosPrincipal().getPrincipalName()));
return flexComponents(serviceName, componentCounts, persistedService);
}
@@ -328,7 +338,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
throw new YarnException(serviceName + " AM hostname is empty");
}
ClientAMProtocol proxy =
- createAMProxy(appReport.getHost(), appReport.getRpcPort());
+ createAMProxy(serviceName, appReport);
proxy.flexComponents(requestBuilder.build());
for (Map.Entry<String, Long> entry : original.entrySet()) {
LOG.info("[COMPONENT {}]: number of containers changed from {} to {}",
@@ -366,8 +376,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
LOG.info("Stopping service {}, with appId = {}", serviceName, currentAppId);
try {
ClientAMProtocol proxy =
- createAMProxy(report.getHost(), report.getRpcPort());
- cachedAppIds.remove(serviceName);
+ createAMProxy(serviceName, report);
+ cachedAppInfo.remove(serviceName);
if (proxy != null) {
// try to stop the app gracefully.
StopRequestProto request = StopRequestProto.newBuilder().build();
@@ -406,8 +416,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
}
}
} catch (IOException | YarnException | InterruptedException e) {
- LOG.info("Failed to stop " + serviceName
- + " gracefully, forcefully kill the app.");
+ LOG.info("Failed to stop " + serviceName + " gracefully due to: "
+ + e.getMessage() + ", forcefully kill the app.");
yarnClient.killApplication(currentAppId, "Forcefully kill the app");
}
return EXIT_SUCCESS;
@@ -421,7 +431,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
Path appDir = fs.buildClusterDirPath(serviceName);
FileSystem fileSystem = fs.getFileSystem();
// remove from the appId cache
- cachedAppIds.remove(serviceName);
+ cachedAppInfo.remove(serviceName);
if (fileSystem.exists(appDir)) {
if (fileSystem.delete(appDir, true)) {
LOG.info("Successfully deleted service dir for " + serviceName + ": "
@@ -552,7 +562,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
// copy jars to hdfs and add to localResources
addJarResource(serviceName, localResources);
// add keytab if in secure env
- addKeytabResourceIfSecure(fs, localResources, conf, serviceName);
+ addKeytabResourceIfSecure(fs, localResources, app);
if (LOG.isDebugEnabled()) {
printLocalResources(localResources);
}
@@ -581,6 +591,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
amLaunchContext.setCommands(Collections.singletonList(cmdStr));
amLaunchContext.setEnvironment(env);
amLaunchContext.setLocalResources(localResources);
+ addHdfsDelegationTokenIfSecure(amLaunchContext);
submissionContext.setAMContainerSpec(amLaunchContext);
yarnClient.submitApplication(submissionContext);
return submissionContext.getApplicationId();
@@ -771,38 +782,75 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
return appJson;
}
+ private void addHdfsDelegationTokenIfSecure(ContainerLaunchContext amContext)
+ throws IOException {
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ return;
+ }
+ Credentials credentials = new Credentials();
+ String tokenRenewer = YarnClientUtils.getRmPrincipal(getConfig());
+ if (StringUtils.isEmpty(tokenRenewer)) {
+ throw new IOException(
+ "Can't get Master Kerberos principal for the RM to use as renewer");
+ }
+ // Get hdfs dt
+ final org.apache.hadoop.security.token.Token<?>[] tokens =
+ fs.getFileSystem().addDelegationTokens(tokenRenewer, credentials);
+ if (tokens != null && tokens.length != 0) {
+ for (Token<?> token : tokens) {
+ LOG.debug("Got DT: " + token);
+ }
+ DataOutputBuffer dob = new DataOutputBuffer();
+ credentials.writeTokenStorageToStream(dob);
+ ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+ amContext.setTokens(fsTokens);
+ }
+ }
+
private void addKeytabResourceIfSecure(SliderFileSystem fileSystem,
- Map<String, LocalResource> localResource, Configuration conf,
- String serviceName) throws IOException, BadConfigException {
+ Map<String, LocalResource> localResource, Service service)
+ throws IOException, YarnException {
if (!UserGroupInformation.isSecurityEnabled()) {
return;
}
- String keytabPreInstalledOnHost =
- conf.get(YarnServiceConf.KEY_AM_KEYTAB_LOCAL_PATH);
- if (StringUtils.isEmpty(keytabPreInstalledOnHost)) {
- String amKeytabName =
- conf.get(YarnServiceConf.KEY_AM_LOGIN_KEYTAB_NAME);
- String keytabDir = conf.get(YarnServiceConf.KEY_HDFS_KEYTAB_DIR);
- Path keytabPath =
- fileSystem.buildKeytabPath(keytabDir, amKeytabName, serviceName);
- if (fileSystem.getFileSystem().exists(keytabPath)) {
- LocalResource keytabRes =
- fileSystem.createAmResource(keytabPath, LocalResourceType.FILE);
- localResource
- .put(YarnServiceConstants.KEYTAB_DIR + "/" + amKeytabName, keytabRes);
- LOG.info("Adding AM keytab on hdfs: " + keytabPath);
- } else {
- LOG.warn("No keytab file was found at {}.", keytabPath);
- if (conf.getBoolean(YarnServiceConf.KEY_AM_LOGIN_KEYTAB_REQUIRED, false)) {
- throw new BadConfigException("No keytab file was found at %s.",
- keytabPath);
- } else {
- LOG.warn("The AM will be "
- + "started without a kerberos authenticated identity. "
- + "The service is therefore not guaranteed to remain "
- + "operational beyond 24 hours.");
- }
+ String principalName = service.getKerberosPrincipal().getPrincipalName();
+ if (StringUtils.isEmpty(principalName)) {
+ LOG.warn("No Kerberos principal name specified for " + service.getName());
+ return;
+ }
+ if(StringUtils.isEmpty(service.getKerberosPrincipal().getKeytab())) {
+ LOG.warn("No Kerberos keytab specified for " + service.getName());
+ return;
+ }
+
+ URI keytabURI;
+ try {
+ keytabURI = new URI(service.getKerberosPrincipal().getKeytab());
+ } catch (URISyntaxException e) {
+ throw new YarnException(e);
+ }
+
+ switch (keytabURI.getScheme()) {
+ case "hdfs":
+ Path keytabOnhdfs = new Path(keytabURI);
+ if (!fileSystem.getFileSystem().exists(keytabOnhdfs)) {
+ LOG.warn(service.getName() + "'s keytab (principalName = " +
+ principalName + ") doesn't exist at: " + keytabOnhdfs);
+ return;
}
+ LocalResource keytabRes =
+ fileSystem.createAmResource(keytabOnhdfs, LocalResourceType.FILE);
+ localResource.put(String.format(YarnServiceConstants.KEYTAB_LOCATION,
+ service.getName()), keytabRes);
+ LOG.debug("Adding " + service.getName() + "'s keytab for " +
+ "localization, uri = " + keytabOnhdfs);
+ break;
+ case "file":
+ LOG.debug("Using a keytab from localhost: " + keytabURI);
+ break;
+ default:
+ LOG.warn("Unsupported URI scheme " + keytabURI);
+ break;
}
}
@@ -856,7 +904,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
return "";
}
ClientAMProtocol amProxy =
- createAMProxy(appReport.getHost(), appReport.getRpcPort());
+ createAMProxy(appReport.getName(), appReport);
GetStatusResponseProto response =
amProxy.getStatus(GetStatusRequestProto.newBuilder().build());
return response.getStatus();
@@ -886,7 +934,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
return appSpec;
}
ClientAMProtocol amProxy =
- createAMProxy(appReport.getHost(), appReport.getRpcPort());
+ createAMProxy(serviceName, appReport);
GetStatusResponseProto response =
amProxy.getStatus(GetStatusRequestProto.newBuilder().build());
appSpec = jsonSerDeser.fromJson(response.getStatus());
@@ -935,18 +983,37 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
}
}
- protected ClientAMProtocol createAMProxy(String host, int port)
- throws IOException {
+ protected ClientAMProtocol createAMProxy(String serviceName,
+ ApplicationReport appReport) throws IOException, YarnException {
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ if (!cachedAppInfo.containsKey(serviceName)) {
+ Service persistedService = ServiceApiUtil.loadService(fs, serviceName);
+ cachedAppInfo.put(serviceName, new AppInfo(appReport.getApplicationId(),
+ persistedService.getKerberosPrincipal().getPrincipalName()));
+ }
+ String principalName = cachedAppInfo.get(serviceName).principalName;
+ // Inject the principal into hadoop conf, because Hadoop
+ // SaslRpcClient#getServerPrincipal requires a config for the
+ // principal
+ if (!StringUtils.isEmpty(principalName)) {
+ getConfig().set(PRINCIPAL, principalName);
+ } else {
+ throw new YarnException("No principal specified in the persisted " +
+ "service definition, fail to connect to AM.");
+ }
+ }
InetSocketAddress address =
- NetUtils.createSocketAddrForHost(host, port);
+ NetUtils.createSocketAddrForHost(appReport.getHost(), appReport
+ .getRpcPort());
return ClientAMProxy.createProxy(getConfig(), ClientAMProtocol.class,
UserGroupInformation.getCurrentUser(), rpc, address);
}
public synchronized ApplicationId getAppId(String serviceName)
throws IOException, YarnException {
- if (cachedAppIds.containsKey(serviceName)) {
- return cachedAppIds.get(serviceName);
+ if (cachedAppInfo.containsKey(serviceName)) {
+ return cachedAppInfo.get(serviceName).appId;
}
Service persistedService = ServiceApiUtil.loadService(fs, serviceName);
if (persistedService == null) {
@@ -954,7 +1021,18 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
+ " doesn't exist on hdfs. Please check if the app exists in RM");
}
ApplicationId currentAppId = ApplicationId.fromString(persistedService.getId());
- cachedAppIds.put(serviceName, currentAppId);
+ cachedAppInfo.put(serviceName, new AppInfo(currentAppId, persistedService
+ .getKerberosPrincipal().getPrincipalName()));
return currentAppId;
}
+
+ private static class AppInfo {
+ ApplicationId appId;
+ String principalName;
+
+ AppInfo(ApplicationId appId, String principalName) {
+ this.appId = appId;
+ this.principalName = principalName;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index 88f4763..4e05e5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -101,7 +101,7 @@ public class Component implements EventHandler<ComponentEvent> {
new StateMachineFactory<Component, ComponentState, ComponentEventType, ComponentEvent>(
INIT)
// INIT will only go to FLEXING
- .addTransition(INIT, EnumSet.of(STABLE, FLEXING),
+ .addTransition(INIT, EnumSet.of(STABLE, FLEXING, INIT),
FLEX, new FlexComponentTransition())
// container recovered on AM restart
.addTransition(INIT, INIT, CONTAINER_RECOVERED,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
index 684d980..ea8904a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
@@ -53,13 +53,6 @@ public class YarnServiceConf {
*/
public static final String YARN_SERVICE_BASE_PATH = "yarn.service.base.path";
- //TODO rename
- /** Declare that a keytab must be provided */
- public static final String KEY_AM_LOGIN_KEYTAB_REQUIRED = "slider.am.login.keytab.required";
- public static final String KEY_AM_LOGIN_KEYTAB_NAME = "slider.am.login.keytab.name";
- public static final String KEY_HDFS_KEYTAB_DIR = "slider.hdfs.keytab.dir";
- public static final String KEY_AM_KEYTAB_LOCAL_PATH = "slider.am.keytab.local.path";
-
/**
* maximum number of failed containers (in a single component)
* before the app exits
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
index 3973759..0378d24 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
@@ -40,6 +40,8 @@ public interface YarnServiceConstants {
String APP_TYPE = "yarn-service";
String KEYTAB_DIR = "keytabs";
+ String KEYTAB_LOCATION = KEYTAB_DIR + "/%s" + ".keytab";
+
String RESOURCE_DIR = "resources";
@@ -89,4 +91,5 @@ public interface YarnServiceConstants {
String ERR_FILE = "stderr.txt";
String CONTENT = "content";
+ String PRINCIPAL = "yarn.service.am.principal";
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
index 2d7c3bb..e1e88cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
@@ -19,16 +19,15 @@
package org.apache.hadoop.yarn.service.containerlaunch;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
-import org.apache.hadoop.yarn.service.utils.CoreFileSystem;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
+import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,10 +49,6 @@ public class AbstractLauncher {
LoggerFactory.getLogger(AbstractLauncher.class);
public static final String CLASSPATH = "CLASSPATH";
/**
- * Filesystem to use for the launch
- */
- protected final CoreFileSystem coreFileSystem;
- /**
* Env vars; set up at final launch stage
*/
protected final Map<String, String> envVars = new HashMap<>();
@@ -63,25 +58,15 @@ public class AbstractLauncher {
protected final Map<String, LocalResource> localResources = new HashMap<>();
protected final Map<String, String> mountPaths = new HashMap<>();
private final Map<String, ByteBuffer> serviceData = new HashMap<>();
- // security
- protected final Credentials credentials;
protected boolean yarnDockerMode = false;
protected String dockerImage;
protected String dockerNetwork = DEFAULT_DOCKER_NETWORK;
protected String dockerHostname;
protected String runPrivilegedContainer;
+ private ServiceContext context;
-
- /**
- * Create instance.
- * @param coreFileSystem filesystem
- * @param credentials initial set of credentials -null is permitted
- */
- public AbstractLauncher(
- CoreFileSystem coreFileSystem,
- Credentials credentials) {
- this.coreFileSystem = coreFileSystem;
- this.credentials = credentials != null ? credentials: new Credentials();
+ public AbstractLauncher(ServiceContext context) {
+ this.context = context;
}
public void setYarnDockerMode(boolean yarnDockerMode){
@@ -113,14 +98,6 @@ public class AbstractLauncher {
mountPaths.put(subPath, mountPath);
}
- /**
- * Accessor to the credentials
- * @return the credentials associated with this launcher
- */
- public Credentials getCredentials() {
- return credentials;
- }
-
public void addCommand(String cmd) {
commands.add(cmd);
@@ -160,9 +137,9 @@ public class AbstractLauncher {
containerLaunchContext.setLocalResources(localResources);
//tokens
- log.debug("{} tokens", credentials.numberOfTokens());
- containerLaunchContext.setTokens(CredentialUtils.marshallCredentials(
- credentials));
+ if (context.tokens != null) {
+ containerLaunchContext.setTokens(context.tokens.duplicate());
+ }
if(yarnDockerMode){
Map<String, String> env = containerLaunchContext.getEnvironment();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
index b9f3a24..e07661b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.containerlaunch;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.provider.ProviderService;
@@ -40,10 +41,11 @@ public class ContainerLaunchService extends AbstractService{
private ExecutorService executorService;
private SliderFileSystem fs;
-
- public ContainerLaunchService(SliderFileSystem fs) {
+ private ServiceContext context;
+ public ContainerLaunchService(ServiceContext context) {
super(ContainerLaunchService.class.getName());
- this.fs = fs;
+ this.fs = context.fs;
+ this.context = context;
}
@Override
@@ -84,7 +86,7 @@ public class ContainerLaunchService extends AbstractService{
Component compSpec = instance.getCompSpec();
ProviderService provider = ProviderFactory.getProviderService(
compSpec.getArtifact());
- AbstractLauncher launcher = new AbstractLauncher(fs, null);
+ AbstractLauncher launcher = new AbstractLauncher(context);
try {
provider.buildContainerLaunchContext(launcher, service,
instance, fs, getConfig(), container);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/CredentialUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/CredentialUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/CredentialUtils.java
deleted file mode 100644
index fce58e5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/CredentialUtils.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.service.containerlaunch;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.client.ClientRMProxy;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.text.DateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-
-import static org.apache.hadoop.yarn.conf.YarnConfiguration.*;
-
-/**
- * Utils to work with credentials and tokens.
- *
- * Designed to be movable to Hadoop core
- */
-public final class CredentialUtils {
-
- private CredentialUtils() {
- }
-
- private static final Logger LOG =
- LoggerFactory.getLogger(CredentialUtils.class);
-
- /**
- * Save credentials to a byte buffer. Returns null if there were no
- * credentials to save
- * @param credentials credential set
- * @return a byte buffer of serialized tokens
- * @throws IOException if the credentials could not be written to the stream
- */
- public static ByteBuffer marshallCredentials(Credentials credentials) throws IOException {
- ByteBuffer buffer = null;
- if (!credentials.getAllTokens().isEmpty()) {
- DataOutputBuffer dob = new DataOutputBuffer();
- try {
- credentials.writeTokenStorageToStream(dob);
- } finally {
- dob.close();
- }
- buffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
- }
- return buffer;
- }
-
- /**
- * Save credentials to a file
- * @param file file to save to (will be overwritten)
- * @param credentials credentials to write
- * @throws IOException
- */
- public static void saveTokens(File file,
- Credentials credentials) throws IOException {
- try(DataOutputStream daos = new DataOutputStream(
- new FileOutputStream(file))) {
- credentials.writeTokenStorageToStream(daos);
- }
- }
-
- /**
- * Look up and return the resource manager's principal. This method
- * automatically does the <code>_HOST</code> replacement in the principal and
- * correctly handles HA resource manager configurations.
- *
- * From: YARN-4629
- * @param conf the {@link Configuration} file from which to read the
- * principal
- * @return the resource manager's principal string
- * @throws IOException thrown if there's an error replacing the host name
- */
- public static String getRMPrincipal(Configuration conf) throws IOException {
- String principal = conf.get(RM_PRINCIPAL, "");
- String hostname;
- Preconditions.checkState(!principal.isEmpty(), "Not set: " + RM_PRINCIPAL);
-
- if (HAUtil.isHAEnabled(conf)) {
- YarnConfiguration yarnConf = new YarnConfiguration(conf);
- if (yarnConf.get(RM_HA_ID) == null) {
- // If RM_HA_ID is not configured, use the first of RM_HA_IDS.
- // Any valid RM HA ID should work.
- String[] rmIds = yarnConf.getStrings(RM_HA_IDS);
- Preconditions.checkState((rmIds != null) && (rmIds.length > 0),
- "Not set " + RM_HA_IDS);
- yarnConf.set(RM_HA_ID, rmIds[0]);
- }
-
- hostname = yarnConf.getSocketAddr(
- RM_ADDRESS,
- DEFAULT_RM_ADDRESS,
- DEFAULT_RM_PORT).getHostName();
- } else {
- hostname = conf.getSocketAddr(
- RM_ADDRESS,
- DEFAULT_RM_ADDRESS,
- DEFAULT_RM_PORT).getHostName();
- }
- return SecurityUtil.getServerPrincipal(principal, hostname);
- }
-
- /**
- * Create and add any filesystem delegation tokens with
- * the RM(s) configured to be able to renew them. Returns null
- * on an insecure cluster (i.e. harmless)
- * @param conf configuration
- * @param fs filesystem
- * @param credentials credentials to update
- * @return a list of all added tokens.
- * @throws IOException
- */
- public static Token<?>[] addRMRenewableFSDelegationTokens(Configuration conf,
- FileSystem fs,
- Credentials credentials) throws IOException {
- Preconditions.checkArgument(conf != null);
- Preconditions.checkArgument(credentials != null);
- if (UserGroupInformation.isSecurityEnabled()) {
- return fs.addDelegationTokens(CredentialUtils.getRMPrincipal(conf),
- credentials);
- }
- return null;
- }
-
- /**
- * Add an FS delegation token which can be renewed by the current user
- * @param fs filesystem
- * @param credentials credentials to update
- * @throws IOException problems.
- */
- public static void addSelfRenewableFSDelegationTokens(
- FileSystem fs,
- Credentials credentials) throws IOException {
- Preconditions.checkArgument(fs != null);
- Preconditions.checkArgument(credentials != null);
- fs.addDelegationTokens(
- getSelfRenewer(),
- credentials);
- }
-
- public static String getSelfRenewer() throws IOException {
- return UserGroupInformation.getLoginUser().getShortUserName();
- }
-
- /**
- * Create and add an RM delegation token to the credentials
- * @param yarnClient Yarn Client
- * @param credentials to add token to
- * @return the token which was added
- * @throws IOException
- * @throws YarnException
- */
- public static Token<TokenIdentifier> addRMDelegationToken(YarnClient yarnClient,
- Credentials credentials)
- throws IOException, YarnException {
- Configuration conf = yarnClient.getConfig();
- Text rmPrincipal = new Text(CredentialUtils.getRMPrincipal(conf));
- Text rmDTService = ClientRMProxy.getRMDelegationTokenService(conf);
- Token<TokenIdentifier> rmDelegationToken =
- ConverterUtils.convertFromYarn(
- yarnClient.getRMDelegationToken(rmPrincipal),
- rmDTService);
- credentials.addToken(rmDelegationToken.getService(), rmDelegationToken);
- return rmDelegationToken;
- }
-
- public static Token<TimelineDelegationTokenIdentifier> maybeAddTimelineToken(
- Configuration conf,
- Credentials credentials)
- throws IOException, YarnException {
- if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false)) {
- LOG.debug("Timeline service enabled -fetching token");
-
- try(TimelineClient timelineClient = TimelineClient.createTimelineClient()) {
- timelineClient.init(conf);
- timelineClient.start();
- Token<TimelineDelegationTokenIdentifier> token =
- timelineClient.getDelegationToken(
- CredentialUtils.getRMPrincipal(conf));
- credentials.addToken(token.getService(), token);
- return token;
- }
- } else {
- LOG.debug("Timeline service is disabled");
- return null;
- }
- }
-
- /**
- * Filter a list of tokens from a set of credentials
- * @param credentials credential source (a new, filtered credential set is returned)
- * @param filter List of tokens to strip out
- * @return a new, filtered, set of credentials
- */
- public static Credentials filterTokens(Credentials credentials,
- List<Text> filter) {
- Credentials result = new Credentials(credentials);
- Iterator<Token<? extends TokenIdentifier>> iter =
- result.getAllTokens().iterator();
- while (iter.hasNext()) {
- Token<? extends TokenIdentifier> token = iter.next();
- LOG.debug("Token {}", token.getKind());
- if (filter.contains(token.getKind())) {
- LOG.debug("Filtering token {}", token.getKind());
- iter.remove();
- }
- }
- return result;
- }
-
- public static String dumpTokens(Credentials credentials, String separator) {
- ArrayList<Token<? extends TokenIdentifier>> sorted =
- new ArrayList<>(credentials.getAllTokens());
- Collections.sort(sorted, new TokenComparator());
- StringBuilder buffer = new StringBuilder(sorted.size()* 128);
- for (Token<? extends TokenIdentifier> token : sorted) {
- buffer.append(tokenToString(token)).append(separator);
- }
- return buffer.toString();
- }
-
- /**
- * Create a string for people to look at
- * @param token token to convert to a string form
- * @return a printable view of the token
- */
- public static String tokenToString(Token<? extends TokenIdentifier> token) {
- DateFormat df = DateFormat.getDateTimeInstance(
- DateFormat.SHORT, DateFormat.SHORT);
- StringBuilder buffer = new StringBuilder(128);
- buffer.append(token.toString());
- try {
- TokenIdentifier ti = token.decodeIdentifier();
- buffer.append("; ").append(ti);
- if (ti instanceof AbstractDelegationTokenIdentifier) {
- // details in human readable form, and compensate for information HDFS DT omits
- AbstractDelegationTokenIdentifier dt = (AbstractDelegationTokenIdentifier) ti;
- buffer.append("; Renewer: ").append(dt.getRenewer());
- buffer.append("; Issued: ")
- .append(df.format(new Date(dt.getIssueDate())));
- buffer.append("; Max Date: ")
- .append(df.format(new Date(dt.getMaxDate())));
- }
- } catch (IOException e) {
- //marshall problem; not ours
- LOG.debug("Failed to decode {}: {}", token, e, e);
- }
- return buffer.toString();
- }
-
- /**
- * Get the expiry time of a token.
- * @param token token to examine
- * @return the time in milliseconds after which the token is invalid.
- * @throws IOException
- */
- public static long getTokenExpiryTime(Token token) throws IOException {
- TokenIdentifier identifier = token.decodeIdentifier();
- Preconditions.checkState(identifier instanceof AbstractDelegationTokenIdentifier,
- "Token %s of type: %s has an identifier which cannot be examined: %s",
- token, token.getClass(), identifier);
- AbstractDelegationTokenIdentifier id =
- (AbstractDelegationTokenIdentifier) identifier;
- return id.getMaxDate();
- }
-
- private static class TokenComparator
- implements Comparator<Token<? extends TokenIdentifier>>, Serializable {
- @Override
- public int compare(Token<? extends TokenIdentifier> left,
- Token<? extends TokenIdentifier> right) {
- return left.getKind().toString().compareTo(right.getKind().toString());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/package-info.java
new file mode 100644
index 0000000..766da0d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Yarn Service framework.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.yarn.service;
+import org.apache.hadoop.classification.InterfaceAudience;
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
index c0c44c3..d65a196 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.service.provider;
import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
@@ -28,21 +27,18 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.service.ServiceContext;
-import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
import org.apache.hadoop.yarn.service.api.records.ConfigFormat;
-import org.apache.hadoop.yarn.service.api.records.Configuration;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
-import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
import org.apache.hadoop.yarn.service.exceptions.BadCommandArgumentsException;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.utils.PublishedConfiguration;
import org.apache.hadoop.yarn.service.utils.PublishedConfigurationOutputter;
-import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
+import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -163,53 +159,6 @@ public class ProviderUtils implements YarnServiceConstants {
}
}
- /**
- * Localize the service keytabs for the service.
- * @param launcher container launcher
- * @param fileSystem file system
- * @throws IOException trouble uploading to HDFS
- */
- public void localizeServiceKeytabs(AbstractLauncher launcher,
- SliderFileSystem fileSystem, Service service) throws IOException {
-
- Configuration conf = service.getConfiguration();
- String keytabPathOnHost =
- conf.getProperty(YarnServiceConf.KEY_AM_KEYTAB_LOCAL_PATH);
- if (ServiceUtils.isUnset(keytabPathOnHost)) {
- String amKeytabName =
- conf.getProperty(YarnServiceConf.KEY_AM_LOGIN_KEYTAB_NAME);
- String keytabDir =
- conf.getProperty(YarnServiceConf.KEY_HDFS_KEYTAB_DIR);
- // we need to localize the keytab files in the directory
- Path keytabDirPath = fileSystem.buildKeytabPath(keytabDir, null,
- service.getName());
- boolean serviceKeytabsDeployed = false;
- if (fileSystem.getFileSystem().exists(keytabDirPath)) {
- FileStatus[] keytabs = fileSystem.getFileSystem().listStatus(
- keytabDirPath);
- LocalResource keytabRes;
- for (FileStatus keytab : keytabs) {
- if (!amKeytabName.equals(keytab.getPath().getName())
- && keytab.getPath().getName().endsWith(".keytab")) {
- serviceKeytabsDeployed = true;
- log.info("Localizing keytab {}", keytab.getPath().getName());
- keytabRes = fileSystem.createAmResource(keytab.getPath(),
- LocalResourceType.FILE);
- launcher.addLocalResource(KEYTAB_DIR + "/" +
- keytab.getPath().getName(),
- keytabRes);
- }
- }
- }
- if (!serviceKeytabsDeployed) {
- log.warn("No service keytabs for the service have been localized. "
- + "If the service requires keytabs for secure operation, "
- + "please ensure that the required keytabs have been uploaded "
- + "to the folder {}", keytabDirPath);
- }
- }
- }
-
public static Path initCompInstanceDir(SliderFileSystem fs,
ComponentInstance instance) {
Path compDir = new Path(new Path(fs.getAppDir(), "components"),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
index b58cea8..d5ea45c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.security.HadoopKerberosName;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.Artifact;
@@ -40,6 +42,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -86,6 +90,17 @@ public class ServiceApiUtil {
"No component specified for " + service.getName());
}
+ if (UserGroupInformation.isSecurityEnabled()) {
+ if (!StringUtils.isEmpty(service.getKerberosPrincipal().getKeytab())) {
+ try {
+ // validate URI format
+ new URI(service.getKerberosPrincipal().getKeytab());
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+ }
+
// Validate there are no component name collisions (collisions are not
// currently supported) and add any components from external services
Configuration globalConf = service.getConfiguration();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org