You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2017/12/04 20:18:56 UTC

[1/2] hadoop git commit: YARN-6669. Implemented Kerberos security for YARN service framework. (Contributed by Jian He)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 404eab4dc -> d30d57828


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java
index e18bcae..173001b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceUtils.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
@@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.service.exceptions.SliderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -43,9 +45,7 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.URL;
-import java.net.URLDecoder;
+import java.net.*;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -57,6 +57,11 @@ import java.util.Map;
 import java.util.regex.Pattern;
 import java.util.zip.GZIPOutputStream;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
+    .HADOOP_SECURITY_DNS_INTERFACE_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
+    .HADOOP_SECURITY_DNS_NAMESERVER_KEY;
+
 /**
  * These are slider-specific Util methods
  */
@@ -542,4 +547,24 @@ public final class ServiceUtils {
   public static String createDescriptionTag(String description) {
     return "Description: " + description;
   }
+
+  /** Resolves this host's name; copied from SecurityUtil (not public). */
+  public static String getLocalHostName(@Nullable Configuration conf)
+      throws UnknownHostException {
+    if (conf != null) {
+      String dnsInterface = conf.get(HADOOP_SECURITY_DNS_INTERFACE_KEY);
+      String nameServer = conf.get(HADOOP_SECURITY_DNS_NAMESERVER_KEY);
+      // A name server is only usable together with an interface setting.
+      if (dnsInterface != null) {
+        return DNS.getDefaultHost(dnsInterface, nameServer, true);
+      } else if (nameServer != null) {
+        throw new IllegalArgumentException(HADOOP_SECURITY_DNS_NAMESERVER_KEY +
+            " requires " + HADOOP_SECURITY_DNS_INTERFACE_KEY + ". Check your" +
+            " configuration.");
+      }
+    }
+
+    // Fallback to querying the default hostname as we did before.
+    return InetAddress.getLocalHost().getCanonicalHostName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
new file mode 100644
index 0000000..14cdf68
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
@@ -0,0 +1,14 @@
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+org.apache.hadoop.yarn.service.ClientAMSecurityInfo

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryOperationsFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryOperationsFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryOperationsFactory.java
index 704b097..e74ca81 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryOperationsFactory.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/api/RegistryOperationsFactory.java
@@ -111,6 +111,27 @@ public final class RegistryOperationsFactory {
   }
 
   /**
+   * Create a kerberos registry service client
+   * @param conf configuration; its registry auth and JAAS keys are set here
+   * @param jaasClientEntry the name of the login config entry
+   * @param principal principal of the client.
+   * @param keytab location to the keytab file
+   * @return an initialized registry service client instance
+   */
+  public static RegistryOperations createKerberosInstance(Configuration conf,
+      String jaasClientEntry, String principal, String keytab) {
+    Preconditions.checkArgument(conf != null, "Null configuration");
+    conf.set(KEY_REGISTRY_CLIENT_AUTH, REGISTRY_CLIENT_AUTH_KERBEROS);
+    conf.set(KEY_REGISTRY_CLIENT_JAAS_CONTEXT, jaasClientEntry);
+    RegistryOperationsClient operations =
+        new RegistryOperationsClient("KerberosRegistryOperations");
+    operations.setKerberosPrincipalAndKeytab(principal, keytab);
+    operations.init(conf);
+    return operations;
+  }
+
+
+  /**
    * Create and initialize an operations instance authenticated with write
    * access via an <code>id:password</code> pair.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
index 8713920..c81a0ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
@@ -127,6 +127,7 @@ public class CuratorService extends CompositeService
     } else {
       this.bindingSource = this;
     }
+    registrySecurity = new RegistrySecurity("registry security");
   }
 
   /**
@@ -152,8 +153,7 @@ public class CuratorService extends CompositeService
     registryRoot = conf.getTrimmed(KEY_REGISTRY_ZK_ROOT,
         DEFAULT_ZK_REGISTRY_ROOT);
 
-    // create and add the registy service
-    registrySecurity = new RegistrySecurity("registry security");
+    // add the registry service
     addService(registrySecurity);
 
     if (LOG.isDebugEnabled()) {
@@ -163,6 +163,10 @@ public class CuratorService extends CompositeService
     super.serviceInit(conf);
   }
 
+  public void setKerberosPrincipalAndKeytab(String principal, String keytab) {
+    registrySecurity.setKerberosPrincipalAndKeytab(principal, keytab);
+  }
+
   /**
    * Start the service.
    * This is where the curator instance is started.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java
index 23fadb5..ff6e8aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java
@@ -23,6 +23,7 @@ import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.StringUtils;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.util.KerberosUtil;
@@ -31,6 +32,7 @@ import org.apache.hadoop.service.ServiceStateException;
 import org.apache.hadoop.util.ZKUtil;
 import org.apache.zookeeper.Environment;
 import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
@@ -44,9 +46,11 @@ import java.lang.reflect.InvocationTargetException;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.Locale;
+import java.util.Map;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import static org.apache.hadoop.registry.client.impl.zk.ZookeeperConfigOptions.*;
@@ -170,13 +174,17 @@ public class RegistrySecurity extends AbstractService {
   /**
    * Client context
    */
-  private String jaasClientContext;
+  private String jaasClientEntry;
 
   /**
    * Client identity
    */
   private String jaasClientIdentity;
 
+  private String principal;
+
+  private String keytab;
+
   /**
    * Create an instance
    * @param name service name
@@ -238,6 +246,8 @@ public class RegistrySecurity extends AbstractService {
 
       systemACLs.addAll(buildACLs(system, kerberosRealm, ZooDefs.Perms.ALL));
 
+      LOG.info("Registry default system acls: " + System.lineSeparator() +
+          systemACLs);
       // user accounts (may be empty, but for digest one user AC must
       // be built up
       String user = getConfig().get(KEY_REGISTRY_USER_ACCOUNTS,
@@ -252,6 +262,7 @@ public class RegistrySecurity extends AbstractService {
           userACLs.add(self);
         }
       }
+      LOG.info("Registry User ACLs " + System.lineSeparator()+ userACLs);
 
       // here check for UGI having secure on or digest + ID
       switch (access) {
@@ -262,13 +273,12 @@ public class RegistrySecurity extends AbstractService {
           }
           UserGroupInformation currentUser =
               UserGroupInformation.getCurrentUser();
-          jaasClientContext = getOrFail(KEY_REGISTRY_CLIENT_JAAS_CONTEXT,
+          jaasClientEntry = getOrFail(KEY_REGISTRY_CLIENT_JAAS_CONTEXT,
               DEFAULT_REGISTRY_CLIENT_JAAS_CONTEXT);
           jaasClientIdentity = currentUser.getShortUserName();
           if (LOG.isDebugEnabled()) {
             LOG.debug("Auth is SASL user=\"{}\" JAAS context=\"{}\"",
-                jaasClientIdentity,
-                jaasClientContext);
+                jaasClientIdentity, jaasClientEntry);
           }
           break;
 
@@ -738,9 +748,81 @@ public class RegistrySecurity extends AbstractService {
           break;
 
         case sasl:
-          // bind to the current identity and context within the JAAS file
-          setZKSaslClientProperties(jaasClientIdentity, jaasClientContext);
+          JaasConfiguration jconf =
+              new JaasConfiguration(jaasClientEntry, principal, keytab);
+          javax.security.auth.login.Configuration.setConfiguration(jconf);
+          setSystemPropertyIfUnset(ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY,
+              "true");
+          setSystemPropertyIfUnset(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
+              jaasClientEntry);
+          LOG.info(
+              "Enabling ZK sasl client: jaasClientEntry = " + jaasClientEntry
+                  + ", principal = " + principal + ", keytab = " + keytab);
+      }
+    }
+  }
+
+  public void setKerberosPrincipalAndKeytab(String principal, String keytab) {
+    this.principal = principal;
+    this.keytab = keytab;
+  }
+
+  /**
+   * Creates a programmatic version of a jaas.conf file. This can be used
+   * instead of writing a jaas.conf file and setting the system property,
+   * "java.security.auth.login.config", to point to that file. It is meant to be
+   * used for connecting to ZooKeeper.
+   */
+  @InterfaceAudience.Private
+  public static class JaasConfiguration extends
+      javax.security.auth.login.Configuration {
+    // Configuration installed before this one; consulted for other entries.
+    private final javax.security.auth.login.Configuration baseConfig =
+        javax.security.auth.login.Configuration.getConfiguration();
+    private final AppConfigurationEntry[] entry;
+    private final String entryName;
+
+    /**
+     * Add an entry to the jaas configuration with the passed in name,
+     * principal, and keytab. The other necessary options will be set for you.
+     *
+     * @param entryName The name of the entry (e.g. "Client")
+     * @param principal The principal of the user
+     * @param keytab The location of the keytab
+     */
+    public JaasConfiguration(String entryName, String principal, String keytab) {
+      this.entryName = entryName;
+      Map<String, String> options = new HashMap<>();
+      options.put("keyTab", keytab);
+      options.put("principal", principal);
+      options.put("useKeyTab", "true");
+      options.put("storeKey", "true");
+      options.put("useTicketCache", "false");
+      options.put("refreshKrb5Config", "true");
+      String jaasEnvVar = System.getenv("HADOOP_JAAS_DEBUG");
+      if ("true".equalsIgnoreCase(jaasEnvVar)) {
+        options.put("debug", "true");
+      }
+      entry = new AppConfigurationEntry[]{
+          new AppConfigurationEntry(getKrb5LoginModuleName(),
+              AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+              options)};
+    }
+
+    @Override
+    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+      return (entryName.equals(name)) ? entry : ((baseConfig != null)
+          ? baseConfig.getAppConfigurationEntry(name) : null);
+    }
+
+    private static String getKrb5LoginModuleName() {
+      String krb5LoginModuleName;
+      if (System.getProperty("java.vendor").contains("IBM")) {
+        krb5LoginModuleName = "com.ibm.security.auth.module.Krb5LoginModule";
+      } else {
+        krb5LoginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
       }
+      return krb5LoginModuleName;
     }
   }
 
@@ -899,7 +981,7 @@ public class RegistrySecurity extends AbstractService {
              .append("; ");
       builder.append(KEY_REGISTRY_CLIENT_JAAS_CONTEXT)
              .append("=")
-             .append(jaasClientContext)
+             .append(jaasClientEntry)
              .append("; ");
       builder.append(describeProperty(PROP_ZK_SASL_CLIENT_USERNAME));
       builder.append(describeProperty(PROP_ZK_SASL_CLIENT_CONTEXT));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java
index d7ebece..358a963 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNS.java
@@ -412,6 +412,10 @@ public class RegistryDNS extends AbstractService implements DNSOperations,
       // Single reverse zone
     } else {
       Name reverseLookupZoneName = getReverseZoneName(conf);
+      if (reverseLookupZoneName == null) {
+        // reverse lookup disabled
+        return;
+      }
       Zone reverseLookupZone = configureZone(reverseLookupZoneName, conf);
       zones.put(reverseLookupZone.getOrigin(), reverseLookupZone);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/RMRegistryOperationsService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/RMRegistryOperationsService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/RMRegistryOperationsService.java
deleted file mode 100644
index e11890f..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/RMRegistryOperationsService.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.registry.server.integration;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.registry.client.impl.zk.RegistryBindingSource;
-import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
-import org.apache.hadoop.registry.server.services.DeleteCompletionCallback;
-import org.apache.hadoop.registry.server.services.RegistryAdminService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.Future;
-
-/**
- * Handle RM events by updating the registry
- * <p>
- * These actions are all implemented as event handlers to operations
- * which come from the RM.
- * <p>
- * This service is expected to be executed by a user with the permissions
- * to manipulate the entire registry,
- */
-@InterfaceAudience.LimitedPrivate("YARN")
-@InterfaceStability.Evolving
-public class RMRegistryOperationsService extends RegistryAdminService {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(RMRegistryOperationsService.class);
-
-  private PurgePolicy purgeOnCompletionPolicy = PurgePolicy.PurgeAll;
-
-  public RMRegistryOperationsService(String name) {
-    this(name, null);
-  }
-
-  public RMRegistryOperationsService(String name,
-      RegistryBindingSource bindingSource) {
-    super(name, bindingSource);
-  }
-
-
-  /**
-   * Extend the parent service initialization by verifying that the
-   * service knows —in a secure cluster— the realm in which it is executing.
-   * It needs this to properly build up the user names and hence their
-   * access rights.
-   *
-   * @param conf configuration of the service
-   * @throws Exception
-   */
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    super.serviceInit(conf);
-
-    verifyRealmValidity();
-  }
-
-  public PurgePolicy getPurgeOnCompletionPolicy() {
-    return purgeOnCompletionPolicy;
-  }
-
-  public void setPurgeOnCompletionPolicy(PurgePolicy purgeOnCompletionPolicy) {
-    this.purgeOnCompletionPolicy = purgeOnCompletionPolicy;
-  }
-
-  public void onApplicationAttemptRegistered(ApplicationAttemptId attemptId,
-      String host, int rpcport, String trackingurl) throws IOException {
-
-  }
-
-  public void onApplicationLaunched(ApplicationId id) throws IOException {
-
-  }
-
-  /**
-   * Actions to take as an AM registers itself with the RM.
-   * @param attemptId attempt ID
-   * @throws IOException problems
-   */
-  public void onApplicationMasterRegistered(ApplicationAttemptId attemptId) throws
-      IOException {
-  }
-
-  /**
-   * Actions to take when the AM container is completed
-   * @param containerId  container ID
-   * @throws IOException problems
-   */
-  public void onAMContainerFinished(ContainerId containerId) throws
-      IOException {
-    LOG.info("AM Container {} finished, purging application attempt records",
-        containerId);
-
-    // remove all application attempt entries
-    purgeAppAttemptRecords(containerId.getApplicationAttemptId());
-
-    // also treat as a container finish to remove container
-    // level records for the AM container
-    onContainerFinished(containerId);
-  }
-
-  /**
-   * remove all application attempt entries
-   * @param attemptId attempt ID
-   */
-  protected void purgeAppAttemptRecords(ApplicationAttemptId attemptId) {
-    purgeRecordsAsync("/",
-        attemptId.toString(),
-        PersistencePolicies.APPLICATION_ATTEMPT);
-  }
-
-  /**
-   * Actions to take when an application attempt is completed
-   * @param attemptId  application  ID
-   * @throws IOException problems
-   */
-  public void onApplicationAttemptUnregistered(ApplicationAttemptId attemptId)
-      throws IOException {
-    LOG.info("Application attempt {} unregistered, purging app attempt records",
-        attemptId);
-    purgeAppAttemptRecords(attemptId);
-  }
-
-  /**
-   * Actions to take when an application is completed
-   * @param id  application  ID
-   * @throws IOException problems
-   */
-  public void onApplicationCompleted(ApplicationId id)
-      throws IOException {
-    LOG.info("Application {} completed, purging application-level records",
-        id);
-    purgeRecordsAsync("/",
-        id.toString(),
-        PersistencePolicies.APPLICATION);
-  }
-
-  public void onApplicationAttemptAdded(ApplicationAttemptId appAttemptId) {
-  }
-
-  /**
-   * This is the event where the user is known, so the user directory
-   * can be created
-   * @param applicationId application  ID
-   * @param user username
-   * @throws IOException problems
-   */
-  public void onStateStoreEvent(ApplicationId applicationId, String user) throws
-      IOException {
-    initUserRegistryAsync(user);
-  }
-
-  /**
-   * Actions to take when the AM container is completed
-   * @param id  container ID
-   * @throws IOException problems
-   */
-  public void onContainerFinished(ContainerId id) throws IOException {
-    LOG.info("Container {} finished, purging container-level records",
-        id);
-    purgeRecordsAsync("/",
-        id.toString(),
-        PersistencePolicies.CONTAINER);
-  }
-
-  /**
-   * Queue an async operation to purge all matching records under a base path.
-   * <ol>
-   *   <li>Uses a depth first search</li>
-   *   <li>A match is on ID and persistence policy, or, if policy==-1, any match</li>
-   *   <li>If a record matches then it is deleted without any child searches</li>
-   *   <li>Deletions will be asynchronous if a callback is provided</li>
-   * </ol>
-   * @param path base path
-   * @param id ID for service record.id
-   * @param persistencePolicyMatch ID for the persistence policy to match:
-   * no match, no delete.
-   * @return a future that returns the #of records deleted
-   */
-  @VisibleForTesting
-  public Future<Integer> purgeRecordsAsync(String path,
-      String id,
-      String persistencePolicyMatch) {
-
-    return purgeRecordsAsync(path,
-        id, persistencePolicyMatch,
-        purgeOnCompletionPolicy,
-        new DeleteCompletionCallback());
-  }
-
-  /**
-   * Queue an async operation to purge all matching records under a base path.
-   * <ol>
-   *   <li>Uses a depth first search</li>
-   *   <li>A match is on ID and persistence policy, or, if policy==-1, any match</li>
-   *   <li>If a record matches then it is deleted without any child searches</li>
-   *   <li>Deletions will be asynchronous if a callback is provided</li>
-   * </ol>
-   * @param path base path
-   * @param id ID for service record.id
-   * @param persistencePolicyMatch ID for the persistence policy to match:
-   * no match, no delete.
-   * @param purgePolicy how to react to children under the entry
-   * @param callback an optional callback
-   * @return a future that returns the #of records deleted
-   */
-  @VisibleForTesting
-  public Future<Integer> purgeRecordsAsync(String path,
-      String id,
-      String persistencePolicyMatch,
-      PurgePolicy purgePolicy,
-      BackgroundCallback callback) {
-    LOG.info(" records under {} with ID {} and policy {}: {}",
-        path, id, persistencePolicyMatch);
-    return submit(
-        new AsyncPurge(path,
-            new SelectByYarnPersistence(id, persistencePolicyMatch),
-            purgePolicy,
-            callback));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java
index e160d4a..829ef68 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.registry.server.services;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.hadoop.registry.server.integration.RMRegistryOperationsService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class DeleteCompletionCallback implements BackgroundCallback {
   private static final Logger LOG =
-      LoggerFactory.getLogger(RMRegistryOperationsService.class);
+      LoggerFactory.getLogger(DeleteCompletionCallback.class);
 
   private AtomicInteger events = new AtomicInteger(0);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java
index 5b34f60..0d4a467 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java
@@ -23,7 +23,8 @@ import org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
 import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
 import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.server.integration.RMRegistryOperationsService;
+
+import org.apache.hadoop.registry.server.services.RegistryAdminService;
 import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,22 +32,16 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.URISyntaxException;
 
-/**
- * Abstract registry tests .. inits the field {@link #registry}
- * before the test with an instance of {@link RMRegistryOperationsService};
- * and {@link #operations} with the same instance cast purely
- * to the type {@link RegistryOperations}.
- *
- */
+
 public class AbstractRegistryTest extends AbstractZKRegistryTest {
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractRegistryTest.class);
-  protected RMRegistryOperationsService registry;
+  protected RegistryAdminService registry;
   protected RegistryOperations operations;
 
   @Before
   public void setupRegistry() throws IOException {
-    registry = new RMRegistryOperationsService("yarnRegistry");
+    registry = new RegistryAdminService("yarnRegistry");
     operations = registry;
     registry.init(createRegistryConfiguration());
     registry.start();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/integration/TestRegistryRMOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/integration/TestRegistryRMOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/integration/TestRegistryRMOperations.java
deleted file mode 100644
index 451a69b..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/integration/TestRegistryRMOperations.java
+++ /dev/null
@@ -1,369 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.registry.integration;
-
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
-import org.apache.hadoop.registry.AbstractRegistryTest;
-import org.apache.hadoop.registry.client.api.BindFlags;
-import org.apache.hadoop.registry.client.api.RegistryConstants;
-import org.apache.hadoop.registry.client.binding.RegistryUtils;
-import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
-import org.apache.hadoop.registry.client.impl.zk.ZKPathDumper;
-import org.apache.hadoop.registry.client.impl.CuratorEventCatcher;
-import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
-import org.apache.hadoop.registry.client.types.RegistryPathStatus;
-import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
-import org.apache.hadoop.registry.server.services.DeleteCompletionCallback;
-import org.apache.hadoop.registry.server.services.RegistryAdminService;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.inetAddrEndpoint;
-import static org.apache.hadoop.registry.client.binding.RegistryTypeUtils.restEndpoint;
-
-public class TestRegistryRMOperations extends AbstractRegistryTest {
-  protected static final Logger LOG =
-      LoggerFactory.getLogger(TestRegistryRMOperations.class);
-
-  /**
-   * trigger a purge operation
-   * @param path path
-   * @param id yarn ID
-   * @param policyMatch policy to match ID on
-   * @param purgePolicy policy when there are children under a match
-   * @return the number purged
-   * @throws IOException
-   */
-  public int purge(String path,
-      String id,
-      String policyMatch,
-      RegistryAdminService.PurgePolicy purgePolicy) throws
-      IOException,
-      ExecutionException,
-      InterruptedException {
-    return purge(path, id, policyMatch, purgePolicy, null);
-  }
-
-  /**
-   *
-   * trigger a purge operation
-   * @param path pathn
-   * @param id yarn ID
-   * @param policyMatch policy to match ID on
-   * @param purgePolicy policy when there are children under a match
-   * @param callback optional callback
-   * @return the number purged
-   * @throws IOException
-   */
-  public int purge(String path,
-      String id,
-      String policyMatch,
-      RegistryAdminService.PurgePolicy purgePolicy,
-      BackgroundCallback callback) throws
-      IOException,
-      ExecutionException,
-      InterruptedException {
-
-    Future<Integer> future = registry.purgeRecordsAsync(path,
-        id, policyMatch, purgePolicy, callback);
-    try {
-      return future.get();
-    } catch (ExecutionException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      } else {
-        throw e;
-      }
-    }
-  }
-
-  @Test
-  public void testPurgeEntryCuratorCallback() throws Throwable {
-
-    String path = "/users/example/hbase/hbase1/";
-    ServiceRecord written = buildExampleServiceEntry(
-        PersistencePolicies.APPLICATION_ATTEMPT);
-    written.set(YarnRegistryAttributes.YARN_ID,
-        "testAsyncPurgeEntry_attempt_001");
-
-    operations.mknode(RegistryPathUtils.parentOf(path), true);
-    operations.bind(path, written, 0);
-
-    ZKPathDumper dump = registry.dumpPath(false);
-    CuratorEventCatcher events = new CuratorEventCatcher();
-
-    LOG.info("Initial state {}", dump);
-
-    // container query
-    String id = written.get(YarnRegistryAttributes.YARN_ID, "");
-    int opcount = purge("/",
-        id,
-        PersistencePolicies.CONTAINER,
-        RegistryAdminService.PurgePolicy.PurgeAll,
-        events);
-    assertPathExists(path);
-    assertEquals(0, opcount);
-    assertEquals("Event counter", 0, events.getCount());
-
-    // now the application attempt
-    opcount = purge("/",
-        id,
-        PersistencePolicies.APPLICATION_ATTEMPT,
-        RegistryAdminService.PurgePolicy.PurgeAll,
-        events);
-
-    LOG.info("Final state {}", dump);
-
-    assertPathNotFound(path);
-    assertEquals("wrong no of delete operations in " + dump, 1, opcount);
-    // and validate the callback event
-    assertEquals("Event counter", 1, events.getCount());
-  }
-
-  @Test
-  public void testAsyncPurgeEntry() throws Throwable {
-
-    String path = "/users/example/hbase/hbase1/";
-    ServiceRecord written = buildExampleServiceEntry(
-        PersistencePolicies.APPLICATION_ATTEMPT);
-    written.set(YarnRegistryAttributes.YARN_ID,
-        "testAsyncPurgeEntry_attempt_001");
-
-    operations.mknode(RegistryPathUtils.parentOf(path), true);
-    operations.bind(path, written, 0);
-
-    ZKPathDumper dump = registry.dumpPath(false);
-
-    LOG.info("Initial state {}", dump);
-
-    DeleteCompletionCallback deletions = new DeleteCompletionCallback();
-    int opcount = purge("/",
-        written.get(YarnRegistryAttributes.YARN_ID, ""),
-        PersistencePolicies.CONTAINER,
-        RegistryAdminService.PurgePolicy.PurgeAll,
-        deletions);
-    assertPathExists(path);
-
-    dump = registry.dumpPath(false);
-
-    assertEquals("wrong no of delete operations in " + dump, 0,
-        deletions.getEventCount());
-    assertEquals("wrong no of delete operations in " + dump, 0, opcount);
-
-
-    // now app attempt
-    deletions = new DeleteCompletionCallback();
-    opcount = purge("/",
-        written.get(YarnRegistryAttributes.YARN_ID, ""),
-        PersistencePolicies.APPLICATION_ATTEMPT,
-        RegistryAdminService.PurgePolicy.PurgeAll,
-        deletions);
-
-    dump = registry.dumpPath(false);
-    LOG.info("Final state {}", dump);
-
-    assertPathNotFound(path);
-    assertEquals("wrong no of delete operations in " + dump, 1,
-        deletions.getEventCount());
-    assertEquals("wrong no of delete operations in " + dump, 1, opcount);
-    // and validate the callback event
-
-  }
-
-  @Test
-  public void testPutGetContainerPersistenceServiceEntry() throws Throwable {
-
-    String path = ENTRY_PATH;
-    ServiceRecord written = buildExampleServiceEntry(
-        PersistencePolicies.CONTAINER);
-
-    operations.mknode(RegistryPathUtils.parentOf(path), true);
-    operations.bind(path, written, BindFlags.CREATE);
-    ServiceRecord resolved = operations.resolve(path);
-    validateEntry(resolved);
-    assertMatches(written, resolved);
-  }
-
-  /**
-   * Create a complex example app
-   * @throws Throwable
-   */
-  @Test
-  public void testCreateComplexApplication() throws Throwable {
-    String appId = "application_1408631738011_0001";
-    String cid = "container_1408631738011_0001_01_";
-    String cid1 = cid + "000001";
-    String cid2 = cid + "000002";
-    String appPath = USERPATH + "tomcat";
-
-    ServiceRecord webapp = createRecord(appId,
-        PersistencePolicies.APPLICATION, "tomcat-based web application",
-        null);
-    webapp.addExternalEndpoint(restEndpoint("www",
-        new URI("http", "//loadbalancer/", null)));
-
-    ServiceRecord comp1 = createRecord(cid1, PersistencePolicies.CONTAINER,
-        null,
-        null);
-    comp1.addExternalEndpoint(restEndpoint("www",
-        new URI("http", "//rack4server3:43572", null)));
-    comp1.addInternalEndpoint(
-        inetAddrEndpoint("jmx", "JMX", "rack4server3", 43573));
-
-    // Component 2 has a container lifespan
-    ServiceRecord comp2 = createRecord(cid2, PersistencePolicies.CONTAINER,
-        null,
-        null);
-    comp2.addExternalEndpoint(restEndpoint("www",
-        new URI("http", "//rack1server28:35881", null)));
-    comp2.addInternalEndpoint(
-        inetAddrEndpoint("jmx", "JMX", "rack1server28", 35882));
-
-    operations.mknode(USERPATH, false);
-    operations.bind(appPath, webapp, BindFlags.OVERWRITE);
-    String componentsPath = appPath + RegistryConstants.SUBPATH_COMPONENTS;
-    operations.mknode(componentsPath, false);
-    String dns1 = RegistryPathUtils.encodeYarnID(cid1);
-    String dns1path = componentsPath + dns1;
-    operations.bind(dns1path, comp1, BindFlags.CREATE);
-    String dns2 = RegistryPathUtils.encodeYarnID(cid2);
-    String dns2path = componentsPath + dns2;
-    operations.bind(dns2path, comp2, BindFlags.CREATE);
-
-    ZKPathDumper pathDumper = registry.dumpPath(false);
-    LOG.info(pathDumper.toString());
-
-    logRecord("tomcat", webapp);
-    logRecord(dns1, comp1);
-    logRecord(dns2, comp2);
-
-    ServiceRecord dns1resolved = operations.resolve(dns1path);
-    assertEquals("Persistence policies on resolved entry",
-        PersistencePolicies.CONTAINER,
-        dns1resolved.get(YarnRegistryAttributes.YARN_PERSISTENCE, ""));
-
-    Map<String, RegistryPathStatus> children =
-        RegistryUtils.statChildren(operations, componentsPath);
-    assertEquals(2, children.size());
-    Collection<RegistryPathStatus>
-        componentStats = children.values();
-    Map<String, ServiceRecord> records =
-        RegistryUtils.extractServiceRecords(operations,
-            componentsPath, componentStats);
-    assertEquals(2, records.size());
-    ServiceRecord retrieved1 = records.get(dns1path);
-    logRecord(retrieved1.get(YarnRegistryAttributes.YARN_ID, ""), retrieved1);
-    assertMatches(dns1resolved, retrieved1);
-    assertEquals(PersistencePolicies.CONTAINER,
-        retrieved1.get(YarnRegistryAttributes.YARN_PERSISTENCE, ""));
-
-    // create a listing under components/
-    operations.mknode(componentsPath + "subdir", false);
-
-    // this shows up in the listing of child entries
-    Map<String, RegistryPathStatus> childrenUpdated =
-        RegistryUtils.statChildren(operations, componentsPath);
-    assertEquals(3, childrenUpdated.size());
-
-    // the non-record child this is not picked up in the record listing
-    Map<String, ServiceRecord> recordsUpdated =
-
-        RegistryUtils.extractServiceRecords(operations,
-            componentsPath,
-            childrenUpdated);
-    assertEquals(2, recordsUpdated.size());
-
-    // now do some deletions.
-
-    // synchronous delete container ID 2
-
-    // fail if the app policy is chosen
-    assertEquals(0, purge("/", cid2, PersistencePolicies.APPLICATION,
-        RegistryAdminService.PurgePolicy.FailOnChildren));
-    // succeed for container
-    assertEquals(1, purge("/", cid2, PersistencePolicies.CONTAINER,
-        RegistryAdminService.PurgePolicy.FailOnChildren));
-    assertPathNotFound(dns2path);
-    assertPathExists(dns1path);
-
-    // expect a skip on children to skip
-    assertEquals(0,
-        purge("/", appId, PersistencePolicies.APPLICATION,
-            RegistryAdminService.PurgePolicy.SkipOnChildren));
-    assertPathExists(appPath);
-    assertPathExists(dns1path);
-
-    // attempt to delete app with policy of fail on children
-    try {
-      int p = purge("/",
-          appId,
-          PersistencePolicies.APPLICATION,
-          RegistryAdminService.PurgePolicy.FailOnChildren);
-      fail("expected a failure, got a purge count of " + p);
-    } catch (PathIsNotEmptyDirectoryException expected) {
-      // expected
-    }
-    assertPathExists(appPath);
-    assertPathExists(dns1path);
-
-
-    // now trigger recursive delete
-    assertEquals(1,
-        purge("/", appId, PersistencePolicies.APPLICATION,
-            RegistryAdminService.PurgePolicy.PurgeAll));
-    assertPathNotFound(appPath);
-    assertPathNotFound(dns1path);
-
-  }
-
-  @Test
-  public void testChildDeletion() throws Throwable {
-    ServiceRecord app = createRecord("app1",
-        PersistencePolicies.APPLICATION, "app",
-        null);
-    ServiceRecord container = createRecord("container1",
-        PersistencePolicies.CONTAINER, "container",
-        null);
-
-    operations.bind("/app", app, BindFlags.OVERWRITE);
-    operations.bind("/app/container", container, BindFlags.OVERWRITE);
-
-    try {
-      int p = purge("/",
-          "app1",
-          PersistencePolicies.APPLICATION,
-          RegistryAdminService.PurgePolicy.FailOnChildren);
-      fail("expected a failure, got a purge count of " + p);
-    } catch (PathIsNotEmptyDirectoryException expected) {
-      // expected
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/secure/TestSecureRMRegistryOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/secure/TestSecureRMRegistryOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/secure/TestSecureRMRegistryOperations.java
deleted file mode 100644
index 41760d6..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/test/java/org/apache/hadoop/registry/secure/TestSecureRMRegistryOperations.java
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.registry.secure;
-
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.PathPermissionException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.ServiceStateException;
-import org.apache.hadoop.registry.client.api.RegistryConstants;
-import org.apache.hadoop.registry.client.api.RegistryOperations;
-import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
-import org.apache.hadoop.registry.client.exceptions.NoPathPermissionsException;
-import org.apache.hadoop.registry.client.impl.zk.ZKPathDumper;
-import org.apache.hadoop.registry.client.impl.RegistryOperationsClient;
-import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity;
-import org.apache.hadoop.registry.client.impl.zk.ZookeeperConfigOptions;
-import org.apache.hadoop.registry.server.integration.RMRegistryOperationsService;
-import org.apache.hadoop.registry.server.services.RegistryAdminService;
-import org.apache.zookeeper.client.ZooKeeperSaslClient;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Id;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.security.auth.login.LoginException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.List;
-
-import static org.apache.hadoop.registry.client.api.RegistryConstants.*;
-
-/**
- * Verify that the {@link RMRegistryOperationsService} works securely
- */
-public class TestSecureRMRegistryOperations extends AbstractSecureRegistryTest {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(TestSecureRMRegistryOperations.class);
-  private Configuration secureConf;
-  private Configuration zkClientConf;
-  private UserGroupInformation zookeeperUGI;
-
-  @Before
-  public void setupTestSecureRMRegistryOperations() throws Exception {
-    startSecureZK();
-    secureConf = new Configuration();
-    secureConf.setBoolean(KEY_REGISTRY_SECURE, true);
-
-    // create client conf containing the ZK quorum
-    zkClientConf = new Configuration(secureZK.getConfig());
-    zkClientConf.setBoolean(KEY_REGISTRY_SECURE, true);
-    assertNotEmpty(zkClientConf.get(RegistryConstants.KEY_REGISTRY_ZK_QUORUM));
-
-    // ZK is in charge
-    secureConf.set(KEY_REGISTRY_SYSTEM_ACCOUNTS, "sasl:zookeeper@");
-    zookeeperUGI = loginUGI(ZOOKEEPER, keytab_zk);
-  }
-
-  @After
-  public void teardownTestSecureRMRegistryOperations() {
-  }
-
-  /**
-   * Create the RM registry operations as the current user
-   * @return the service
-   * @throws LoginException
-   * @throws FileNotFoundException
-   */
-  public RMRegistryOperationsService startRMRegistryOperations() throws
-      LoginException, IOException, InterruptedException {
-    // kerberos
-    secureConf.set(KEY_REGISTRY_CLIENT_AUTH,
-        REGISTRY_CLIENT_AUTH_KERBEROS);
-    secureConf.set(KEY_REGISTRY_CLIENT_JAAS_CONTEXT, ZOOKEEPER_CLIENT_CONTEXT);
-
-    RMRegistryOperationsService registryOperations = zookeeperUGI.doAs(
-        new PrivilegedExceptionAction<RMRegistryOperationsService>() {
-          @Override
-          public RMRegistryOperationsService run() throws Exception {
-            RMRegistryOperationsService operations
-                = new RMRegistryOperationsService("rmregistry", secureZK);
-            addToTeardown(operations);
-            operations.init(secureConf);
-            LOG.info(operations.bindingDiagnosticDetails());
-            operations.start();
-            return operations;
-          }
-        });
-
-    return registryOperations;
-  }
-
-  /**
-   * test that ZK can write as itself
-   * @throws Throwable
-   */
-  @Test
-  public void testZookeeperCanWriteUnderSystem() throws Throwable {
-
-    RMRegistryOperationsService rmRegistryOperations =
-        startRMRegistryOperations();
-    RegistryOperations operations = rmRegistryOperations;
-    operations.mknode(PATH_SYSTEM_SERVICES + "hdfs",
-        false);
-    ZKPathDumper pathDumper = rmRegistryOperations.dumpPath(true);
-    LOG.info(pathDumper.toString());
-  }
-
-  @Test
-  public void testAnonReadAccess() throws Throwable {
-    RMRegistryOperationsService rmRegistryOperations =
-        startRMRegistryOperations();
-    describe(LOG, "testAnonReadAccess");
-    RegistryOperations operations =
-        RegistryOperationsFactory.createAnonymousInstance(zkClientConf);
-    addToTeardown(operations);
-    operations.start();
-
-    assertFalse("RegistrySecurity.isClientSASLEnabled()==true",
-        RegistrySecurity.isClientSASLEnabled());
-    operations.list(PATH_SYSTEM_SERVICES);
-  }
-
-  @Test
-  public void testAnonNoWriteAccess() throws Throwable {
-    RMRegistryOperationsService rmRegistryOperations =
-        startRMRegistryOperations();
-    describe(LOG, "testAnonNoWriteAccess");
-    RegistryOperations operations =
-        RegistryOperationsFactory.createAnonymousInstance(zkClientConf);
-    addToTeardown(operations);
-    operations.start();
-
-    String servicePath = PATH_SYSTEM_SERVICES + "hdfs";
-    expectMkNodeFailure(operations, servicePath);
-  }
-
-  @Test
-  public void testAnonNoWriteAccessOffRoot() throws Throwable {
-    RMRegistryOperationsService rmRegistryOperations =
-        startRMRegistryOperations();
-    describe(LOG, "testAnonNoWriteAccessOffRoot");
-    RegistryOperations operations =
-        RegistryOperationsFactory.createAnonymousInstance(zkClientConf);
-    addToTeardown(operations);
-    operations.start();
-    assertFalse("mknode(/)", operations.mknode("/", false));
-    expectMkNodeFailure(operations, "/sub");
-    expectDeleteFailure(operations, PATH_SYSTEM_SERVICES, true);
-  }
-
-  /**
-   * Expect a mknode operation to fail
-   * @param operations operations instance
-   * @param path path
-   * @throws IOException An IO failure other than those permitted
-   */
-  public void expectMkNodeFailure(RegistryOperations operations,
-      String path) throws IOException {
-    try {
-      operations.mknode(path, false);
-      fail("should have failed to create a node under " + path);
-    } catch (PathPermissionException expected) {
-      // expected
-    } catch (NoPathPermissionsException expected) {
-      // expected
-    }
-  }
-
-  /**
-   * Expect a delete operation to fail
-   * @param operations operations instance
-   * @param path path
-   * @param recursive
-   * @throws IOException An IO failure other than those permitted
-   */
-  public void expectDeleteFailure(RegistryOperations operations,
-      String path, boolean recursive) throws IOException {
-    try {
-      operations.delete(path, recursive);
-      fail("should have failed to delete the node " + path);
-    } catch (PathPermissionException expected) {
-      // expected
-    } catch (NoPathPermissionsException expected) {
-      // expected
-    }
-  }
-
-  @Test
-  public void testAlicePathRestrictedAnonAccess() throws Throwable {
-    RMRegistryOperationsService rmRegistryOperations =
-        startRMRegistryOperations();
-    String aliceHome = rmRegistryOperations.initUserRegistry(ALICE);
-    describe(LOG, "Creating anonymous accessor");
-    RegistryOperations anonOperations =
-        RegistryOperationsFactory.createAnonymousInstance(zkClientConf);
-    addToTeardown(anonOperations);
-    anonOperations.start();
-    anonOperations.list(aliceHome);
-    expectMkNodeFailure(anonOperations, aliceHome + "/anon");
-    expectDeleteFailure(anonOperations, aliceHome, true);
-  }
-
-  @Test
-  public void testUserZookeeperHomePathAccess() throws Throwable {
-    RMRegistryOperationsService rmRegistryOperations =
-        startRMRegistryOperations();
-    final String home = rmRegistryOperations.initUserRegistry(ZOOKEEPER);
-    describe(LOG, "Creating ZK client");
-
-    RegistryOperations operations = zookeeperUGI.doAs(
-        new PrivilegedExceptionAction<RegistryOperations>() {
-          @Override
-          public RegistryOperations run() throws Exception {
-            RegistryOperations operations =
-                RegistryOperationsFactory.createKerberosInstance(zkClientConf,
-                    ZOOKEEPER_CLIENT_CONTEXT);
-            addToTeardown(operations);
-            operations.start();
-
-            return operations;
-          }
-        });
-    operations.list(home);
-    String path = home + "/subpath";
-    operations.mknode(path, false);
-    operations.delete(path, true);
-  }
-
-  @Test
-  public void testUserHomedirsPermissionsRestricted() throws Throwable {
-    // test that the /users/$user permissions are restricted
-    RMRegistryOperationsService rmRegistryOperations =
-        startRMRegistryOperations();
-    // create Alice's dir, so it should have an ACL for Alice
-    final String home = rmRegistryOperations.initUserRegistry(ALICE);
-    List<ACL> acls = rmRegistryOperations.zkGetACLS(home);
-    ACL aliceACL = null;
-    for (ACL acl : acls) {
-      LOG.info(RegistrySecurity.aclToString(acl));
-      Id id = acl.getId();
-      if (id.getScheme().equals(ZookeeperConfigOptions.SCHEME_SASL)
-          && id.getId().startsWith(ALICE)) {
-
-        aliceACL = acl;
-        break;
-      }
-    }
-    assertNotNull(aliceACL);
-    assertEquals(RegistryAdminService.USER_HOMEDIR_ACL_PERMISSIONS,
-        aliceACL.getPerms());
-  }
-
-  @Test
-  public void testDigestAccess() throws Throwable {
-    RMRegistryOperationsService registryAdmin =
-        startRMRegistryOperations();
-    String id = "username";
-    String pass = "password";
-    registryAdmin.addWriteAccessor(id, pass);
-    List<ACL> clientAcls = registryAdmin.getClientAcls();
-    LOG.info("Client ACLS=\n{}", RegistrySecurity.aclsToString(clientAcls));
-
-    String base = "/digested";
-    registryAdmin.mknode(base, false);
-    List<ACL> baseACLs = registryAdmin.zkGetACLS(base);
-    String aclset = RegistrySecurity.aclsToString(baseACLs);
-    LOG.info("Base ACLs=\n{}", aclset);
-    ACL found = null;
-    for (ACL acl : baseACLs) {
-      if (ZookeeperConfigOptions.SCHEME_DIGEST.equals(acl.getId().getScheme())) {
-        found = acl;
-        break;
-      }
-    }
-    assertNotNull("Did not find digest entry in ACLs " + aclset, found);
-    zkClientConf.set(KEY_REGISTRY_USER_ACCOUNTS,
-        "sasl:somebody@EXAMPLE.COM, sasl:other");
-    RegistryOperations operations =
-        RegistryOperationsFactory.createAuthenticatedInstance(zkClientConf,
-            id,
-            pass);
-    addToTeardown(operations);
-    operations.start();
-    RegistryOperationsClient operationsClient =
-        (RegistryOperationsClient) operations;
-    List<ACL> digestClientACLs = operationsClient.getClientAcls();
-    LOG.info("digest client ACLs=\n{}",
-        RegistrySecurity.aclsToString(digestClientACLs));
-    operations.stat(base);
-    operations.mknode(base + "/subdir", false);
-    ZKPathDumper pathDumper = registryAdmin.dumpPath(true);
-    LOG.info(pathDumper.toString());
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testNoDigestAuthMissingId() throws Throwable {
-    RegistryOperationsFactory.createAuthenticatedInstance(zkClientConf,
-        "",
-        "pass");
-  }
-
-  @Test(expected = ServiceStateException.class)
-  public void testNoDigestAuthMissingId2() throws Throwable {
-    zkClientConf.set(KEY_REGISTRY_CLIENT_AUTH, REGISTRY_CLIENT_AUTH_DIGEST);
-    zkClientConf.set(KEY_REGISTRY_CLIENT_AUTHENTICATION_ID, "");
-    zkClientConf.set(KEY_REGISTRY_CLIENT_AUTHENTICATION_PASSWORD, "pass");
-    RegistryOperationsFactory.createInstance("DigestRegistryOperations",
-        zkClientConf);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void testNoDigestAuthMissingPass() throws Throwable {
-    RegistryOperationsFactory.createAuthenticatedInstance(zkClientConf,
-        "id",
-        "");
-  }
-
-  @Test(expected = ServiceStateException.class)
-  public void testNoDigestAuthMissingPass2() throws Throwable {
-    zkClientConf.set(KEY_REGISTRY_CLIENT_AUTH, REGISTRY_CLIENT_AUTH_DIGEST);
-    zkClientConf.set(KEY_REGISTRY_CLIENT_AUTHENTICATION_ID, "id");
-    zkClientConf.set(KEY_REGISTRY_CLIENT_AUTHENTICATION_PASSWORD, "");
-    RegistryOperationsFactory.createInstance("DigestRegistryOperations",
-        zkClientConf);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/QuickStart.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/QuickStart.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/QuickStart.md
index 512c011..f13d7d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/QuickStart.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/QuickStart.md
@@ -208,7 +208,25 @@ If you are building from source code, make sure you use `-Pyarn-ui` in the `mvn`
   </property>
 ```
 
-# Try with Docker
+# Run with security
+The YARN service framework supports running in a secure (kerberized) environment. Users need to specify the Kerberos principal name and keytab when they launch the service.
+For example, a typical configuration looks like this:
+```
+{
+  "name": "sample-service",
+  ...
+  ...
+  "kerberos_principal" : {
+    "principal_name" : "hdfs-demo@EXAMPLE.COM",
+    "keytab" : "hdfs:///etc/security/keytabs/hdfs.headless.keytab"
+  }
+}
+```
+* principal_name : the principal name of the user who launches the service
+* keytab : URI of the keytab. It supports two modes:
+    * URI starts with `hdfs://`: The URI where the keytab is stored on hdfs. The keytab will be localized to each node by YARN.
+    * URI starts with `file://`: The URI where the keytab is stored on the local host. It is assumed that an admin pre-installs the keytab on the local host before the AM launches.
+# Run with Docker
 The above example is only for a non-docker container based service. YARN Service Framework also provides first-class support for managing docker based services.
 Most of the steps for managing docker based services are the same except that in docker the `Artifact` type for a component is `DOCKER` and the Artifact `id` is the name of the docker image.
 For details in how to setup docker on YARN, please check [Docker on YARN](../DockerContainers.md).

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md
index c0e12c7..e224e5d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/yarn-service/YarnServiceAPI.md
@@ -291,6 +291,15 @@ The current state of the container of a service.
 |----|----|----|----|----|
 |state|enum of the state of the container|false|enum (INIT, STARTED, READY)||
 
+### KerberosPrincipal
+
+The kerberos principal info of the user who launches the service.
+
+|Name|Description|Required|Schema|Default|
+|----|----|----|----|----|
+|principal_name|The principal name of the user who launches the service.|false|string||
+|keytab|The URI of the kerberos keytab. It supports two modes, URI starts with "hdfs://": A path on hdfs where the keytab is stored. The keytab will be localized by YARN to each host; URI starts with "file://": A path on the local host where the keytab is stored. It is assumed that the keytabs are pre-installed by admins before AM launches.|false|string||
+
 
 ### PlacementPolicy
 
@@ -342,7 +351,7 @@ a service resource has the following attributes.
 |state|State of the service. Specifying a value for this attribute for the PUT payload means update the service to this desired state.|false|ServiceState||
 |quicklinks|A blob of key-value pairs of quicklinks to be exported for a service.|false|object||
 |queue|The YARN queue that this service should be submitted to.|false|string||
-
+|kerberos_principal|The principal info of the user who launches the service.|false|KerberosPrincipal||
 
 ### ServiceState
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: YARN-6669. Implemented Kerberos security for YARN service framework. (Contributed by Jian He)

Posted by ey...@apache.org.
YARN-6669.  Implemented Kerberos security for YARN service framework.  (Contributed by Jian He)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d30d5782
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d30d5782
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d30d5782

Branch: refs/heads/trunk
Commit: d30d57828fddaa8667de49af879cde999907c7f6
Parents: 404eab4
Author: Eric Yang <ey...@apache.org>
Authored: Mon Dec 4 15:11:00 2017 -0500
Committer: Eric Yang <ey...@apache.org>
Committed: Mon Dec 4 15:11:00 2017 -0500

----------------------------------------------------------------------
 .../hadoop/yarn/service/webapp/ApiServer.java   |   1 +
 ...RN-Simplified-V1-API-Layer-For-Services.yaml |  16 +
 .../dev-support/findbugs-exclude.xml            |   5 +-
 .../yarn/service/ClientAMPolicyProvider.java    |  39 ++
 .../yarn/service/ClientAMSecurityInfo.java      |  62 ++++
 .../hadoop/yarn/service/ClientAMService.java    |   9 +
 .../hadoop/yarn/service/ServiceContext.java     |   8 +
 .../hadoop/yarn/service/ServiceMaster.java      | 140 ++++++-
 .../hadoop/yarn/service/ServiceScheduler.java   |  30 +-
 .../service/api/records/KerberosPrincipal.java  | 146 ++++++++
 .../yarn/service/api/records/Service.java       |  23 ++
 .../yarn/service/client/ServiceClient.java      | 174 ++++++---
 .../yarn/service/component/Component.java       |   2 +-
 .../yarn/service/conf/YarnServiceConf.java      |   7 -
 .../yarn/service/conf/YarnServiceConstants.java |   3 +
 .../containerlaunch/AbstractLauncher.java       |  39 +-
 .../containerlaunch/ContainerLaunchService.java |  10 +-
 .../containerlaunch/CredentialUtils.java        | 319 ----------------
 .../hadoop/yarn/service/package-info.java       |  24 ++
 .../yarn/service/provider/ProviderUtils.java    |  53 +--
 .../yarn/service/utils/ServiceApiUtil.java      |  15 +
 .../hadoop/yarn/service/utils/ServiceUtils.java |  31 +-
 .../org.apache.hadoop.security.SecurityInfo     |  14 +
 .../client/api/RegistryOperationsFactory.java   |  21 ++
 .../registry/client/impl/zk/CuratorService.java |   8 +-
 .../client/impl/zk/RegistrySecurity.java        |  96 ++++-
 .../hadoop/registry/server/dns/RegistryDNS.java |   4 +
 .../RMRegistryOperationsService.java            | 246 -------------
 .../services/DeleteCompletionCallback.java      |   3 +-
 .../hadoop/registry/AbstractRegistryTest.java   |  15 +-
 .../integration/TestRegistryRMOperations.java   | 369 -------------------
 .../secure/TestSecureRMRegistryOperations.java  | 348 -----------------
 .../site/markdown/yarn-service/QuickStart.md    |  20 +-
 .../markdown/yarn-service/YarnServiceAPI.md     |  11 +-
 34 files changed, 844 insertions(+), 1467 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
index 1bb6c93..34ab8f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/java/org/apache/hadoop/yarn/service/webapp/ApiServer.java
@@ -173,6 +173,7 @@ public class ApiServer {
       return Response.status(Status.BAD_REQUEST).entity(serviceStatus)
           .build();
     } catch (Exception e) {
+      LOG.error("Fail to stop service:", e);
       ServiceStatus serviceStatus = new ServiceStatus();
       serviceStatus.setDiagnostics(e.getMessage());
       return Response.status(Status.INTERNAL_SERVER_ERROR)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
index 088b50c..979883c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services-api/src/main/resources/definition/YARN-Simplified-V1-API-Layer-For-Services.yaml
@@ -244,6 +244,10 @@ definitions:
       queue:
         type: string
         description: The YARN queue that this service should be submitted to.
+      kerberos_principal:
+        description: The Kerberos Principal of the service
+        $ref: '#/definitions/KerberosPrincipal'
+
   Resource:
     description:
       Resource determines the amount of resources (vcores, memory, network, etc.) usable by a container. This field determines the resource to be applied for all the containers of a component or service. The resource specified at the service (or global) level can be overriden at the component level. Only one of profile OR cpu & memory are expected. It raises a validation exception otherwise.
@@ -469,3 +473,15 @@ definitions:
         type: integer
         format: int32
         description: An error code specific to a scenario which service owners should be able to use to understand the failure in addition to the diagnostic information.
+  KerberosPrincipal:
+    description: The kerberos principal info of the user who launches the service.
+    properties:
+      principal_name:
+        type: string
+        description: The principal name of the user who launches the service.
+      keytab:
+        type: string
+        description: |
+          The URI of the kerberos keytab. It supports two modes:
+          URI starts with "hdfs://": A path on hdfs where the keytab is stored. The keytab will be localized by YARN to each host.
+          URI starts with "file://": A path on the local host where the keytab is stored. It is assumed that the keytabs are pre-installed by admins before AM launches.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/dev-support/findbugs-exclude.xml
index 2814cca..80c04c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/dev-support/findbugs-exclude.xml
@@ -44,5 +44,8 @@
         <Field name="registryClient" />
         <Bug pattern="IS2_INCONSISTENT_SYNC"/>
     </Match>
-
+    <Match>
+        <Class name="org.apache.hadoop.yarn.service.ClientAMPolicyProvider"/>
+        <Bug pattern="EI_EXPOSE_REP"/>
+    </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMPolicyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMPolicyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMPolicyProvider.java
new file mode 100644
index 0000000..365df0f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMPolicyProvider.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.security.authorize.Service;
+
+/**
+ * PolicyProvider for Client to Service AM protocol.
+ */
+public class ClientAMPolicyProvider extends PolicyProvider {
+
+  private static final Service[] CLIENT_AM_SERVICE =
+      new Service[]{
+          new Service(
+              "security.yarn-service.client-am-protocol.acl",
+              ClientAMProtocol.class)};
+
+  @Override
+  public Service[] getServices() {
+    return CLIENT_AM_SERVICE;
+  };
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMSecurityInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMSecurityInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMSecurityInfo.java
new file mode 100644
index 0000000..e19284b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMSecurityInfo.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.KerberosInfo;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.token.TokenInfo;
+import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
+import org.apache.hadoop.yarn.service.impl.pb.service.ClientAMProtocolPB;
+
+import java.lang.annotation.Annotation;
+
+/**
+ * Security Info for Client to Service AM protocol.
+ */
+public class ClientAMSecurityInfo extends SecurityInfo{
+  @Override
+  public KerberosInfo getKerberosInfo(Class<?> protocol, Configuration conf) {
+    if (!protocol.equals(ClientAMProtocolPB.class)) {
+      return null;
+    }
+    return new KerberosInfo() {
+
+      @Override
+      public Class<? extends Annotation> annotationType() {
+        return null;
+      }
+
+      @Override
+      public String serverPrincipal() {
+        return YarnServiceConstants.PRINCIPAL;
+      }
+
+      @Override
+      public String clientPrincipal() {
+        return null;
+      }
+    };
+  }
+
+  @Override
+  public TokenInfo getTokenInfo(Class<?> protocol, Configuration conf) {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
index 8e4c34d..94dd8d6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ClientAMService.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.service;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.service.AbstractService;
@@ -65,6 +66,14 @@ public class ClientAMService extends AbstractService
     InetSocketAddress address = new InetSocketAddress(0);
     server = rpc.getServer(ClientAMProtocol.class, this, address, conf,
         context.secretManager, 1);
+
+    // Enable service authorization?
+    if (conf.getBoolean(
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
+        false)) {
+      this.server.refreshServiceAcl(getConfig(), new ClientAMPolicyProvider());
+    }
+
     server.start();
 
     String nodeHostString =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
index 94dbc6e..cd41ab7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceContext.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.ConfigFile;
 import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 
+import java.nio.ByteBuffer;
+
 public class ServiceContext {
   public Service service = null;
   public SliderFileSystem fs;
@@ -34,6 +36,12 @@ public class ServiceContext {
   public ServiceScheduler scheduler;
   public ClientToAMTokenSecretManager secretManager;
   public ClientAMService clientAMService;
+  // tokens used for container launch
+  public ByteBuffer tokens;
+  // AM keytab principal
+  public String principal;
+  // AM keytab location
+  public String keytab;
 
   public ServiceContext() {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java
index b0b4f06..1283604 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceMaster.java
@@ -20,33 +20,49 @@ package org.apache.hadoop.yarn.service;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
+import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
 import org.apache.hadoop.yarn.service.monitor.ServiceMonitor;
 import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
-import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.apache.hadoop.yarn.service.utils.ServiceUtils;
-import org.apache.hadoop.yarn.service.exceptions.BadClusterStateException;
+import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedExceptionAction;
+import java.util.Iterator;
 import java.util.Map;
 
+import static org.apache.hadoop.yarn.service.conf.YarnServiceConstants.KEYTAB_LOCATION;
+
 public class ServiceMaster extends CompositeService {
 
   private static final Logger LOG =
@@ -63,13 +79,7 @@ public class ServiceMaster extends CompositeService {
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    //TODO Deprecate slider conf, make sure works with yarn conf
     printSystemEnv();
-    if (UserGroupInformation.isSecurityEnabled()) {
-      UserGroupInformation.setConfiguration(conf);
-    }
-    LOG.info("Login user is {}", UserGroupInformation.getLoginUser());
-
     context = new ServiceContext();
     Path appDir = getAppDir();
     context.serviceHdfsDir = appDir.toString();
@@ -78,6 +88,10 @@ public class ServiceMaster extends CompositeService {
     fs.setAppDir(appDir);
     loadApplicationJson(context, fs);
 
+    if (UserGroupInformation.isSecurityEnabled()) {
+      context.tokens = recordTokensForContainers();
+      doSecureLogin();
+    }
     // Take yarn config from YarnFile and merge them into YarnConfiguration
     for (Map.Entry<String, String> entry : context.service
         .getConfiguration().getProperties().entrySet()) {
@@ -111,6 +125,100 @@ public class ServiceMaster extends CompositeService {
     super.serviceInit(conf);
   }
 
+  // Record the tokens and use them for launching containers.
+  // e.g. localization requires the hdfs delegation tokens
+  private ByteBuffer recordTokensForContainers() throws IOException {
+    Credentials copy = new Credentials(UserGroupInformation.getCurrentUser()
+        .getCredentials());
+    DataOutputBuffer dob = new DataOutputBuffer();
+    try {
+      copy.writeTokenStorageToStream(dob);
+    } finally {
+      dob.close();
+    }
+    // Now remove the AM->RM token so that task containers cannot access it.
+    Iterator<Token<?>> iter = copy.getAllTokens().iterator();
+    while (iter.hasNext()) {
+      Token<?> token = iter.next();
+      LOG.info(token.toString());
+      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
+        iter.remove();
+      }
+    }
+    return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+  }
+
+  // 1. First try to use the keytab specified by the user
+  // 2. If not specified, then try to use a pre-installed keytab on localhost
+  // 3. Strip off HDFS delegation tokens to ensure the keytab is used for HDFS
+  private void doSecureLogin()
+      throws IOException, URISyntaxException {
+    // read the localized keytab specified by user
+    File keytab = new File(String.format(KEYTAB_LOCATION,
+        context.service.getName()));
+    if (!keytab.exists()) {
+      LOG.info("No keytab localized at " + keytab);
+      // Check if there exists a pre-installed keytab at host
+      String preInstalledKeytab = context.service.getKerberosPrincipal()
+          .getKeytab();
+      if (!StringUtils.isEmpty(preInstalledKeytab)) {
+        URI uri = new URI(preInstalledKeytab);
+        if (uri.getScheme().equals("file")) {
+          keytab = new File(uri);
+          LOG.info("Using pre-installed keytab from localhost: " +
+              preInstalledKeytab);
+        }
+      }
+    }
+    if (!keytab.exists()) {
+      LOG.info("No keytab exists: " + keytab);
+      return;
+    }
+    String principal = context.service.getKerberosPrincipal()
+        .getPrincipalName();
+    if (StringUtils.isEmpty((principal))) {
+      principal = UserGroupInformation.getLoginUser().getShortUserName();
+      LOG.info("No principal name specified.  Will use AM " +
+          "login identity {} to attempt keytab-based login", principal);
+    }
+
+    Credentials credentials = UserGroupInformation.getCurrentUser()
+        .getCredentials();
+    LOG.info("User before logged in is: " + UserGroupInformation
+        .getCurrentUser());
+    String principalName = SecurityUtil.getServerPrincipal(principal,
+        ServiceUtils.getLocalHostName(getConfig()));
+    UserGroupInformation.loginUserFromKeytab(principalName,
+        keytab.getAbsolutePath());
+    // add back the credentials
+    UserGroupInformation.getCurrentUser().addCredentials(credentials);
+    LOG.info("User after logged in is: " + UserGroupInformation
+        .getCurrentUser());
+    context.principal = principalName;
+    context.keytab = keytab.getAbsolutePath();
+    removeHdfsDelegationToken(UserGroupInformation.getLoginUser());
+  }
+
+  // Remove HDFS delegation tokens from the login user to ensure the AM uses
+  // the keytab to talk to HDFS
+  private static void removeHdfsDelegationToken(UserGroupInformation user) {
+    if (!user.isFromKeytab()) {
+      LOG.error("AM is not holding on a keytab in a secure deployment:" +
+          " service will fail when tokens expire");
+    }
+    Credentials credentials = user.getCredentials();
+    Iterator<Token<? extends TokenIdentifier>> iter =
+        credentials.getAllTokens().iterator();
+    while (iter.hasNext()) {
+      Token<? extends TokenIdentifier> token = iter.next();
+      if (token.getKind().equals(
+          DelegationTokenIdentifier.HDFS_DELEGATION_KIND)) {
+        LOG.info("Remove HDFS delegation token {}.", token);
+        iter.remove();
+      }
+    }
+  }
+
   protected ContainerId getAMContainerId() throws BadClusterStateException {
     return ContainerId.fromString(ServiceUtils.mandatoryEnvVariable(
         ApplicationConstants.Environment.CONTAINER_ID.name()));
@@ -133,6 +241,17 @@ public class ServiceMaster extends CompositeService {
   }
 
   @Override
+  protected void serviceStart() throws Exception {
+    LOG.info("Starting service as user " + UserGroupInformation
+        .getCurrentUser());
+    UserGroupInformation.getLoginUser().doAs(
+        (PrivilegedExceptionAction<Void>) () -> {
+          super.serviceStart();
+          return null;
+        }
+    );
+  }
+  @Override
   protected void serviceStop() throws Exception {
     LOG.info("Stopping app master");
     super.serviceStop();
@@ -146,7 +265,8 @@ public class ServiceMaster extends CompositeService {
 
   public static void main(String[] args) throws Exception {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
-    StringUtils.startupShutdownMessage(ServiceMaster.class, args, LOG);
+    org.apache.hadoop.util.StringUtils
+        .startupShutdownMessage(ServiceMaster.class, args, LOG);
     try {
       ServiceMaster serviceMaster = new ServiceMaster("Service Master");
       ShutdownHookManager.get()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
index 6bc5673..bea31cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/ServiceScheduler.java
@@ -22,6 +22,7 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,6 +35,7 @@ import org.apache.hadoop.registry.client.binding.RegistryUtils;
 import org.apache.hadoop.registry.client.types.ServiceRecord;
 import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
 import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;
+import org.apache.hadoop.security.HadoopKerberosName;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@@ -142,11 +144,29 @@ public class ServiceScheduler extends CompositeService {
   }
 
   public void buildInstance(ServiceContext context, Configuration configuration)
-      throws YarnException {
+      throws YarnException, IOException {
     app = context.service;
     executorService = Executors.newScheduledThreadPool(10);
-    RegistryOperations registryClient = RegistryOperationsFactory
-        .createInstance("ServiceScheduler", configuration);
+    RegistryOperations registryClient = null;
+    if (UserGroupInformation.isSecurityEnabled() &&
+        !StringUtils.isEmpty(context.principal)
+        && !StringUtils.isEmpty(context.keytab)) {
+      Configuration conf = getConfig();
+      // Only take the first section of the principal
+      // e.g. hdfs-demo@EXAMPLE.COM will take hdfs-demo
+      // This is because somehow zookeeper client only uses the first section
+      // for acl validations.
+      String username = new HadoopKerberosName(context.principal.trim())
+          .getServiceName();
+      LOG.info("Set registry user accounts: sasl:" + username);
+      conf.set(KEY_REGISTRY_USER_ACCOUNTS, "sasl:" + username);
+      registryClient = RegistryOperationsFactory
+          .createKerberosInstance(conf,
+              "Client", context.principal, context.keytab);
+    } else {
+      registryClient = RegistryOperationsFactory
+          .createInstance("ServiceScheduler", configuration);
+    }
     addIfService(registryClient);
     yarnRegistryOperations =
         createYarnRegistryOperations(context, registryClient);
@@ -171,7 +191,7 @@ public class ServiceScheduler extends CompositeService {
     dispatcher.setDrainEventsOnStop();
     addIfService(dispatcher);
 
-    containerLaunchService = new ContainerLaunchService(context.fs);
+    containerLaunchService = new ContainerLaunchService(context);
     addService(containerLaunchService);
 
     if (YarnConfiguration.timelineServiceV2Enabled(configuration)) {
@@ -408,7 +428,7 @@ public class ServiceScheduler extends CompositeService {
           }
         } catch (IOException e) {
           LOG.error(
-              "Failed to register app " + app.getName() + " in registry");
+              "Failed to register app " + app.getName() + " in registry", e);
         }
       }
     });

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/KerberosPrincipal.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/KerberosPrincipal.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/KerberosPrincipal.java
new file mode 100644
index 0000000..e38fdb5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/KerberosPrincipal.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.service.api.records;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.xml.bind.annotation.XmlElement;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * The kerberos principal of the service.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+@ApiModel(description = "The kerberos principal of the service.")
+@javax.annotation.Generated(value = "io.swagger.codegen.languages" +
+    ".JavaClientCodegen", date = "2017-11-20T11:29:11.785-08:00")
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class KerberosPrincipal implements Serializable {
+  private static final long serialVersionUID = -6431667195287650037L;
+
+  @JsonProperty("principal_name")
+  @XmlElement(name = "principal_name")
+  private String principalName = null;
+
+  @JsonProperty("keytab")
+  @XmlElement(name = "keytab")
+  private String keytab = null;
+
+  public KerberosPrincipal principalName(String principalName) {
+    this.principalName = principalName;
+    return this;
+  }
+
+  /**
+   * The principal name of the service.
+   *
+   * @return principalName
+   **/
+  @ApiModelProperty(value = "The principal name of the service.")
+  public String getPrincipalName() {
+    return principalName;
+  }
+
+  public void setPrincipalName(String principalName) {
+    this.principalName = principalName;
+  }
+
+  public KerberosPrincipal keytab(String keytab) {
+    this.keytab = keytab;
+    return this;
+  }
+
+  /**
+   * The URI of the kerberos keytab. It supports two schemes, "hdfs" and
+   * "file". If the URI starts with the "hdfs://" scheme, it indicates
+   * the path on hdfs where the keytab is stored. The keytab will be
+   * localized by YARN and made available to the AM in its local
+   * directory. If the URI starts with the "file://" scheme, it
+   * indicates a path on the local host where the keytab is presumably
+   * installed by admins upfront.
+   *
+   * @return keytab
+   **/
+  @ApiModelProperty(value = "The URI of the kerberos keytab. It supports two " +
+      "schemes \"hdfs\" and \"file\". If the URI starts with \"hdfs://\" " +
+      "scheme, it indicates the path on hdfs where the keytab is stored. The " +
+      "keytab will be localized by YARN and made available to AM in its local" +
+      " directory. If the URI starts with \"file://\" scheme, it indicates a " +
+      "path on the local host where the keytab is presumbaly installed by " +
+      "admins upfront. ")
+  public String getKeytab() {
+    return keytab;
+  }
+
+  public void setKeytab(String keytab) {
+    this.keytab = keytab;
+  }
+
+
+  @Override
+  public boolean equals(java.lang.Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    KerberosPrincipal kerberosPrincipal = (KerberosPrincipal) o;
+    return Objects.equals(this.principalName, kerberosPrincipal
+        .principalName) &&
+        Objects.equals(this.keytab, kerberosPrincipal.keytab);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(principalName, keytab);
+  }
+
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class KerberosPrincipal {\n");
+
+    sb.append("    principalName: ").append(toIndentedString(principalName))
+        .append("\n");
+    sb.append("    keytab: ").append(toIndentedString(keytab)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces
+   * (except the first line).
+   */
+  private String toIndentedString(java.lang.Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
index 8045822..392b71e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/api/records/Service.java
@@ -71,6 +71,9 @@ public class Service extends BaseResource {
   private ServiceState state = null;
   private Map<String, String> quicklinks = new HashMap<>();
   private String queue = null;
+  @JsonProperty("kerberos_principal")
+  @XmlElement(name = "kerberos_principal")
+  private KerberosPrincipal kerberosPrincipal = new KerberosPrincipal();
 
   /**
    * A unique service name.
@@ -335,6 +338,24 @@ public class Service extends BaseResource {
     this.queue = queue;
   }
 
+  public Service kerberosPrincipal(KerberosPrincipal kerberosPrincipal) {
+    this.kerberosPrincipal = kerberosPrincipal;
+    return this;
+  }
+
+  /**
+   * The Kerberos Principal of the service.
+   * @return kerberosPrincipal
+   **/
+  @ApiModelProperty(value = "The Kerberos Principal of the service")
+  public KerberosPrincipal getKerberosPrincipal() {
+    return kerberosPrincipal;
+  }
+
+  public void setKerberosPrincipal(KerberosPrincipal kerberosPrincipal) {
+    this.kerberosPrincipal = kerberosPrincipal;
+  }
+
   @Override
   public boolean equals(java.lang.Object o) {
     if (this == o) {
@@ -376,6 +397,8 @@ public class Service extends BaseResource {
     sb.append("    quicklinks: ").append(toIndentedString(quicklinks))
         .append("\n");
     sb.append("    queue: ").append(toIndentedString(queue)).append("\n");
+    sb.append("    kerberosPrincipal: ")
+        .append(toIndentedString(kerberosPrincipal)).append("\n");
     sb.append("}");
     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
index d1b6026..81c56d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/client/ServiceClient.java
@@ -28,12 +28,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.registry.client.api.RegistryConstants;
 import org.apache.hadoop.registry.client.api.RegistryOperations;
 import org.apache.hadoop.registry.client.api.RegistryOperationsFactory;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@@ -43,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.client.api.AppAdminClient;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.client.util.YarnClientUtils;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -79,6 +83,9 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
 import java.text.MessageFormat;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
@@ -98,7 +105,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
   //TODO disable retry so that client / rest API doesn't block?
   protected YarnClient yarnClient;
   // Avoid looking up applicationId from fs all the time.
-  private Map<String, ApplicationId> cachedAppIds = new ConcurrentHashMap<>();
+  private Map<String, AppInfo> cachedAppInfo = new ConcurrentHashMap<>();
 
   private RegistryOperations registryClient;
   private CuratorFramework curatorClient;
@@ -210,7 +217,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     // Write the definition first and then submit - AM will read the definition
     createDirAndPersistApp(appDir, service);
     ApplicationId appId = submitApp(service);
-    cachedAppIds.put(serviceName, appId);
+    cachedAppInfo.put(serviceName, new AppInfo(appId, service
+        .getKerberosPrincipal().getPrincipalName()));
     service.setId(appId.toString());
     // update app definition with appId
     persistAppDef(appDir, service);
@@ -224,8 +232,9 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     Service persistedService =
         ServiceApiUtil.loadService(fs, serviceName);
     if (!StringUtils.isEmpty(persistedService.getId())) {
-      cachedAppIds.put(persistedService.getName(),
-          ApplicationId.fromString(persistedService.getId()));
+      cachedAppInfo.put(persistedService.getName(), new AppInfo(
+          ApplicationId.fromString(persistedService.getId()),
+          persistedService.getKerberosPrincipal().getPrincipalName()));
     } else {
       throw new YarnException(persistedService.getName()
           + " appId is null, may be not submitted to YARN yet");
@@ -278,8 +287,9 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
       throw new YarnException(
           serviceName + " appId is null, may be not submitted to YARN yet");
     }
-    cachedAppIds.put(persistedService.getName(),
-        ApplicationId.fromString(persistedService.getId()));
+    cachedAppInfo.put(persistedService.getName(), new AppInfo(
+        ApplicationId.fromString(persistedService.getId()), persistedService
+        .getKerberosPrincipal().getPrincipalName()));
     return flexComponents(serviceName, componentCounts, persistedService);
   }
 
@@ -328,7 +338,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
       throw new YarnException(serviceName + " AM hostname is empty");
     }
     ClientAMProtocol proxy =
-        createAMProxy(appReport.getHost(), appReport.getRpcPort());
+        createAMProxy(serviceName, appReport);
     proxy.flexComponents(requestBuilder.build());
     for (Map.Entry<String, Long> entry : original.entrySet()) {
       LOG.info("[COMPONENT {}]: number of containers changed from {} to {}",
@@ -366,8 +376,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     LOG.info("Stopping service {}, with appId = {}", serviceName, currentAppId);
     try {
       ClientAMProtocol proxy =
-          createAMProxy(report.getHost(), report.getRpcPort());
-      cachedAppIds.remove(serviceName);
+          createAMProxy(serviceName, report);
+      cachedAppInfo.remove(serviceName);
       if (proxy != null) {
         // try to stop the app gracefully.
         StopRequestProto request = StopRequestProto.newBuilder().build();
@@ -406,8 +416,8 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
         }
       }
     } catch (IOException | YarnException | InterruptedException e) {
-      LOG.info("Failed to stop " + serviceName
-          + " gracefully, forcefully kill the app.");
+      LOG.info("Failed to stop " + serviceName + " gracefully due to: "
+          + e.getMessage() + ", forcefully kill the app.");
       yarnClient.killApplication(currentAppId, "Forcefully kill the app");
     }
     return EXIT_SUCCESS;
@@ -421,7 +431,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     Path appDir = fs.buildClusterDirPath(serviceName);
     FileSystem fileSystem = fs.getFileSystem();
     // remove from the appId cache
-    cachedAppIds.remove(serviceName);
+    cachedAppInfo.remove(serviceName);
     if (fileSystem.exists(appDir)) {
       if (fileSystem.delete(appDir, true)) {
         LOG.info("Successfully deleted service dir for " + serviceName + ": "
@@ -552,7 +562,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     // copy jars to hdfs and add to localResources
     addJarResource(serviceName, localResources);
     // add keytab if in secure env
-    addKeytabResourceIfSecure(fs, localResources, conf, serviceName);
+    addKeytabResourceIfSecure(fs, localResources, app);
     if (LOG.isDebugEnabled()) {
       printLocalResources(localResources);
     }
@@ -581,6 +591,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     amLaunchContext.setCommands(Collections.singletonList(cmdStr));
     amLaunchContext.setEnvironment(env);
     amLaunchContext.setLocalResources(localResources);
+    addHdfsDelegationTokenIfSecure(amLaunchContext);
     submissionContext.setAMContainerSpec(amLaunchContext);
     yarnClient.submitApplication(submissionContext);
     return submissionContext.getApplicationId();
@@ -771,38 +782,75 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     return appJson;
   }
 
+  private void addHdfsDelegationTokenIfSecure(ContainerLaunchContext amContext)
+      throws IOException {
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return;
+    }
+    Credentials credentials = new Credentials();
+    String tokenRenewer = YarnClientUtils.getRmPrincipal(getConfig());
+    if (StringUtils.isEmpty(tokenRenewer)) {
+      throw new IOException(
+          "Can't get Master Kerberos principal for the RM to use as renewer");
+    }
+    // Get HDFS delegation tokens
+    final org.apache.hadoop.security.token.Token<?>[] tokens =
+        fs.getFileSystem().addDelegationTokens(tokenRenewer, credentials);
+    if (tokens != null && tokens.length != 0) {
+      for (Token<?> token : tokens) {
+        LOG.debug("Got DT: " + token);
+      }
+      DataOutputBuffer dob = new DataOutputBuffer();
+      credentials.writeTokenStorageToStream(dob);
+      ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+      amContext.setTokens(fsTokens);
+    }
+  }
+
   private void addKeytabResourceIfSecure(SliderFileSystem fileSystem,
-      Map<String, LocalResource> localResource, Configuration conf,
-      String serviceName) throws IOException, BadConfigException {
+      Map<String, LocalResource> localResource, Service service)
+      throws IOException, YarnException {
     if (!UserGroupInformation.isSecurityEnabled()) {
       return;
     }
-    String keytabPreInstalledOnHost =
-        conf.get(YarnServiceConf.KEY_AM_KEYTAB_LOCAL_PATH);
-    if (StringUtils.isEmpty(keytabPreInstalledOnHost)) {
-      String amKeytabName =
-          conf.get(YarnServiceConf.KEY_AM_LOGIN_KEYTAB_NAME);
-      String keytabDir = conf.get(YarnServiceConf.KEY_HDFS_KEYTAB_DIR);
-      Path keytabPath =
-          fileSystem.buildKeytabPath(keytabDir, amKeytabName, serviceName);
-      if (fileSystem.getFileSystem().exists(keytabPath)) {
-        LocalResource keytabRes =
-            fileSystem.createAmResource(keytabPath, LocalResourceType.FILE);
-        localResource
-            .put(YarnServiceConstants.KEYTAB_DIR + "/" + amKeytabName, keytabRes);
-        LOG.info("Adding AM keytab on hdfs: " + keytabPath);
-      } else {
-        LOG.warn("No keytab file was found at {}.", keytabPath);
-        if (conf.getBoolean(YarnServiceConf.KEY_AM_LOGIN_KEYTAB_REQUIRED, false)) {
-          throw new BadConfigException("No keytab file was found at %s.",
-              keytabPath);
-        } else {
-          LOG.warn("The AM will be "
-              + "started without a kerberos authenticated identity. "
-              + "The service is therefore not guaranteed to remain "
-              + "operational beyond 24 hours.");
-        }
+    String principalName = service.getKerberosPrincipal().getPrincipalName();
+    if (StringUtils.isEmpty(principalName)) {
+      LOG.warn("No Kerberos principal name specified for " + service.getName());
+      return;
+    }
+    if(StringUtils.isEmpty(service.getKerberosPrincipal().getKeytab())) {
+      LOG.warn("No Kerberos keytab specified for " + service.getName());
+      return;
+    }
+
+    URI keytabURI;
+    try {
+      keytabURI = new URI(service.getKerberosPrincipal().getKeytab());
+    } catch (URISyntaxException e) {
+      throw new YarnException(e);
+    }
+
+    switch (keytabURI.getScheme()) {
+    case "hdfs":
+      Path keytabOnhdfs = new Path(keytabURI);
+      if (!fileSystem.getFileSystem().exists(keytabOnhdfs)) {
+        LOG.warn(service.getName() + "'s keytab (principalName = " +
+            principalName + ") doesn't exist at: " + keytabOnhdfs);
+        return;
       }
+      LocalResource keytabRes =
+          fileSystem.createAmResource(keytabOnhdfs, LocalResourceType.FILE);
+      localResource.put(String.format(YarnServiceConstants.KEYTAB_LOCATION,
+          service.getName()), keytabRes);
+      LOG.debug("Adding " + service.getName() + "'s keytab for " +
+          "localization, uri = " + keytabOnhdfs);
+      break;
+    case "file":
+      LOG.debug("Using a keytab from localhost: " + keytabURI);
+      break;
+    default:
+      LOG.warn("Unsupported URI scheme " + keytabURI);
+      break;
     }
   }
 
@@ -856,7 +904,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
       return "";
     }
     ClientAMProtocol amProxy =
-        createAMProxy(appReport.getHost(), appReport.getRpcPort());
+        createAMProxy(appReport.getName(), appReport);
     GetStatusResponseProto response =
         amProxy.getStatus(GetStatusRequestProto.newBuilder().build());
     return response.getStatus();
@@ -886,7 +934,7 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
       return appSpec;
     }
     ClientAMProtocol amProxy =
-        createAMProxy(appReport.getHost(), appReport.getRpcPort());
+        createAMProxy(serviceName, appReport);
     GetStatusResponseProto response =
         amProxy.getStatus(GetStatusRequestProto.newBuilder().build());
     appSpec = jsonSerDeser.fromJson(response.getStatus());
@@ -935,18 +983,37 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
     }
   }
 
-  protected ClientAMProtocol createAMProxy(String host, int port)
-      throws IOException {
+  protected ClientAMProtocol createAMProxy(String serviceName,
+      ApplicationReport appReport) throws IOException, YarnException {
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      if (!cachedAppInfo.containsKey(serviceName)) {
+        Service persistedService  = ServiceApiUtil.loadService(fs, serviceName);
+        cachedAppInfo.put(serviceName, new AppInfo(appReport.getApplicationId(),
+            persistedService.getKerberosPrincipal().getPrincipalName()));
+      }
+      String principalName = cachedAppInfo.get(serviceName).principalName;
+      // Inject the principal into hadoop conf, because Hadoop
+      // SaslRpcClient#getServerPrincipal requires a config for the
+      // principal
+      if (!StringUtils.isEmpty(principalName)) {
+        getConfig().set(PRINCIPAL, principalName);
+      } else {
+        throw new YarnException("No principal specified in the persisted " +
+            "service definition, fail to connect to AM.");
+      }
+    }
     InetSocketAddress address =
-        NetUtils.createSocketAddrForHost(host, port);
+        NetUtils.createSocketAddrForHost(appReport.getHost(), appReport
+            .getRpcPort());
     return ClientAMProxy.createProxy(getConfig(), ClientAMProtocol.class,
         UserGroupInformation.getCurrentUser(), rpc, address);
   }
 
   public synchronized ApplicationId getAppId(String serviceName)
       throws IOException, YarnException {
-    if (cachedAppIds.containsKey(serviceName)) {
-      return cachedAppIds.get(serviceName);
+    if (cachedAppInfo.containsKey(serviceName)) {
+      return cachedAppInfo.get(serviceName).appId;
     }
     Service persistedService = ServiceApiUtil.loadService(fs, serviceName);
     if (persistedService == null) {
@@ -954,7 +1021,18 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
           + " doesn't exist on hdfs. Please check if the app exists in RM");
     }
     ApplicationId currentAppId = ApplicationId.fromString(persistedService.getId());
-    cachedAppIds.put(serviceName, currentAppId);
+    cachedAppInfo.put(serviceName, new AppInfo(currentAppId, persistedService
+        .getKerberosPrincipal().getPrincipalName()));
     return currentAppId;
   }
+
+  private static class AppInfo {
+    ApplicationId appId;
+    String principalName;
+
+    AppInfo(ApplicationId appId, String principalName) {
+      this.appId = appId;
+      this.principalName = principalName;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
index 88f4763..4e05e5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/component/Component.java
@@ -101,7 +101,7 @@ public class Component implements EventHandler<ComponentEvent> {
       new StateMachineFactory<Component, ComponentState, ComponentEventType, ComponentEvent>(
           INIT)
            // INIT will only got to FLEXING
-          .addTransition(INIT, EnumSet.of(STABLE, FLEXING),
+          .addTransition(INIT, EnumSet.of(STABLE, FLEXING, INIT),
               FLEX, new FlexComponentTransition())
           // container recovered on AM restart
           .addTransition(INIT, INIT, CONTAINER_RECOVERED,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
index 684d980..ea8904a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConf.java
@@ -53,13 +53,6 @@ public class YarnServiceConf {
    */
   public static final String YARN_SERVICE_BASE_PATH = "yarn.service.base.path";
 
-  //TODO rename
-  /** Declare that a keytab must be provided */
-  public static final String KEY_AM_LOGIN_KEYTAB_REQUIRED = "slider.am.login.keytab.required";
-  public static final String KEY_AM_LOGIN_KEYTAB_NAME = "slider.am.login.keytab.name";
-  public static final String KEY_HDFS_KEYTAB_DIR = "slider.hdfs.keytab.dir";
-  public static final String KEY_AM_KEYTAB_LOCAL_PATH = "slider.am.keytab.local.path";
-
   /**
    * maximum number of failed containers (in a single component)
    * before the app exits

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
index 3973759..0378d24 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/conf/YarnServiceConstants.java
@@ -40,6 +40,8 @@ public interface YarnServiceConstants {
   String APP_TYPE = "yarn-service";
 
   String KEYTAB_DIR = "keytabs";
+  String KEYTAB_LOCATION = KEYTAB_DIR + "/%s" + ".keytab";
+
   String RESOURCE_DIR = "resources";
 
 
@@ -89,4 +91,5 @@ public interface YarnServiceConstants {
   String ERR_FILE = "stderr.txt";
 
   String CONTENT = "content";
+  String PRINCIPAL = "yarn.service.am.principal";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
index 2d7c3bb..e1e88cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/AbstractLauncher.java
@@ -19,16 +19,15 @@
 package org.apache.hadoop.yarn.service.containerlaunch;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
 import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.service.ServiceContext;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
-import org.apache.hadoop.yarn.service.utils.CoreFileSystem;
 import org.apache.hadoop.yarn.service.utils.ServiceUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,10 +49,6 @@ public class AbstractLauncher {
     LoggerFactory.getLogger(AbstractLauncher.class);
   public static final String CLASSPATH = "CLASSPATH";
   /**
-   * Filesystem to use for the launch
-   */
-  protected final CoreFileSystem coreFileSystem;
-  /**
    * Env vars; set up at final launch stage
    */
   protected final Map<String, String> envVars = new HashMap<>();
@@ -63,25 +58,15 @@ public class AbstractLauncher {
   protected final Map<String, LocalResource> localResources = new HashMap<>();
   protected final Map<String, String> mountPaths = new HashMap<>();
   private final Map<String, ByteBuffer> serviceData = new HashMap<>();
-  // security
-  protected final Credentials credentials;
   protected boolean yarnDockerMode = false;
   protected String dockerImage;
   protected String dockerNetwork = DEFAULT_DOCKER_NETWORK;
   protected String dockerHostname;
   protected String runPrivilegedContainer;
+  private ServiceContext context;
 
-
-  /**
-   * Create instance.
-   * @param coreFileSystem filesystem
-   * @param credentials initial set of credentials -null is permitted
-   */
-  public AbstractLauncher(
-      CoreFileSystem coreFileSystem,
-      Credentials credentials) {
-    this.coreFileSystem = coreFileSystem;
-    this.credentials = credentials != null ? credentials: new Credentials();
+  public AbstractLauncher(ServiceContext context) {
+    this.context = context;
   }
   
   public void setYarnDockerMode(boolean yarnDockerMode){
@@ -113,14 +98,6 @@ public class AbstractLauncher {
     mountPaths.put(subPath, mountPath);
   }
 
-  /**
-   * Accessor to the credentials
-   * @return the credentials associated with this launcher
-   */
-  public Credentials getCredentials() {
-    return credentials;
-  }
-
 
   public void addCommand(String cmd) {
     commands.add(cmd);
@@ -160,9 +137,9 @@ public class AbstractLauncher {
     containerLaunchContext.setLocalResources(localResources);
 
     //tokens
-    log.debug("{} tokens", credentials.numberOfTokens());
-    containerLaunchContext.setTokens(CredentialUtils.marshallCredentials(
-        credentials));
+    if (context.tokens != null) {
+      containerLaunchContext.setTokens(context.tokens.duplicate());
+    }
 
     if(yarnDockerMode){
       Map<String, String> env = containerLaunchContext.getEnvironment();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
index b9f3a24..e07661b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/ContainerLaunchService.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.containerlaunch;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.service.ServiceContext;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.provider.ProviderService;
@@ -40,10 +41,11 @@ public class ContainerLaunchService extends AbstractService{
 
   private ExecutorService executorService;
   private SliderFileSystem fs;
-
-  public ContainerLaunchService(SliderFileSystem fs) {
+  private ServiceContext context;
+  public ContainerLaunchService(ServiceContext context) {
     super(ContainerLaunchService.class.getName());
-    this.fs = fs;
+    this.fs = context.fs;
+    this.context = context;
   }
 
   @Override
@@ -84,7 +86,7 @@ public class ContainerLaunchService extends AbstractService{
       Component compSpec = instance.getCompSpec();
       ProviderService provider = ProviderFactory.getProviderService(
           compSpec.getArtifact());
-      AbstractLauncher launcher = new AbstractLauncher(fs, null);
+      AbstractLauncher launcher = new AbstractLauncher(context);
       try {
         provider.buildContainerLaunchContext(launcher, service,
             instance, fs, getConfig(), container);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/CredentialUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/CredentialUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/CredentialUtils.java
deleted file mode 100644
index fce58e5..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/containerlaunch/CredentialUtils.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.service.containerlaunch;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.client.ClientRMProxy;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.ByteBuffer;
-import java.text.DateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-
-import static org.apache.hadoop.yarn.conf.YarnConfiguration.*;
-
-/**
- * Utils to work with credentials and tokens.
- *
- * Designed to be movable to Hadoop core
- */
-public final class CredentialUtils {
-
-  private CredentialUtils() {
-  }
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(CredentialUtils.class);
-
-  /**
-   * Save credentials to a byte buffer. Returns null if there were no
-   * credentials to save
-   * @param credentials credential set
-   * @return a byte buffer of serialized tokens
-   * @throws IOException if the credentials could not be written to the stream
-   */
-  public static ByteBuffer marshallCredentials(Credentials credentials) throws IOException {
-    ByteBuffer buffer = null;
-    if (!credentials.getAllTokens().isEmpty()) {
-      DataOutputBuffer dob = new DataOutputBuffer();
-      try {
-        credentials.writeTokenStorageToStream(dob);
-      } finally {
-        dob.close();
-      }
-      buffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
-    }
-    return buffer;
-  }
-
-  /**
-   * Save credentials to a file
-   * @param file file to save to (will be overwritten)
-   * @param credentials credentials to write
-   * @throws IOException
-   */
-  public static void saveTokens(File file,
-      Credentials credentials) throws IOException {
-    try(DataOutputStream daos = new DataOutputStream(
-        new FileOutputStream(file))) {
-      credentials.writeTokenStorageToStream(daos);
-    }
-  }
-
-  /**
-   * Look up and return the resource manager's principal. This method
-   * automatically does the <code>_HOST</code> replacement in the principal and
-   * correctly handles HA resource manager configurations.
-   *
-   * From: YARN-4629
-   * @param conf the {@link Configuration} file from which to read the
-   * principal
-   * @return the resource manager's principal string
-   * @throws IOException thrown if there's an error replacing the host name
-   */
-  public static String getRMPrincipal(Configuration conf) throws IOException {
-    String principal = conf.get(RM_PRINCIPAL, "");
-    String hostname;
-    Preconditions.checkState(!principal.isEmpty(), "Not set: " + RM_PRINCIPAL);
-
-    if (HAUtil.isHAEnabled(conf)) {
-      YarnConfiguration yarnConf = new YarnConfiguration(conf);
-      if (yarnConf.get(RM_HA_ID) == null) {
-        // If RM_HA_ID is not configured, use the first of RM_HA_IDS.
-        // Any valid RM HA ID should work.
-        String[] rmIds = yarnConf.getStrings(RM_HA_IDS);
-        Preconditions.checkState((rmIds != null) && (rmIds.length > 0),
-            "Not set " + RM_HA_IDS);
-        yarnConf.set(RM_HA_ID, rmIds[0]);
-      }
-
-      hostname = yarnConf.getSocketAddr(
-          RM_ADDRESS,
-          DEFAULT_RM_ADDRESS,
-          DEFAULT_RM_PORT).getHostName();
-    } else {
-      hostname = conf.getSocketAddr(
-          RM_ADDRESS,
-          DEFAULT_RM_ADDRESS,
-          DEFAULT_RM_PORT).getHostName();
-    }
-    return SecurityUtil.getServerPrincipal(principal, hostname);
-  }
-
-  /**
-   * Create and add any filesystem delegation tokens with
-   * the RM(s) configured to be able to renew them. Returns null
-   * on an insecure cluster (i.e. harmless)
-   * @param conf configuration
-   * @param fs filesystem
-   * @param credentials credentials to update
-   * @return a list of all added tokens.
-   * @throws IOException
-   */
-  public static Token<?>[] addRMRenewableFSDelegationTokens(Configuration conf,
-      FileSystem fs,
-      Credentials credentials) throws IOException {
-    Preconditions.checkArgument(conf != null);
-    Preconditions.checkArgument(credentials != null);
-    if (UserGroupInformation.isSecurityEnabled()) {
-      return fs.addDelegationTokens(CredentialUtils.getRMPrincipal(conf),
-          credentials);
-    }
-    return null;
-  }
-
-  /**
-   * Add an FS delegation token which can be renewed by the current user
-   * @param fs filesystem
-   * @param credentials credentials to update
-   * @throws IOException problems.
-   */
-  public static void addSelfRenewableFSDelegationTokens(
-      FileSystem fs,
-      Credentials credentials) throws IOException {
-    Preconditions.checkArgument(fs != null);
-    Preconditions.checkArgument(credentials != null);
-    fs.addDelegationTokens(
-        getSelfRenewer(),
-        credentials);
-  }
-
-  public static String getSelfRenewer() throws IOException {
-    return UserGroupInformation.getLoginUser().getShortUserName();
-  }
-
-  /**
-   * Create and add an RM delegation token to the credentials
-   * @param yarnClient Yarn Client
-   * @param credentials to add token to
-   * @return the token which was added
-   * @throws IOException
-   * @throws YarnException
-   */
-  public static Token<TokenIdentifier> addRMDelegationToken(YarnClient yarnClient,
-      Credentials credentials)
-      throws IOException, YarnException {
-    Configuration conf = yarnClient.getConfig();
-    Text rmPrincipal = new Text(CredentialUtils.getRMPrincipal(conf));
-    Text rmDTService = ClientRMProxy.getRMDelegationTokenService(conf);
-    Token<TokenIdentifier> rmDelegationToken =
-        ConverterUtils.convertFromYarn(
-            yarnClient.getRMDelegationToken(rmPrincipal),
-            rmDTService);
-    credentials.addToken(rmDelegationToken.getService(), rmDelegationToken);
-    return rmDelegationToken;
-  }
-
-  public static Token<TimelineDelegationTokenIdentifier> maybeAddTimelineToken(
-      Configuration conf,
-      Credentials credentials)
-      throws IOException, YarnException {
-    if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, false)) {
-      LOG.debug("Timeline service enabled -fetching token");
-
-      try(TimelineClient timelineClient = TimelineClient.createTimelineClient()) {
-        timelineClient.init(conf);
-        timelineClient.start();
-        Token<TimelineDelegationTokenIdentifier> token =
-            timelineClient.getDelegationToken(
-                CredentialUtils.getRMPrincipal(conf));
-        credentials.addToken(token.getService(), token);
-        return token;
-      }
-    } else {
-      LOG.debug("Timeline service is disabled");
-      return null;
-    }
-  }
-
-  /**
-   * Filter a list of tokens from a set of credentials
-   * @param credentials credential source (a new credential set os re
-   * @param filter List of tokens to strip out
-   * @return a new, filtered, set of credentials
-   */
-  public static Credentials filterTokens(Credentials credentials,
-      List<Text> filter) {
-    Credentials result = new Credentials(credentials);
-    Iterator<Token<? extends TokenIdentifier>> iter =
-        result.getAllTokens().iterator();
-    while (iter.hasNext()) {
-      Token<? extends TokenIdentifier> token = iter.next();
-      LOG.debug("Token {}", token.getKind());
-      if (filter.contains(token.getKind())) {
-        LOG.debug("Filtering token {}", token.getKind());
-        iter.remove();
-      }
-    }
-    return result;
-  }
-
-  public static String dumpTokens(Credentials credentials, String separator) {
-    ArrayList<Token<? extends TokenIdentifier>> sorted =
-        new ArrayList<>(credentials.getAllTokens());
-    Collections.sort(sorted, new TokenComparator());
-    StringBuilder buffer = new StringBuilder(sorted.size()* 128);
-    for (Token<? extends TokenIdentifier> token : sorted) {
-      buffer.append(tokenToString(token)).append(separator);
-    }
-    return buffer.toString();
-  }
-
-  /**
-   * Create a string for people to look at
-   * @param token token to convert to a string form
-   * @return a printable view of the token
-   */
-  public static String tokenToString(Token<? extends TokenIdentifier> token) {
-    DateFormat df = DateFormat.getDateTimeInstance(
-        DateFormat.SHORT, DateFormat.SHORT);
-    StringBuilder buffer = new StringBuilder(128);
-    buffer.append(token.toString());
-    try {
-      TokenIdentifier ti = token.decodeIdentifier();
-      buffer.append("; ").append(ti);
-      if (ti instanceof AbstractDelegationTokenIdentifier) {
-        // details in human readable form, and compensate for information HDFS DT omits
-        AbstractDelegationTokenIdentifier dt = (AbstractDelegationTokenIdentifier) ti;
-        buffer.append("; Renewer: ").append(dt.getRenewer());
-        buffer.append("; Issued: ")
-            .append(df.format(new Date(dt.getIssueDate())));
-        buffer.append("; Max Date: ")
-            .append(df.format(new Date(dt.getMaxDate())));
-      }
-    } catch (IOException e) {
-      //marshall problem; not ours
-      LOG.debug("Failed to decode {}: {}", token, e, e);
-    }
-    return buffer.toString();
-  }
-
-  /**
-   * Get the expiry time of a token.
-   * @param token token to examine
-   * @return the time in milliseconds after which the token is invalid.
-   * @throws IOException
-   */
-  public static long getTokenExpiryTime(Token token) throws IOException {
-    TokenIdentifier identifier = token.decodeIdentifier();
-    Preconditions.checkState(identifier instanceof AbstractDelegationTokenIdentifier,
-        "Token %s of type: %s has an identifier which cannot be examined: %s",
-        token, token.getClass(), identifier);
-    AbstractDelegationTokenIdentifier id =
-        (AbstractDelegationTokenIdentifier) identifier;
-    return id.getMaxDate();
-  }
-
-  private static class TokenComparator
-      implements Comparator<Token<? extends TokenIdentifier>>, Serializable {
-    @Override
-    public int compare(Token<? extends TokenIdentifier> left,
-        Token<? extends TokenIdentifier> right) {
-      return left.getKind().toString().compareTo(right.getKind().toString());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/package-info.java
new file mode 100644
index 0000000..766da0d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/package-info.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Yarn Service framework.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.yarn.service;
+import org.apache.hadoop.classification.InterfaceAudience;
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
index c0c44c3..d65a196 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/provider/ProviderUtils.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.service.provider;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -28,21 +27,18 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.service.ServiceContext;
-import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.Component;
 import org.apache.hadoop.yarn.service.api.records.ConfigFile;
 import org.apache.hadoop.yarn.service.api.records.ConfigFormat;
-import org.apache.hadoop.yarn.service.api.records.Configuration;
 import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
 import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
-import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
 import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
 import org.apache.hadoop.yarn.service.exceptions.BadCommandArgumentsException;
 import org.apache.hadoop.yarn.service.exceptions.SliderException;
 import org.apache.hadoop.yarn.service.utils.PublishedConfiguration;
 import org.apache.hadoop.yarn.service.utils.PublishedConfigurationOutputter;
-import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.apache.hadoop.yarn.service.utils.ServiceUtils;
+import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -163,53 +159,6 @@ public class ProviderUtils implements YarnServiceConstants {
     }
   }
 
-  /**
-   * Localize the service keytabs for the service.
-   * @param launcher container launcher
-   * @param fileSystem file system
-   * @throws IOException trouble uploading to HDFS
-   */
-  public void localizeServiceKeytabs(AbstractLauncher launcher,
-      SliderFileSystem fileSystem, Service service) throws IOException {
-
-    Configuration conf = service.getConfiguration();
-    String keytabPathOnHost =
-        conf.getProperty(YarnServiceConf.KEY_AM_KEYTAB_LOCAL_PATH);
-    if (ServiceUtils.isUnset(keytabPathOnHost)) {
-      String amKeytabName =
-          conf.getProperty(YarnServiceConf.KEY_AM_LOGIN_KEYTAB_NAME);
-      String keytabDir =
-          conf.getProperty(YarnServiceConf.KEY_HDFS_KEYTAB_DIR);
-      // we need to localize the keytab files in the directory
-      Path keytabDirPath = fileSystem.buildKeytabPath(keytabDir, null,
-          service.getName());
-      boolean serviceKeytabsDeployed = false;
-      if (fileSystem.getFileSystem().exists(keytabDirPath)) {
-        FileStatus[] keytabs = fileSystem.getFileSystem().listStatus(
-            keytabDirPath);
-        LocalResource keytabRes;
-        for (FileStatus keytab : keytabs) {
-          if (!amKeytabName.equals(keytab.getPath().getName())
-              && keytab.getPath().getName().endsWith(".keytab")) {
-            serviceKeytabsDeployed = true;
-            log.info("Localizing keytab {}", keytab.getPath().getName());
-            keytabRes = fileSystem.createAmResource(keytab.getPath(),
-                LocalResourceType.FILE);
-            launcher.addLocalResource(KEYTAB_DIR + "/" +
-                    keytab.getPath().getName(),
-                keytabRes);
-          }
-        }
-      }
-      if (!serviceKeytabsDeployed) {
-        log.warn("No service keytabs for the service have been localized.  "
-            + "If the service requires keytabs for secure operation, "
-            + "please ensure that the required keytabs have been uploaded "
-            + "to the folder {}", keytabDirPath);
-      }
-    }
-  }
-
   public static Path initCompInstanceDir(SliderFileSystem fs,
       ComponentInstance instance) {
     Path compDir = new Path(new Path(fs.getAppDir(), "components"),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d30d5782/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
index b58cea8..d5ea45c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-services/hadoop-yarn-services-core/src/main/java/org/apache/hadoop/yarn/service/utils/ServiceApiUtil.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.registry.client.api.RegistryConstants;
 import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.security.HadoopKerberosName;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.api.records.Service;
 import org.apache.hadoop.yarn.service.api.records.Artifact;
@@ -40,6 +42,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -86,6 +90,17 @@ public class ServiceApiUtil {
           "No component specified for " + service.getName());
     }
 
+    if (UserGroupInformation.isSecurityEnabled()) {
+      if (!StringUtils.isEmpty(service.getKerberosPrincipal().getKeytab())) {
+        try {
+          // validate URI format
+          new URI(service.getKerberosPrincipal().getKeytab());
+        } catch (URISyntaxException e) {
+          throw new IllegalArgumentException(e);
+        }
+      }
+    }
+
     // Validate there are no component name collisions (collisions are not
     // currently supported) and add any components from external services
     Configuration globalConf = service.getConfiguration();


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org