Posted to common-commits@hadoop.apache.org by st...@apache.org on 2014/10/08 21:54:51 UTC

[4/6] YARN-913 service registry: YARN-2652 add hadoop-yarn-registry package under hadoop-yarn

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java
new file mode 100644
index 0000000..6484d28
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java
@@ -0,0 +1,996 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.impl.zk;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.util.ZKUtil;
+import org.apache.zookeeper.Environment;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Locale;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import static org.apache.hadoop.registry.client.impl.zk.ZookeeperConfigOptions.*;
+import static org.apache.hadoop.registry.client.api.RegistryConstants.*;
+
+/**
+ * Implements the registry security: a self-contained service for
+ * testability.
+ * <p>
+ * This class contains:
+ * <ol>
+ *   <li>
+ *     The registry security policy implementation, configuration reading, ACL
+ * setup and management
+ *   </li>
+ *   <li>Lots of static helper methods to aid security setup and debugging</li>
+ * </ol>
+ */
+
+public class RegistrySecurity extends AbstractService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RegistrySecurity.class);
+
+  public static final String E_UNKNOWN_AUTHENTICATION_MECHANISM =
+      "Unknown/unsupported authentication mechanism; ";
+
+  /**
+   * There is no default user to add with permissions, so it would be
+   * impossible to create nodes with unrestricted user access.
+   */
+  public static final String E_NO_USER_DETERMINED_FOR_ACLS =
+      "No user for ACLs determinable from current user or registry option "
+      + KEY_REGISTRY_USER_ACCOUNTS;
+
+  /**
+   * Error raised when the registry is tagged as secure but this
+   * process doesn't have hadoop security enabled.
+   */
+  public static final String E_NO_KERBEROS =
+      "Registry security is enabled -but Hadoop security is not enabled";
+
+  /**
+   * Access policy options
+   */
+  private enum AccessPolicy {
+    anon, sasl, digest
+  }
+
+  /**
+   * Access mechanism
+   */
+  private AccessPolicy access;
+
+  /**
+   * User used for digest auth
+   */
+  private String digestAuthUser;
+
+  /**
+   * Password used for digest auth
+   */
+  private String digestAuthPassword;
+
+  /**
+   * Auth data used for digest auth
+   */
+  private byte[] digestAuthData;
+
+  /**
+   * flag set to true if the registry has security enabled.
+   */
+  private boolean secureRegistry;
+
+  /**
+   * An ACL with read-write access for anyone
+   */
+  public static final ACL ALL_READWRITE_ACCESS =
+      new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE);
+
+  /**
+   * An ACL with read access for anyone
+   */
+  public static final ACL ALL_READ_ACCESS =
+      new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE);
+
+  /**
+   * An ACL list containing the {@link #ALL_READWRITE_ACCESS} entry.
+   * It is copy-on-write, so it can be shared without worry.
+   */
+  public static final List<ACL> WorldReadWriteACL;
+
+  static {
+    List<ACL> acls = new ArrayList<ACL>();
+    acls.add(ALL_READWRITE_ACCESS);
+    WorldReadWriteACL = new CopyOnWriteArrayList<ACL>(acls);
+  }
+
+  /**
+   * the list of system ACLs
+   */
+  private final List<ACL> systemACLs = new ArrayList<ACL>();
+
+  /**
+   * A list of digest ACLs which can be added to permissions
+   * —and cleared later.
+   */
+  private final List<ACL> digestACLs = new ArrayList<ACL>();
+
+  /**
+   * the default kerberos realm
+   */
+  private String kerberosRealm;
+
+  /**
+   * Client context
+   */
+  private String jaasClientContext;
+
+  /**
+   * Client identity
+   */
+  private String jaasClientIdentity;
+
+  /**
+   * Create an instance
+   * @param name service name
+   */
+  public RegistrySecurity(String name) {
+    super(name);
+  }
+
+  /**
+   * Init the service: this sets up security based on the configuration
+   * @param conf configuration
+   * @throws Exception
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    String auth = conf.getTrimmed(KEY_REGISTRY_CLIENT_AUTH,
+        REGISTRY_CLIENT_AUTH_ANONYMOUS);
+
+    // TODO JDK7 SWITCH
+    if (REGISTRY_CLIENT_AUTH_KERBEROS.equals(auth)) {
+      access = AccessPolicy.sasl;
+    } else if (REGISTRY_CLIENT_AUTH_DIGEST.equals(auth)) {
+      access = AccessPolicy.digest;
+    } else if (REGISTRY_CLIENT_AUTH_ANONYMOUS.equals(auth)) {
+      access = AccessPolicy.anon;
+    } else {
+      throw new ServiceStateException(E_UNKNOWN_AUTHENTICATION_MECHANISM
+                                      + "\"" + auth + "\"");
+    }
+    initSecurity();
+  }
+
+  /**
+   * Init security.
+   *
+   * After this operation, the {@link #systemACLs} list is valid.
+   * @throws IOException
+   */
+  private void initSecurity() throws IOException {
+
+    secureRegistry =
+        getConfig().getBoolean(KEY_REGISTRY_SECURE, DEFAULT_REGISTRY_SECURE);
+    systemACLs.clear();
+    if (secureRegistry) {
+      addSystemACL(ALL_READ_ACCESS);
+
+      // determine the kerberos realm from JVM and settings
+      kerberosRealm = getConfig().get(KEY_REGISTRY_KERBEROS_REALM,
+          getDefaultRealmInJVM());
+
+      // System Accounts
+      String system = getOrFail(KEY_REGISTRY_SYSTEM_ACCOUNTS,
+                                DEFAULT_REGISTRY_SYSTEM_ACCOUNTS);
+
+      systemACLs.addAll(buildACLs(system, kerberosRealm, ZooDefs.Perms.ALL));
+
+      // user accounts (may be empty, but for digest auth at least one
+      // user ACL must be built up)
+      String user = getConfig().get(KEY_REGISTRY_USER_ACCOUNTS,
+                              DEFAULT_REGISTRY_USER_ACCOUNTS);
+      List<ACL> userACLs = buildACLs(user, kerberosRealm, ZooDefs.Perms.ALL);
+
+      // add self if the current user can be determined
+      ACL self;
+      if (UserGroupInformation.isSecurityEnabled()) {
+        self = createSaslACLFromCurrentUser(ZooDefs.Perms.ALL);
+        if (self != null) {
+          userACLs.add(self);
+        }
+      }
+
+      // here check for UGI having secure on or digest + ID
+      switch (access) {
+        case sasl:
+          // secure + SASL => has to be authenticated
+          if (!UserGroupInformation.isSecurityEnabled()) {
+            throw new IOException("Kerberos required for secure registry access");
+          }
+          UserGroupInformation currentUser =
+              UserGroupInformation.getCurrentUser();
+          jaasClientContext = getOrFail(KEY_REGISTRY_CLIENT_JAAS_CONTEXT,
+              DEFAULT_REGISTRY_CLIENT_JAAS_CONTEXT);
+          jaasClientIdentity = currentUser.getShortUserName();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Auth is SASL user=\"{}\" JAAS context=\"{}\"",
+                jaasClientIdentity,
+                jaasClientContext);
+          }
+          break;
+
+        case digest:
+          String id = getOrFail(KEY_REGISTRY_CLIENT_AUTHENTICATION_ID, "");
+          String pass = getOrFail(KEY_REGISTRY_CLIENT_AUTHENTICATION_PASSWORD, "");
+          if (userACLs.isEmpty()) {
+            throw new ServiceStateException(E_NO_USER_DETERMINED_FOR_ACLS);
+          }
+          // validate the id:password pair; this throws on an invalid pair
+          digest(id, pass);
+          ACL acl = new ACL(ZooDefs.Perms.ALL, toDigestId(id, pass));
+          userACLs.add(acl);
+          digestAuthUser = id;
+          digestAuthPassword = pass;
+          String authPair = id + ":" + pass;
+          digestAuthData = authPair.getBytes("UTF-8");
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Auth is Digest ACL: {}", aclToString(acl));
+          }
+          break;
+
+        case anon:
+          // nothing is needed; account is read only.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Auth is anonymous");
+          }
+          userACLs = new ArrayList<ACL>(0);
+          break;
+      }
+      systemACLs.addAll(userACLs);
+
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Registry has no security");
+      }
+      // wide open cluster, adding system acls
+      systemACLs.addAll(WorldReadWriteACL);
+    }
+  }
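+
+  /*
+   * Illustrative configuration sketch, not part of the original patch:
+   * a client opting into digest auth might set the following options
+   * (key names are from RegistryConstants; the values are examples):
+   *
+   *   conf.setBoolean(KEY_REGISTRY_SECURE, true);
+   *   conf.set(KEY_REGISTRY_CLIENT_AUTH, REGISTRY_CLIENT_AUTH_DIGEST);
+   *   conf.set(KEY_REGISTRY_CLIENT_AUTHENTICATION_ID, "admin");
+   *   conf.set(KEY_REGISTRY_CLIENT_AUTHENTICATION_PASSWORD, "secret");
+   */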
+
+  /**
+   * Add another system ACL
+   * @param acl ACL to add
+   */
+  public void addSystemACL(ACL acl) {
+    systemACLs.add(acl);
+  }
+
+  /**
+   * Add a digest ACL
+   * @param acl ACL to add
+   * @return true if the ACL was added; false if the registry is insecure
+   */
+  public boolean addDigestACL(ACL acl) {
+    if (secureRegistry) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Added ACL {}", aclToString(acl));
+      }
+      digestACLs.add(acl);
+      return true;
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Ignoring added ACL - registry is insecure{}",
+            aclToString(acl));
+      }
+      return false;
+    }
+  }
+
+  /**
+   * Reset the digest ACL list
+   */
+  public void resetDigestACLs() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Cleared digest ACLs");
+    }
+    digestACLs.clear();
+  }
+
+  /**
+   * Flag to indicate the cluster is secure
+   * @return true if the config enabled security
+   */
+  public boolean isSecureRegistry() {
+    return secureRegistry;
+  }
+
+  /**
+   * Get the system ACLs
+   * @return the system ACLs
+   */
+  public List<ACL> getSystemACLs() {
+    Preconditions.checkNotNull(systemACLs, "registry security is uninitialized");
+    return Collections.unmodifiableList(systemACLs);
+  }
+
+  /**
+   * Get all ACLs needed for a client to use when writing to the registry.
+   * That is: system ACLs, its own ACL, any digest ACLs
+   * @return the client ACLs
+   */
+  public List<ACL> getClientACLs() {
+    List<ACL> clientACLs = new ArrayList<ACL>(systemACLs);
+    clientACLs.addAll(digestACLs);
+    return clientACLs;
+  }
+
+  /**
+   * Create a SASL ACL for the user
+   * @param perms permissions
+   * @return an ACL for the current user or null if they aren't a kerberos user
+   * @throws IOException
+   */
+  public ACL createSaslACLFromCurrentUser(int perms) throws IOException {
+    UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+    if (currentUser.hasKerberosCredentials()) {
+      return createSaslACL(currentUser, perms);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Given a UGI, create a SASL ACL from it
+   * @param ugi UGI
+   * @param perms permissions
+   * @return a new ACL
+   */
+  public ACL createSaslACL(UserGroupInformation ugi, int perms) {
+    String userName = ugi.getUserName();
+    return new ACL(perms, new Id(SCHEME_SASL, userName));
+  }
+
+  /**
+   * Get a conf option, throw an exception if it is null/empty
+   * @param key key
+   * @param defval default value
+   * @return the value
+   * @throws IOException if missing
+   */
+  private String getOrFail(String key, String defval) throws IOException {
+    String val = getConfig().get(key, defval);
+    if (StringUtils.isEmpty(val)) {
+      throw new IOException("Missing value for configuration option " + key);
+    }
+    return val;
+  }
+
+  /**
+   * Check for an id:password tuple being valid.
+   * This test is stricter than that in {@link DigestAuthenticationProvider},
+   * which splits the string, but doesn't check the contents of each
+   * half for being non-"".
+   * @param idPasswordPair id:pass pair
+   * @return true if the pass is considered valid.
+   */
+  public boolean isValid(String idPasswordPair) {
+    String[] parts = idPasswordPair.split(":");
+    return parts.length == 2
+           && !StringUtils.isEmpty(parts[0])
+           && !StringUtils.isEmpty(parts[1]);
+  }
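+
+  /**
+   * Illustrative sketch, not part of the original patch: the validation
+   * rules above in action.
+   */
+  private void demoIsValid() {
+    boolean ok = isValid("admin:secret");      // true: both halves non-empty
+    boolean noPassword = isValid("admin:");    // false: empty password half
+    boolean noSeparator = isValid("admin");    // false: no ":" separator
+  }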
+
+  /**
+   * Get the derived kerberos realm.
+   * @return the realm: built from the JVM realm, or the configuration if it
+   * overrides it. An empty string means the realm is unknown.
+   */
+  public String getKerberosRealm() {
+    return kerberosRealm;
+  }
+
+  /**
+   * Generate a base-64 encoded digest of an id:password pair
+   * @param idPasswordPair id:password
+   * @return a string that can be used for authentication
+   */
+  public String digest(String idPasswordPair) throws IOException {
+    if (StringUtils.isEmpty(idPasswordPair) || !isValid(idPasswordPair)) {
+      throw new IOException("Invalid id:password: " + idPasswordPair);
+    }
+    try {
+      return DigestAuthenticationProvider.generateDigest(idPasswordPair);
+    } catch (NoSuchAlgorithmException e) {
+      // unlikely since it is standard to the JVM, but maybe JCE restrictions
+      // could trigger it
+      throw new IOException(e.toString(), e);
+    }
+  }
+
+  /**
+   * Generate a base-64 encoded digest of an id:password pair
+   * @param id ID
+   * @param password pass
+   * @return a string that can be used for authentication
+   * @throws IOException
+   */
+  public String digest(String id, String password) throws IOException {
+    return digest(id + ":" + password);
+  }
+
+  /**
+   * Given a digest, create an ID from it
+   * @param digest digest
+   * @return ID
+   */
+  public Id toDigestId(String digest) {
+    return new Id(SCHEME_DIGEST, digest);
+  }
+
+  /**
+   * Create a Digest ID from an id:pass pair
+   * @param id ID
+   * @param password password
+   * @return an ID
+   * @throws IOException
+   */
+  public Id toDigestId(String id, String password) throws IOException {
+    return toDigestId(digest(id, password));
+  }
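+
+  /**
+   * Illustrative sketch, not part of the original patch: building a digest
+   * ACL for a hypothetical "admin" account with full access. The ACL holds
+   * the base-64 SHA-1 digest, not the raw password.
+   */
+  private ACL demoDigestACL() throws IOException {
+    return new ACL(ZooDefs.Perms.ALL, toDigestId("admin", "secret"));
+  }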
+
+  /**
+   * Split up a list of the form
+   * <code>sasl:mapred@,digest:5f55d66, sasl:yarn@EXAMPLE.COM</code>
+   * into a list of possible ACL values, trimming as needed
+   *
+   * The supplied realm is added to entries where
+   * <ol>
+   *   <li>the string begins "sasl:"</li>
+   *   <li>the string ends with "@"</li>
+   * </ol>
+   * No attempt is made to validate any of the ACL patterns.
+   *
+   * @param aclString list of 0 or more ACLs
+   * @param realm realm to add
+   * @return a list of split and potentially patched ACL pairs.
+   *
+   */
+  public List<String> splitAclPairs(String aclString, String realm) {
+    List<String> list = Lists.newArrayList(
+        Splitter.on(',').omitEmptyStrings().trimResults()
+                .split(aclString));
+    ListIterator<String> listIterator = list.listIterator();
+    while (listIterator.hasNext()) {
+      String next = listIterator.next();
+      if (next.startsWith(SCHEME_SASL + ":") && next.endsWith("@")) {
+        listIterator.set(next + realm);
+      }
+    }
+    return list;
+  }
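+
+  /**
+   * Illustrative sketch, not part of the original patch: the javadoc
+   * example above, with the realm "EXAMPLE.COM" patched in. Only the
+   * "sasl:" entry ending in "@" is changed.
+   */
+  private List<String> demoSplitAclPairs() {
+    // yields ["sasl:mapred@EXAMPLE.COM", "digest:5f55d66",
+    //         "sasl:yarn@EXAMPLE.COM"]
+    return splitAclPairs("sasl:mapred@,digest:5f55d66, sasl:yarn@EXAMPLE.COM",
+        "EXAMPLE.COM");
+  }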
+
+  /**
+   * Parse a string down to an ID, adding a realm if needed
+   * @param idPair id:data tuple
+   * @param realm realm to add
+   * @return the ID.
+   * @throws IllegalArgumentException if the idPair is invalid
+   */
+  public Id parse(String idPair, String realm) {
+    int firstColon = idPair.indexOf(':');
+    int lastColon = idPair.lastIndexOf(':');
+    if (firstColon == -1 || lastColon == -1 || firstColon != lastColon) {
+      throw new IllegalArgumentException(
+          "ACL '" + idPair + "' not of expected form scheme:id");
+    }
+    String scheme = idPair.substring(0, firstColon);
+    String id = idPair.substring(firstColon + 1);
+    if (id.endsWith("@")) {
+      Preconditions.checkArgument(
+          StringUtils.isNotEmpty(realm),
+          "@-suffixed account \"%s\" but no realm to add", id);
+      id = id + realm;
+    }
+    return new Id(scheme, id);
+  }
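+
+  /**
+   * Illustrative sketch, not part of the original patch: parse() splits on
+   * the single colon and appends the realm to an "@"-suffixed identity.
+   */
+  private Id demoParse() {
+    // yields an Id with scheme "sasl" and id "mapred@EXAMPLE.COM"
+    return parse("sasl:mapred@", "EXAMPLE.COM");
+  }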
+
+  /**
+   * Parse the IDs, adding a realm if needed, setting the permissions
+   * @param principalList id string
+   * @param realm realm to add
+   * @param perms permissions
+   * @return the relevant ACLs
+   * @throws IOException
+   */
+  public List<ACL> buildACLs(String principalList, String realm, int perms)
+      throws IOException {
+    List<String> aclPairs = splitAclPairs(principalList, realm);
+    List<ACL> ids = new ArrayList<ACL>(aclPairs.size());
+    for (String aclPair : aclPairs) {
+      ACL newAcl = new ACL();
+      newAcl.setId(parse(aclPair, realm));
+      newAcl.setPerms(perms);
+      ids.add(newAcl);
+    }
+    return ids;
+  }
+
+  /**
+   * Parse an ACL list. This includes configuration indirection
+   * {@link ZKUtil#resolveConfIndirection(String)}
+   * @param zkAclConf configuration string
+   * @return an ACL list
+   * @throws IOException on a bad ACL parse
+   */
+  public List<ACL> parseACLs(String zkAclConf) throws IOException {
+    try {
+      return ZKUtil.parseACLs(ZKUtil.resolveConfIndirection(zkAclConf));
+    } catch (ZKUtil.BadAclFormatException e) {
+      throw new IOException("Parsing " + zkAclConf + " :" + e, e);
+    }
+  }
+
+  /**
+   * Get the appropriate Kerberos Auth module for JAAS entries
+   * for this JVM.
+   * @return a JVM-specific kerberos login module classname.
+   */
+  public static String getKerberosAuthModuleForJVM() {
+    if (System.getProperty("java.vendor").contains("IBM")) {
+      return "com.ibm.security.auth.module.Krb5LoginModule";
+    } else {
+      return "com.sun.security.auth.module.Krb5LoginModule";
+    }
+  }
+
+  /**
+   * JAAS template: {@value}
+   * Note the semicolon on the last entry
+   */
+  private static final String JAAS_ENTRY =
+      "%s { \n"
+      + " %s required\n"
+      // kerberos module
+      + " keyTab=\"%s\"\n"
+      + " principal=\"%s\"\n"
+      + " useKeyTab=true\n"
+      + " useTicketCache=false\n"
+      + " doNotPrompt=true\n"
+      + " storeKey=true;\n"
+      + "}; \n"
+      ;
+
+  /**
+   * Create a JAAS entry for insertion
+   * @param context context of the entry
+   * @param principal kerberos principal
+   * @param keytab keytab
+   * @return a context
+   */
+  public String createJAASEntry(
+      String context,
+      String principal,
+      File keytab) {
+    Preconditions.checkArgument(StringUtils.isNotEmpty(principal),
+        "invalid principal");
+    Preconditions.checkArgument(StringUtils.isNotEmpty(context),
+        "invalid context");
+    Preconditions.checkArgument(keytab != null && keytab.isFile(),
+        "keytab null or missing: %s", keytab);
+    return String.format(
+        Locale.ENGLISH,
+        JAAS_ENTRY,
+        context,
+        getKerberosAuthModuleForJVM(),
+        keytab.getAbsolutePath(),
+        principal);
+  }
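+
+  /*
+   * Illustrative sketch, not part of the original patch: for a context
+   * "Client", principal "zookeeper/host@EXAMPLE.COM" and keytab
+   * /etc/zk.keytab (all hypothetical values), createJAASEntry() produces,
+   * on an Oracle/OpenJDK JVM:
+   *
+   *   Client {
+   *    com.sun.security.auth.module.Krb5LoginModule required
+   *    keyTab="/etc/zk.keytab"
+   *    principal="zookeeper/host@EXAMPLE.COM"
+   *    useKeyTab=true
+   *    useTicketCache=false
+   *    doNotPrompt=true
+   *    storeKey=true;
+   *   };
+   */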
+
+  /**
+   * Bind the JVM JAAS setting to the specified JAAS file.
+   *
+   * <b>Important:</b> once a file has been loaded the JVM doesn't pick up
+   * changes
+   * @param jaasFile the JAAS file
+   */
+  public static void bindJVMtoJAASFile(File jaasFile) {
+    String path = jaasFile.getAbsolutePath();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Binding {} to {}", Environment.JAAS_CONF_KEY, path);
+    }
+    System.setProperty(Environment.JAAS_CONF_KEY, path);
+  }
+
+  /**
+   * Set the Zookeeper server property
+   * {@link ZookeeperConfigOptions#PROP_ZK_SERVER_SASL_CONTEXT}
+   * to the SASL context. When the ZK server starts, this is the context
+   * which it will read in
+   * @param contextName the name of the context
+   */
+  public static void bindZKToServerJAASContext(String contextName) {
+    System.setProperty(PROP_ZK_SERVER_SASL_CONTEXT, contextName);
+  }
+
+  /**
+   * Reset any system properties related to JAAS
+   */
+  public static void clearJaasSystemProperties() {
+    System.clearProperty(Environment.JAAS_CONF_KEY);
+  }
+
+  /**
+   * Resolve the context of an entry. This is an effective test of
+   * JAAS setup, because it will relay detected problems up
+   * @param context context name
+   * @return the entry
+   * @throws RuntimeException if there is no context entry found
+   */
+  public static AppConfigurationEntry[] validateContext(String context)  {
+    if (context == null) {
+      throw new RuntimeException("Null context argument");
+    }
+    if (context.isEmpty()) {
+      throw new RuntimeException("Empty context argument");
+    }
+    javax.security.auth.login.Configuration configuration =
+        javax.security.auth.login.Configuration.getConfiguration();
+    AppConfigurationEntry[] entries =
+        configuration.getAppConfigurationEntry(context);
+    if (entries == null) {
+      throw new RuntimeException(
+          String.format("Entry \"%s\" not found; " +
+                        "JAAS config = %s",
+              context,
+              describeProperty(Environment.JAAS_CONF_KEY) ));
+    }
+    return entries;
+  }
+
+  /**
+   * Apply the security environment to this curator instance. This
+   * may include setting up the ZK system properties for SASL
+   * @param builder curator builder
+   */
+  public void applySecurityEnvironment(CuratorFrameworkFactory.Builder builder) {
+
+    if (isSecureRegistry()) {
+      switch (access) {
+        case anon:
+          clearZKSaslClientProperties();
+          break;
+
+        case digest:
+          // no SASL
+          clearZKSaslClientProperties();
+          builder.authorization(SCHEME_DIGEST, digestAuthData);
+          break;
+
+        case sasl:
+          // bind to the current identity and context within the JAAS file
+          setZKSaslClientProperties(jaasClientIdentity, jaasClientContext);
+      }
+    }
+  }
+
+  /**
+   * Set the client properties. This forces the ZK client to fail
+   * if it cannot authenticate.
+   * <b>Important:</b> This is JVM-wide.
+   * @param username username
+   * @param context login context
+   * @throws RuntimeException if the context cannot be found in the current
+   * JAAS context
+   */
+  public static void setZKSaslClientProperties(String username,
+      String context)  {
+    RegistrySecurity.validateContext(context);
+    enableZookeeperClientSASL();
+    System.setProperty(PROP_ZK_SASL_CLIENT_USERNAME, username);
+    System.setProperty(PROP_ZK_SASL_CLIENT_CONTEXT, context);
+  }
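+
+  /*
+   * Illustrative sketch, not part of the original patch: binding this JVM
+   * to a hypothetical JAAS file defining a "Client" context, then enabling
+   * SASL for ZK clients, before any ZK connection is opened:
+   *
+   *   RegistrySecurity.bindJVMtoJAASFile(new File("/etc/registry-jaas.conf"));
+   *   RegistrySecurity.setZKSaslClientProperties("zookeeper", "Client");
+   */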
+
+  /**
+   * Clear all the ZK SASL Client properties
+   * <b>Important:</b> This is JVM-wide.
+   */
+  public static void clearZKSaslClientProperties() {
+    disableZookeeperClientSASL();
+    System.clearProperty(PROP_ZK_SASL_CLIENT_CONTEXT);
+    System.clearProperty(PROP_ZK_SASL_CLIENT_USERNAME);
+  }
+
+  /**
+   * Turn ZK SASL on
+   * <b>Important:</b> This is JVM-wide.
+   */
+  protected static void enableZookeeperClientSASL() {
+    System.setProperty(PROP_ZK_ENABLE_SASL_CLIENT, "true");
+  }
+
+  /**
+   * Force disable ZK SASL bindings.
+   * <b>Important:</b> This is JVM-wide.
+   */
+  public static void disableZookeeperClientSASL() {
+    System.setProperty(ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY, "false");
+  }
+
+  /**
+   * Is the system property enabling the SASL client set?
+   * @return true if the SASL client system property is set.
+   */
+  public static boolean isClientSASLEnabled() {
+    return ZooKeeperSaslClient.isEnabled();
+  }
+
+  /**
+   * Log details about the current Hadoop user at INFO.
+   * Robust against IOEs when trying to get the current user
+   */
+  public void logCurrentHadoopUser() {
+    try {
+      UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+      LOG.info("Current user = {}",currentUser);
+      UserGroupInformation realUser = currentUser.getRealUser();
+      LOG.info("Real User = {}" , realUser);
+    } catch (IOException e) {
+      LOG.warn("Failed to get current user {}, {}", e);
+    }
+  }
+
+  /**
+   * Stringify a list of ACLs for logging. Digest ACLs have their
+   * digest values stripped for security.
+   * @param acls ACL list
+   * @return a string for logs, exceptions, ...
+   */
+  public static String aclsToString(List<ACL> acls) {
+    StringBuilder builder = new StringBuilder();
+    if (acls == null) {
+      builder.append("null ACL");
+    } else {
+      builder.append('\n');
+      for (ACL acl : acls) {
+        builder.append(aclToString(acl))
+               .append(" ");
+      }
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Convert an ACL to a string, with any obfuscation needed
+   * @param acl ACL
+   * @return ACL string value
+   */
+  public static String aclToString(ACL acl) {
+    return String.format(Locale.ENGLISH,
+        "0x%02x: %s",
+        acl.getPerms(),
+        idToString(acl.getId())
+    );
+  }
+
+  /**
+   * Convert an ID to a string, stripping out all but the first few characters
+   * of any digest auth hash for security reasons
+   * @param id ID
+   * @return a string description of a Zookeeper ID
+   */
+  public static String idToString(Id id) {
+    String s;
+    if (id.getScheme().equals(SCHEME_DIGEST)) {
+      String ids = id.getId();
+      int colon = ids.indexOf(':');
+      if (colon > 0) {
+        // keep the ID and only the first few characters of the hash
+        ids = ids.substring(0, colon + 3) + "...";
+      }
+      s = SCHEME_DIGEST + ": " + ids;
+    } else {
+      s = id.toString();
+    }
+    return s;
+  }
+
+  /**
+   * Build up low-level security diagnostics to aid debugging
+   * @return a string to use in diagnostics
+   */
+  public String buildSecurityDiagnostics() {
+    StringBuilder builder = new StringBuilder();
+    builder.append(secureRegistry ? "secure registry; "
+                          : "insecure registry; ");
+    builder.append("Access policy: ").append(access);
+
+    builder.append(", System ACLs: ").append(aclsToString(systemACLs));
+    builder.append(UgiInfo.fromCurrentUser());
+    builder.append(" Kerberos Realm: ").append(kerberosRealm).append(" ; ");
+    builder.append(describeProperty(Environment.JAAS_CONF_KEY));
+    String sasl =
+        System.getProperty(PROP_ZK_ENABLE_SASL_CLIENT,
+            DEFAULT_ZK_ENABLE_SASL_CLIENT);
+    boolean saslEnabled = Boolean.valueOf(sasl);
+    builder.append(describeProperty(PROP_ZK_ENABLE_SASL_CLIENT,
+        DEFAULT_ZK_ENABLE_SASL_CLIENT));
+    if (saslEnabled) {
+      builder.append("JAAS Client Identity")
+             .append("=")
+             .append(jaasClientIdentity)
+             .append("; ");
+      builder.append(KEY_REGISTRY_CLIENT_JAAS_CONTEXT)
+             .append("=")
+             .append(jaasClientContext)
+             .append("; ");
+      builder.append(describeProperty(PROP_ZK_SASL_CLIENT_USERNAME));
+      builder.append(describeProperty(PROP_ZK_SASL_CLIENT_CONTEXT));
+    }
+    builder.append(describeProperty(PROP_ZK_ALLOW_FAILED_SASL_CLIENTS,
+        "(undefined but defaults to true)"));
+    builder.append(describeProperty(
+        PROP_ZK_SERVER_MAINTAIN_CONNECTION_DESPITE_SASL_FAILURE));
+    return builder.toString();
+  }
+
+  private static String describeProperty(String name) {
+    return describeProperty(name, "(undefined)");
+  }
+
+  private static String describeProperty(String name, String def) {
+    return "; " + name + "=" + System.getProperty(name, def);
+  }
+
+  /**
+   * Get the default kerberos realm, returning "" if there
+   * is no realm or another problem occurs.
+   * @return the default realm of the system if it
+   * could be determined
+   */
+  public static String getDefaultRealmInJVM() {
+    try {
+      return KerberosUtil.getDefaultRealm();
+      // JDK7
+    } catch (ClassNotFoundException ignored) {
+      // ignored
+    } catch (NoSuchMethodException ignored) {
+      // ignored
+    } catch (IllegalAccessException ignored) {
+      // ignored
+    } catch (InvocationTargetException ignored) {
+      // ignored
+    }
+    return "";
+  }
+
+  /**
+   * Create an ACL for a user.
+   * @param ugi user identity
+   * @param perms permissions
+   * @return the ACL for the specified user. If the username doesn't contain
+   * an "@" then the realm is added
+   */
+  public ACL createACLForUser(UserGroupInformation ugi, int perms) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating ACL for {}", new UgiInfo(ugi));
+    }
+    if (!secureRegistry) {
+      return ALL_READWRITE_ACCESS;
+    } else {
+      return createACLfromUsername(ugi.getUserName(), perms);
+    }
+  }
+
+  /**
+   * Given a user name (short or long), create a SASL ACL
+   * @param username user name; if it doesn't contain an "@" symbol, the
+   * service's kerberos realm is added
+   * @param perms permissions
+   * @return an ACL for the user
+   */
+  public ACL createACLfromUsername(String username, int perms) {
+    if (!username.contains("@")) {
+      username = username + "@" + kerberosRealm;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Appending kerberos realm to make {}", username);
+      }
+    }
+    return new ACL(perms, new Id(SCHEME_SASL, username));
+  }
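+
+  /**
+   * Illustrative sketch, not part of the original patch: with a service
+   * realm of "EXAMPLE.COM", both calls below yield an ACL for
+   * sasl:mapred@EXAMPLE.COM.
+   */
+  private void demoCreateACLfromUsername() {
+    ACL fromShortName = createACLfromUsername("mapred", ZooDefs.Perms.ALL);
+    ACL fromPrincipal =
+        createACLfromUsername("mapred@EXAMPLE.COM", ZooDefs.Perms.ALL);
+  }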
+
+  /**
+   * On demand string-ifier for UGI with extra details
+   */
+  public static class UgiInfo {
+
+    public static UgiInfo fromCurrentUser() {
+      try {
+        return new UgiInfo(UserGroupInformation.getCurrentUser());
+      } catch (IOException e) {
+        LOG.info("Failed to get current user {}", e, e);
+        return new UgiInfo(null);
+      }
+    }
+
+    private final UserGroupInformation ugi;
+
+    public UgiInfo(UserGroupInformation ugi) {
+      this.ugi = ugi;
+    }
+
+    @Override
+    public String toString() {
+      if (ugi == null) {
+        return "(null ugi)";
+      }
+      StringBuilder builder = new StringBuilder();
+      builder.append(ugi.getUserName()).append(": ");
+      builder.append(ugi.toString());
+      builder.append(" hasKerberosCredentials=").append(
+          ugi.hasKerberosCredentials());
+      builder.append(" isFromKeytab=").append(ugi.isFromKeytab());
+      builder.append(" kerberos is enabled in Hadoop =").append(UserGroupInformation.isSecurityEnabled());
+      return builder.toString();
+    }
+
+  }
+
+  /**
+   * on-demand stringifier for a list of ACLs
+   */
+  public static class AclListInfo {
+    public final List<ACL> acls;
+
+    public AclListInfo(List<ACL> acls) {
+      this.acls = acls;
+    }
+
+    @Override
+    public String toString() {
+      return aclsToString(acls);
+    }
+  }
+}
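+
+/*
+ * Illustrative lifecycle sketch, not part of the original patch: as an
+ * AbstractService subclass, an instance is initialized with a Configuration
+ * before its ACLs can be read:
+ *
+ *   RegistrySecurity security = new RegistrySecurity("registry security");
+ *   security.init(conf);   // runs serviceInit() and initSecurity()
+ *   List<ACL> clientAcls = security.getClientACLs();
+ */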

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZKPathDumper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZKPathDumper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZKPathDumper.java
new file mode 100644
index 0000000..3c4a730
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZKPathDumper.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.impl.zk;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+/**
+ * This class dumps a registry tree to a string.
+ * It does this in the <code>toString()</code> method, so it
+ * can be used in a log statement; the operation
+ * will only take place if the method is evaluated.
+ *
+ */
+@VisibleForTesting
+public class ZKPathDumper {
+
+  public static final int INDENT = 2;
+  private final CuratorFramework curator;
+  private final String root;
+  private final boolean verbose;
+
+  /**
+   * Create a path dumper, but do not dump the path until asked.
+   * @param curator curator instance
+   * @param root root
+   * @param verbose verbose flag - includes more details (such as ACLs)
+   */
+  public ZKPathDumper(CuratorFramework curator,
+      String root,
+      boolean verbose) {
+    Preconditions.checkArgument(curator != null);
+    Preconditions.checkArgument(root != null);
+    this.curator = curator;
+    this.root = root;
+    this.verbose = verbose;
+  }
+
+  /**
+   * Trigger the recursive registry dump.
+   * @return a string view of the registry
+   */
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("ZK tree for ").append(root).append('\n');
+    expand(builder, root, 1);
+    return builder.toString();
+  }
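+
+  /**
+   * Illustrative usage sketch, not part of the original patch: because the
+   * dump happens inside toString(), passing the dumper to a parameterized
+   * log call defers the tree walk until the message is actually formatted.
+   */
+  private static void demoLogging(CuratorFramework curator,
+      org.slf4j.Logger log) {
+    log.debug("Registry contents: {}", new ZKPathDumper(curator, "/", true));
+  }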
+
+  /**
+   * Recursively expand the path into the supplied string builder, increasing
+   * the indentation by {@link #INDENT} as it proceeds (depth first) down
+   * the tree
+   * @param builder string build to append to
+   * @param path path to examine
+   * @param indent current indentation
+   */
+  private void expand(StringBuilder builder,
+      String path,
+      int indent) {
+    try {
+      GetChildrenBuilder childrenBuilder = curator.getChildren();
+      List<String> children = childrenBuilder.forPath(path);
+      for (String child : children) {
+        String childPath = path + "/" + child;
+        String body;
+        Stat stat = curator.checkExists().forPath(childPath);
+        StringBuilder bodyBuilder = new StringBuilder(256);
+        bodyBuilder.append("  [")
+                          .append(stat.getDataLength())
+                          .append("]");
+        if (stat.getEphemeralOwner() > 0) {
+          bodyBuilder.append("*");
+        }
+        if (verbose) {
+          // verbose: extract ACLs and append them to this child's entry,
+          // not to the enclosing builder
+          bodyBuilder.append(" -- ");
+          List<ACL> acls =
+              curator.getACL().forPath(childPath);
+          for (ACL acl : acls) {
+            bodyBuilder.append(RegistrySecurity.aclToString(acl));
+            bodyBuilder.append(" ");
+          }
+        }
+        body = bodyBuilder.toString();
+        // print each child
+        append(builder, indent, ' ');
+        builder.append('/').append(child);
+        builder.append(body);
+        builder.append('\n');
+        // recurse
+        expand(builder, childPath, indent + INDENT);
+      }
+    } catch (Exception e) {
+      builder.append(e.toString()).append("\n");
+    }
+  }
+
+  /**
+   * Append the specified indentation to a builder
+   * @param builder string build to append to
+   * @param indent current indentation
+   * @param c character to use for indentation
+   */
+  private void append(StringBuilder builder, int indent, char c) {
+    for (int i = 0; i < indent; i++) {
+      builder.append(c);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZookeeperConfigOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZookeeperConfigOptions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZookeeperConfigOptions.java
new file mode 100644
index 0000000..711e27c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZookeeperConfigOptions.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.impl.zk;
+
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.server.ZooKeeperSaslServer;
+
+/**
+ * Configuration options which are internal to Zookeeper,
+ * as well as some other ZK constants
+ * <p>
+ * Zookeeper options are passed via system properties prior to the ZK
+ * Methods/classes being invoked. This implies that:
+ * <ol>
+ *   <li>There can only be one instance of a ZK client or service class
+ *   in a single JVM —else their configuration options will conflict.</li>
+ *   <li>It is safest to set these properties immediately before
+ *   invoking ZK operations.</li>
+ * </ol>
+ *
+ */
+public interface ZookeeperConfigOptions {
+  /**
+   * Enable SASL secure clients: {@value}.
+   * This is usually set to true, with the property
+   * {@link #PROP_ZK_SERVER_MAINTAIN_CONNECTION_DESPITE_SASL_FAILURE}
+   * configuring ZK to fall back to non-SASL authentication if the
+   * SASL auth fails.
+   * <p>
+   * As a result, clients will default to attempting SASL authentication,
+   * but revert to classic authentication/anonymous access on failure.
+   */
+  String PROP_ZK_ENABLE_SASL_CLIENT =
+      ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY;
+
+  /**
+   * Default flag for the ZK client: {@value}.
+   */
+  String DEFAULT_ZK_ENABLE_SASL_CLIENT = "true";
+
+  /**
+   * System property for the JAAS client context: {@value}.
+   * <p>
+   * For SASL authentication to work, this must point to a
+   * context within the JVM's JAAS configuration file.
+   * <p>
+   * The default value is derived from
+   * {@link ZooKeeperSaslClient#LOGIN_CONTEXT_NAME_KEY}.
+   */
+  String PROP_ZK_SASL_CLIENT_CONTEXT =
+      ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY;
+
+  /**
+   * The SASL client username: {@value}.
+   * <p>
+   * Set this to the <i>short</i> name of the client, e.g, "user",
+   * not <code>user/host</code>, or <code>user/host@REALM</code>
+   */
+  String PROP_ZK_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+
+  /**
+   * The SASL server context, referring to a context in the JVM's
+   * JAAS configuration file: {@value}.
+   */
+  String PROP_ZK_SERVER_SASL_CONTEXT =
+      ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY;
+
+  /**
+   * Should ZK Server allow failed SASL clients to downgrade to classic
+   * authentication on a SASL auth failure: {@value}.
+   */
+  String PROP_ZK_SERVER_MAINTAIN_CONNECTION_DESPITE_SASL_FAILURE =
+      "zookeeper.maintain_connection_despite_sasl_failure";
+
+  /**
+   * Should the ZK server allow failed SASL clients: {@value}.
+   */
+  String PROP_ZK_ALLOW_FAILED_SASL_CLIENTS =
+      "zookeeper.allowSaslFailedClients";
+
+  /**
+   * Kerberos realm of the server: {@value}.
+   */
+  String PROP_ZK_SERVER_REALM = "zookeeper.server.realm";
+
+  /**
+   * Path to a kinit binary: {@value}.
+   * Defaults to <code>"/usr/bin/kinit"</code>
+   */
+  String PROP_ZK_KINIT_PATH = "zookeeper.kinit";
+
+  /**
+   * ID scheme for SASL: {@value}.
+   */
+  String SCHEME_SASL = "sasl";
+
+  /**
+   * ID scheme for digest auth: {@value}.
+   */
+  String SCHEME_DIGEST = "digest";
+}
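+
+/*
+ * Illustrative sketch, not part of the original patch: the client-side
+ * options above are plain system properties, so they must be set before
+ * the ZK client classes are used. The values here are examples:
+ *
+ *   System.setProperty(PROP_ZK_ENABLE_SASL_CLIENT, "true");
+ *   System.setProperty(PROP_ZK_SASL_CLIENT_USERNAME, "zookeeper");
+ *   System.setProperty(PROP_ZK_SASL_CLIENT_CONTEXT, "Client");
+ */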

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/package-info.java
new file mode 100644
index 0000000..f7ae983
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/package-info.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Core Zookeeper support.
+ * <p>
+ * This package contains the low-level bindings to Curator and Zookeeper,
+ * including everything related to registry security.
+ * <p>
+ * The class {@link org.apache.hadoop.registry.client.impl.zk.CuratorService}
+ * is a YARN service which offers access to a Zookeeper instance via
+ * Apache Curator.
+ * <p>
+ * The class {@link org.apache.hadoop.registry.client.impl.zk.RegistrySecurity}
+ * implements the security support in the registry, through a set of
+ * static methods and as a YARN service.
+ * <p>
+ * To work with ZK, system properties need to be set before invoking
+ * some operations/instantiating some objects. The definitions of these
+ * are kept in {@link org.apache.hadoop.registry.client.impl.zk.ZookeeperConfigOptions}.
+ */
+package org.apache.hadoop.registry.client.impl.zk;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
new file mode 100644
index 0000000..192819c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.types;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * The address types, as string constants.
+ * Why strings and not enums? Cross-platform serialization as JSON.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface AddressTypes {
+
+  /**
+   * hostname/FQDN and port pair: {@value}.
+   * The host/domain name and port are set as separate strings in the address
+   * list, e.g.
+   * <pre>
+   *   ["namenode.example.org", "50070"]
+   * </pre>
+   */
+  public static final String ADDRESS_HOSTNAME_AND_PORT = "host/port";
+
+  /**
+   * Path <code>/a/b/c</code> style: {@value}.
+   * The entire path is encoded in a single entry
+   *
+   * <pre>
+   *   ["/users/example/dataset"]
+   * </pre>
+   */
+  public static final String ADDRESS_PATH = "path";
+
+  /**
+   * URI entries: {@value}.
+   * <pre>
+   *   ["http://example.org"]
+   * </pre>
+   */
+  public static final String ADDRESS_URI = "uri";
+
+  /**
+   * Zookeeper addresses as a triple : {@value}.
+   * <p>
+   * These are provided as a three-element tuple of: hostname, port
+   * and optionally path (depending on the application)
+   * <p>
+   *   A single element would be
+   * <pre>
+   *   ["zk1","2181","/registry"]
+   * </pre>
+   *  An endpoint with multiple elements would list them as
+   * <pre>
+   *   [
+   *    ["zk1","2181","/registry"]
+   *    ["zk2","1600","/registry"]
+   *   ]
+   * </pre>
+   *
+   * The third element in each entry, the path, MUST be the same in every
+   * entry, as a client reading the addresses of an endpoint is free to
+   * pick any of the set.
+   *
+   */
+  public static final String ADDRESS_ZOOKEEPER = "zktriple";
+
+  /**
+   * Any other address: {@value}.
+   */
+  public static final String ADDRESS_OTHER = "";
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
new file mode 100644
index 0000000..51418d9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.types;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Description of a single service/component endpoint.
+ * It is designed to be marshalled as JSON.
+ * <p>
+ * Every endpoint can have more than one address entry, such as
+ * a list of URLs to a replicated service, or a (hostname, port)
+ * pair. Each of these address entries is represented as a string list,
+ * as that is the only reliably marshallable form of a tuple JSON can represent.
+ *
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public final class Endpoint implements Cloneable {
+
+  /**
+   * API implemented at the end of the binding
+   */
+  public String api;
+
+  /**
+   * Type of address. The standard types are defined in
+   * {@link AddressTypes}
+   */
+  public String addressType;
+
+  /**
+   * Protocol type. Some standard types are defined in
+   * {@link ProtocolTypes}
+   */
+  public String protocolType;
+
+  /**
+   * A list of address tuples, whose format depends on the address type
+   */
+  public List<List<String>> addresses;
+
+  /**
+   * Create an empty instance.
+   */
+  public Endpoint() {
+  }
+
+  /**
+   * Create an endpoint from another endpoint.
+   * This is a deep clone with a new list of addresses.
+   * @param that the endpoint to copy from
+   */
+  public Endpoint(Endpoint that) {
+    this.api = that.api;
+    this.addressType = that.addressType;
+    this.protocolType = that.protocolType;
+    this.addresses = new ArrayList<List<String>>(that.addresses.size());
+    for (List<String> address : that.addresses) {
+      // copy each address tuple into a new list
+      addresses.add(new ArrayList<String>(address));
+    }
+  }
+
+  /**
+   * Build an endpoint with a list of addresses
+   * @param api API name
+   * @param addressType address type
+   * @param protocolType protocol type
+   * @param addrs addresses
+   */
+  public Endpoint(String api,
+      String addressType,
+      String protocolType,
+      List<List<String>> addrs) {
+    this.api = api;
+    this.addressType = addressType;
+    this.protocolType = protocolType;
+    this.addresses = new ArrayList<List<String>>();
+    if (addrs != null) {
+      addresses.addAll(addrs);
+    }
+  }
+
+  /**
+   * Build an endpoint from a list of URIs; each URI
+   * is ASCII-encoded and added to the list of addresses.
+   * @param api API name
+   * @param protocolType protocol type
+   * @param uris URIs to convert to a list of tuples
+   */
+  public Endpoint(String api,
+      String protocolType,
+      URI... uris) {
+    this.api = api;
+    this.addressType = AddressTypes.ADDRESS_URI;
+
+    this.protocolType = protocolType;
+    List<List<String>> addrs = new ArrayList<List<String>>(uris.length);
+    for (URI uri : uris) {
+      addrs.add(RegistryTypeUtils.tuple(uri.toString()));
+    }
+    this.addresses = addrs;
+  }
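+
+  /**
+   * Illustrative sketch, not part of the original patch: building a web UI
+   * endpoint from a URI. The API name and address are hypothetical.
+   */
+  private static Endpoint demoEndpoint() {
+    return new Endpoint("org.example.webui",
+        ProtocolTypes.PROTOCOL_WEBUI,
+        URI.create("http://namenode.example.org:50070"));
+  }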
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("Endpoint{");
+    sb.append("api='").append(api).append('\'');
+    sb.append(", addressType='").append(addressType).append('\'');
+    sb.append(", protocolType='").append(protocolType).append('\'');
+
+    sb.append(", addresses=");
+    if (addresses != null) {
+      sb.append("[ ");
+      for (List<String> address : addresses) {
+        sb.append("[ ");
+        if (address == null) {
+          sb.append("NULL entry in address list");
+        } else {
+          for (String elt : address) {
+            sb.append('"').append(elt).append("\" ");
+          }
+        }
+        sb.append("] ");
+      }
+      sb.append("] ");
+    } else {
+      sb.append("(null) ");
+    }
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Validate the record by checking for null fields and other invalid
+   * conditions
+   * @throws NullPointerException if a field is null when it
+   * MUST be set.
+   * @throws RuntimeException on invalid entries
+   */
+  public void validate() {
+    Preconditions.checkNotNull(api, "null API field");
+    Preconditions.checkNotNull(addressType, "null addressType field");
+    Preconditions.checkNotNull(protocolType, "null protocolType field");
+    Preconditions.checkNotNull(addresses, "null addresses field");
+    for (List<String> address : addresses) {
+      Preconditions.checkNotNull(address, "null element in address");
+    }
+  }
+
+  /**
+   * Shallow clone: the lists of addresses are shared
+   * @return a cloned instance
+   * @throws CloneNotSupportedException
+   */
+  @Override
+  protected Object clone() throws CloneNotSupportedException {
+    return super.clone();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
new file mode 100644
index 0000000..f225cf0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.types;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Some common protocol types.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ProtocolTypes {
+
+  /**
+   * Addresses are URIs of Hadoop Filesystem paths: {@value}.
+   */
+  String PROTOCOL_FILESYSTEM = "hadoop/filesystem";
+
+  /**
+   * Classic Hadoop IPC : {@value}.
+   */
+  String PROTOCOL_HADOOP_IPC = "hadoop/IPC";
+
+  /**
+   * Hadoop protocol buffers IPC: {@value}.
+   */
+  String PROTOCOL_HADOOP_IPC_PROTOBUF = "hadoop/protobuf";
+
+  /**
+   * Corba IIOP: {@value}.
+   */
+  String PROTOCOL_IIOP = "IIOP";
+
+  /**
+   * REST: {@value}.
+   */
+  String PROTOCOL_REST = "REST";
+
+  /**
+   * Java RMI: {@value}.
+   */
+  String PROTOCOL_RMI = "RMI";
+
+  /**
+   * Sun RPC (ONC RPC), as used by NFS and similar: {@value}.
+   */
+  String PROTOCOL_SUN_RPC = "sunrpc";
+
+  /**
+   * Thrift-based protocols: {@value}.
+   */
+  String PROTOCOL_THRIFT = "thrift";
+
+  /**
+   * Custom TCP protocol: {@value}.
+   */
+  String PROTOCOL_TCP = "tcp";
+
+  /**
+   * Custom UDP-based protocol: {@value}.
+   */
+  String PROTOCOL_UDP = "udp";
+
+  /**
+   * Default value: the protocol is unknown: "{@value}".
+   */
+  String PROTOCOL_UNKNOWN = "";
+
+  /**
+   * Web page: {@value}.
+   *
+   * This protocol implies that the URLs are designed for
+   * people to view via web browsers.
+   */
+  String PROTOCOL_WEBUI = "webui";
+
+  /**
+   * Web Services: {@value}.
+   */
+  String PROTOCOL_WSAPI = "WS-*";
+
+  /**
+   * A ZooKeeper binding: {@value}.
+   */
+  String PROTOCOL_ZOOKEEPER_BINDING = "zookeeper";
+
+}
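
A quick usage sketch (not part of this patch): the constants above are plain
strings, so endpoint protocol types can be compared directly. The ProtocolCheck
class and isBrowsable() helper below are hypothetical; only the ProtocolTypes
constants come from the patch.

import org.apache.hadoop.registry.client.types.ProtocolTypes;

public class ProtocolCheck {

  /** Hypothetical helper: true iff the protocol implies a browsable URL. */
  static boolean isBrowsable(String protocolType) {
    return ProtocolTypes.PROTOCOL_WEBUI.equals(protocolType)
        || ProtocolTypes.PROTOCOL_REST.equals(protocolType);
  }

  public static void main(String[] args) {
    System.out.println(isBrowsable(ProtocolTypes.PROTOCOL_WEBUI));      // true
    System.out.println(isBrowsable(ProtocolTypes.PROTOCOL_HADOOP_IPC)); // false
  }
}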

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/RegistryPathStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/RegistryPathStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/RegistryPathStatus.java
new file mode 100644
index 0000000..59bcadc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/RegistryPathStatus.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.types;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Output of a <code>RegistryOperations.stat()</code> call
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+@JsonIgnoreProperties(ignoreUnknown = true)
+public final class RegistryPathStatus {
+
+  /**
+   * Short path in the registry to this entry
+   */
+  public final String path;
+
+  /**
+   * Timestamp of the entry.
+   */
+  public final long time;
+
+  /**
+   * Entry size in bytes, as returned by the storage infrastructure.
+   * In ZooKeeper, even "empty" nodes have a non-zero size.
+   */
+  public final long size;
+
+  /**
+   * Number of child nodes
+   */
+  public final int children;
+
+  /**
+   * Construct an instance
+   * @param path full path
+   * @param time time
+   * @param size entry size
+   * @param children number of children
+   */
+  public RegistryPathStatus(
+      @JsonProperty("path") String path,
+      @JsonProperty("time") long time,
+      @JsonProperty("size") long size,
+      @JsonProperty("children") int children) {
+    this.path = path;
+    this.time = time;
+    this.size = size;
+    this.children = children;
+  }
+
+  /**
+   * Equality operator checks size, time and path of the entries.
+   * It does <i>not</i> check {@link #children}.
+   * @param other the other entry
+   * @return true if the entries are considered equal.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+
+    RegistryPathStatus status = (RegistryPathStatus) other;
+
+    if (size != status.size) {
+      return false;
+    }
+    if (time != status.time) {
+      return false;
+    }
+    if (path != null ? !path.equals(status.path) : status.path != null) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * The hash code is derived from the path.
+   * @return hash code for storing the path in maps.
+   */
+  @Override
+  public int hashCode() {
+    return path != null ? path.hashCode() : 0;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb =
+        new StringBuilder("RegistryPathStatus{");
+    sb.append("path='").append(path).append('\'');
+    sb.append(", time=").append(time);
+    sb.append(", size=").append(size);
+    sb.append(", children=").append(children);
+    sb.append('}');
+    return sb.toString();
+  }
+}
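
A usage sketch of the equality contract above (not part of this patch; the
path, timestamps and sizes are made up):

import org.apache.hadoop.registry.client.types.RegistryPathStatus;

public class StatusEquality {
  public static void main(String[] args) {
    RegistryPathStatus a = new RegistryPathStatus("/users/alice", 1000L, 24L, 0);
    RegistryPathStatus b = new RegistryPathStatus("/users/alice", 1000L, 24L, 5);
    // equals() compares path, time and size but ignores the child count,
    // so a status re-read after children were added still matches.
    System.out.println(a.equals(b));                  // true
    // hashCode() is derived from the path alone.
    System.out.println(a.hashCode() == b.hashCode()); // true
  }
}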

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
new file mode 100644
index 0000000..378127f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.types;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonAnyGetter;
+import org.codehaus.jackson.annotate.JsonAnySetter;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * JSON-marshallable description of a single component.
+ * Unknown attributes encountered during deserialization are kept in
+ * an attribute map and emitted again when the record is serialized.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class ServiceRecord implements Cloneable {
+
+  /**
+   * Description string
+   */
+  public String description;
+
+  /**
+   * map to handle unknown attributes.
+   */
+  private Map<String, String> attributes = new HashMap<String, String>(4);
+
+  /**
+   * List of endpoints intended for use to external callers
+   */
+  public List<Endpoint> external = new ArrayList<Endpoint>();
+
+  /**
+   * List of endpoints for use <i>within</i> an application.
+   */
+  public List<Endpoint> internal = new ArrayList<Endpoint>();
+
+  /**
+   * Create an empty service record: no description, no attributes;
+   * endpoint lists are initialized to empty lists.
+   */
+  public ServiceRecord() {
+  }
+
+  /**
+   * Deep cloning constructor
+   * @param that service record source
+   */
+  public ServiceRecord(ServiceRecord that) {
+    this.description = that.description;
+    // copy the "other" attributes
+    Map<String, String> thatAttrs = that.attributes;
+    for (Map.Entry<String, String> entry : thatAttrs.entrySet()) {
+      attributes.put(entry.getKey(), entry.getValue());
+    }
+    // endpoints
+    List<Endpoint> src = that.internal;
+    if (src != null) {
+      internal = new ArrayList<Endpoint>(src.size());
+      for (Endpoint endpoint : src) {
+        internal.add(new Endpoint(endpoint));
+      }
+    }
+    src = that.external;
+    if (src != null) {
+      external = new ArrayList<Endpoint>(src.size());
+      for (Endpoint endpoint : src) {
+        external.add(new Endpoint(endpoint));
+      }
+    }
+  }
+
+  /**
+   * Add an external endpoint
+   * @param endpoint endpoint to set
+   */
+  public void addExternalEndpoint(Endpoint endpoint) {
+    Preconditions.checkArgument(endpoint != null);
+    endpoint.validate();
+    external.add(endpoint);
+  }
+
+  /**
+   * Add an internal endpoint
+   * @param endpoint endpoint to set
+   */
+  public void addInternalEndpoint(Endpoint endpoint) {
+    Preconditions.checkArgument(endpoint != null);
+    endpoint.validate();
+    internal.add(endpoint);
+  }
+
+  /**
+   * Look up an internal endpoint
+   * @param api API
+   * @return the endpoint or null if there was no match
+   */
+  public Endpoint getInternalEndpoint(String api) {
+    return findByAPI(internal, api);
+  }
+
+  /**
+   * Look up an external endpoint
+   * @param api API
+   * @return the endpoint or null if there was no match
+   */
+  public Endpoint getExternalEndpoint(String api) {
+    return findByAPI(external, api);
+  }
+
+  /**
+   * Handle unknown attributes by storing them in the
+   * {@link #attributes} map
+   * @param key attribute name
+   * @param value attribute value.
+   */
+  @JsonAnySetter
+  public void set(String key, Object value) {
+    attributes.put(key, value.toString());
+  }
+
+  /**
+   * The map of "other" attributes set when parsing. The
+   * {@code JsonAnyGetter} annotation means these are emitted
+   * inline again when the record is serialized back to JSON.
+   * @return a map of any unknown attributes in the deserialized JSON.
+   */
+  @JsonAnyGetter
+  public Map<String, String> attributes() {
+    return attributes;
+  }
+
+  /**
+   * Get the "other" attribute with a specific key
+   * @param key key to look up
+   * @return the value or null
+   */
+  public String get(String key) {
+    return attributes.get(key);
+  }
+
+  /**
+   * Get the "other" attribute with a specific key.
+   * @param key key to look up
+   * @param defVal default value
+   * @return the value as a string,
+   * or <code>defVal</code> if the value was not present
+   */
+  public String get(String key, String defVal) {
+    String val = attributes.get(key);
+    return val != null ? val : defVal;
+  }
+
+  /**
+   * Find an endpoint by its API
+   * @param list list
+   * @param api api name
+   * @return the endpoint or null if there was no match
+   */
+  private Endpoint findByAPI(List<Endpoint> list, String api) {
+    for (Endpoint endpoint : list) {
+      if (endpoint.api.equals(api)) {
+        return endpoint;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb =
+        new StringBuilder("ServiceRecord{");
+    sb.append("description='").append(description).append('\'');
+    sb.append("; external endpoints: {");
+    for (Endpoint endpoint : external) {
+      sb.append(endpoint).append("; ");
+    }
+    sb.append("}; internal endpoints: {");
+    for (Endpoint endpoint : internal) {
+      sb.append(endpoint != null ? endpoint.toString() : "NULL ENDPOINT");
+      sb.append("; ");
+    }
+    sb.append('}');
+
+    sb.append(", attributes: {");
+    for (Map.Entry<String, String> attr : attributes.entrySet()) {
+      sb.append("\"").append(attr.getKey()).append("\"=\"")
+        .append(attr.getValue()).append("\" ");
+    }
+    sb.append('}');
+
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Shallow clone: all endpoints will be shared across instances
+   * @return a clone of the instance
+   * @throws CloneNotSupportedException
+   */
+  @Override
+  protected Object clone() throws CloneNotSupportedException {
+    return super.clone();
+  }
+
+  /**
+   * Validate the record by checking for null fields and other invalid
+   * conditions
+   * @throws NullPointerException if a field is null when it
+   * MUST be set.
+   * @throws RuntimeException on invalid entries
+   */
+  public void validate() {
+    for (Endpoint endpoint : external) {
+      Preconditions.checkNotNull(endpoint, "null external endpoint");
+      endpoint.validate();
+    }
+    for (Endpoint endpoint : internal) {
+      Preconditions.checkNotNull(endpoint, "null internal endpoint");
+      endpoint.validate();
+    }
+  }
+}
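
A usage sketch of the attribute API and copy constructor above (not part of
this patch; the attribute key and values are made up):

import org.apache.hadoop.registry.client.types.ServiceRecord;

public class RecordAttributes {
  public static void main(String[] args) {
    ServiceRecord record = new ServiceRecord();
    record.description = "example service";       // public field, as declared
    // set() feeds the same map used for unknown JSON attributes.
    record.set("example.attribute", "some value");

    // The copy constructor deep-copies attributes and endpoint lists.
    ServiceRecord copy = new ServiceRecord(record);
    System.out.println(copy.get("example.attribute", "missing")); // some value
    System.out.println(copy.get("absent.attribute", "missing"));  // missing
  }
}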

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java
new file mode 100644
index 0000000..2f75dba
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.types;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Service record header; access to the byte array is kept private
+ * to avoid FindBugs warnings about mutability.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ServiceRecordHeader {
+  /**
+   * Header of a service record: "jsonservicerec".
+   * By making this over 12 bytes long, entries in a listing that are
+   * too short to contain a record can be identified without fetching
+   * their data.
+   */
+  private static final byte[] RECORD_HEADER = {
+      'j', 's', 'o', 'n',
+      's', 'e', 'r', 'v', 'i', 'c', 'e',
+      'r', 'e', 'c'
+  };
+
+  /**
+   * Get the length of the record header
+   * @return the header length
+   */
+  public static int getLength() {
+    return RECORD_HEADER.length;
+  }
+
+  /**
+   * Get a clone of the record header
+   * @return the new record header.
+   */
+  public static byte[] getData() {
+    byte[] h = new byte[RECORD_HEADER.length];
+    System.arraycopy(RECORD_HEADER, 0, h, 0, RECORD_HEADER.length);
+    return h;
+  }
+}
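
A sketch of how the two accessors above might be combined to probe whether a
payload could hold a record (hypothetical helper, not part of this patch):

import org.apache.hadoop.registry.client.types.ServiceRecordHeader;

public class HeaderCheck {

  /** True iff the payload is long enough and starts with the header bytes. */
  static boolean couldBeRecord(byte[] data) {
    int len = ServiceRecordHeader.getLength();
    if (data == null || data.length < len) {
      return false;
    }
    byte[] header = ServiceRecordHeader.getData(); // defensive copy
    for (int i = 0; i < len; i++) {
      if (data[i] != header[i]) {
        return false;
      }
    }
    return true;
  }
}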

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/package-info.java
new file mode 100644
index 0000000..1c926be
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/package-info.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains all the data types which can be saved to the registry
+ * and/or marshalled to and from JSON.
+ * <p>
+ * The core datatypes, {@link org.apache.hadoop.registry.client.types.ServiceRecord},
+ * and {@link org.apache.hadoop.registry.client.types.Endpoint} are
+ * what is used to describe services and their protocol endpoints in the registry.
+ * <p>
+ * Some adjacent interfaces exist to list attributes of the fields:
+ * <ul>
+ *   <li>{@link org.apache.hadoop.registry.client.types.AddressTypes}</li>
+ *   <li>{@link org.apache.hadoop.registry.client.types.yarn.PersistencePolicies}</li>
+ *   <li>{@link org.apache.hadoop.registry.client.types.ProtocolTypes}</li>
+ * </ul>
+ *
+ * The {@link org.apache.hadoop.registry.client.types.RegistryPathStatus}
+ * class is not saved to the registry; it is the status of a registry
+ * entry that can be retrieved from the API call. It is still
+ * designed to be marshalled to and from JSON, as it can be served up
+ * from REST front ends to the registry.
+ *
+ */
+package org.apache.hadoop.registry.client.types;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/PersistencePolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/PersistencePolicies.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/PersistencePolicies.java
new file mode 100644
index 0000000..e4c7272
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/PersistencePolicies.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.types.yarn;
+
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+
+/**
+ * Persistence policies for {@link ServiceRecord}
+ */
+public interface PersistencePolicies {
+
+  /**
+   * The record persists until removed manually: {@value}.
+   */
+  String PERMANENT = "permanent";
+
+  /**
+   * Remove when the YARN application defined in the id field
+   * terminates: {@value}.
+   */
+  String APPLICATION = "application";
+
+  /**
+   * Remove when the current YARN application attempt ID finishes: {@value}.
+   */
+  String APPLICATION_ATTEMPT = "application-attempt";
+
+  /**
+   * Remove when the YARN container in the ID field finishes: {@value}.
+   */
+  String CONTAINER = "container";
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
new file mode 100644
index 0000000..7b78932
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.types.yarn;
+
+/**
+ * YARN specific attributes in the registry
+ */
+public class YarnRegistryAttributes {
+
+  /**
+   * ID of the entity owning the record: for containers, the container ID;
+   * for application instances, the application ID.
+   */
+  public static final String YARN_ID = "yarn:id";
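+
+  /**
+   * Persistence policy of the record; the values are expected to be
+   * the constants defined in {@link PersistencePolicies}.
+   */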
+  public static final String YARN_PERSISTENCE = "yarn:persistence";
+}
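
A sketch of how these attributes combine with the PersistencePolicies
constants above when tagging a record (not part of this patch; the application
ID is made up):

import org.apache.hadoop.registry.client.types.ServiceRecord;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes;

public class TaggedRecord {
  public static void main(String[] args) {
    ServiceRecord record = new ServiceRecord();
    record.description = "application-scoped service";
    // Tag the record so the RM can purge it when the application ends.
    record.set(YarnRegistryAttributes.YARN_ID,
        "application_1410000000000_0001");        // made-up application ID
    record.set(YarnRegistryAttributes.YARN_PERSISTENCE,
        PersistencePolicies.APPLICATION);
  }
}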

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/RMRegistryOperationsService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/RMRegistryOperationsService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/RMRegistryOperationsService.java
new file mode 100644
index 0000000..e11890f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/RMRegistryOperationsService.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.server.integration;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.registry.client.impl.zk.RegistryBindingSource;
+import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
+import org.apache.hadoop.registry.server.services.DeleteCompletionCallback;
+import org.apache.hadoop.registry.server.services.RegistryAdminService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Future;
+
+/**
+ * Handle RM events by updating the registry
+ * <p>
+ * These actions are all implemented as event handlers to operations
+ * which come from the RM.
+ * <p>
+ * This service is expected to be executed by a user with the permissions
+ * to manipulate the entire registry.
+ */
+@InterfaceAudience.LimitedPrivate("YARN")
+@InterfaceStability.Evolving
+public class RMRegistryOperationsService extends RegistryAdminService {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RMRegistryOperationsService.class);
+
+  private PurgePolicy purgeOnCompletionPolicy = PurgePolicy.PurgeAll;
+
+  public RMRegistryOperationsService(String name) {
+    this(name, null);
+  }
+
+  public RMRegistryOperationsService(String name,
+      RegistryBindingSource bindingSource) {
+    super(name, bindingSource);
+  }
+
+  /**
+   * Extend the parent service initialization by verifying that, in a
+   * secure cluster, the service knows the realm in which it is executing.
+   * It needs this to properly build up the user names and hence their
+   * access rights.
+   *
+   * @param conf configuration of the service
+   * @throws Exception
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+
+    verifyRealmValidity();
+  }
+
+  public PurgePolicy getPurgeOnCompletionPolicy() {
+    return purgeOnCompletionPolicy;
+  }
+
+  public void setPurgeOnCompletionPolicy(PurgePolicy purgeOnCompletionPolicy) {
+    this.purgeOnCompletionPolicy = purgeOnCompletionPolicy;
+  }
+
+  /** Actions to take when an application attempt registers: currently a no-op. */
+  public void onApplicationAttemptRegistered(ApplicationAttemptId attemptId,
+      String host, int rpcport, String trackingurl) throws IOException {
+
+  }
+
+  /** Actions to take when an application is launched: currently a no-op. */
+  public void onApplicationLaunched(ApplicationId id) throws IOException {
+
+  }
+
+  /**
+   * Actions to take as an AM registers itself with the RM.
+   * @param attemptId attempt ID
+   * @throws IOException problems
+   */
+  public void onApplicationMasterRegistered(ApplicationAttemptId attemptId) throws
+      IOException {
+  }
+
+  /**
+   * Actions to take when the AM container is completed
+   * @param containerId  container ID
+   * @throws IOException problems
+   */
+  public void onAMContainerFinished(ContainerId containerId) throws
+      IOException {
+    LOG.info("AM Container {} finished, purging application attempt records",
+        containerId);
+
+    // remove all application attempt entries
+    purgeAppAttemptRecords(containerId.getApplicationAttemptId());
+
+    // also treat as a container finish to remove container
+    // level records for the AM container
+    onContainerFinished(containerId);
+  }
+
+  /**
+   * Remove all application attempt entries.
+   * @param attemptId attempt ID
+   */
+  protected void purgeAppAttemptRecords(ApplicationAttemptId attemptId) {
+    purgeRecordsAsync("/",
+        attemptId.toString(),
+        PersistencePolicies.APPLICATION_ATTEMPT);
+  }
+
+  /**
+   * Actions to take when an application attempt is unregistered
+   * @param attemptId attempt ID
+   * @throws IOException problems
+   */
+  public void onApplicationAttemptUnregistered(ApplicationAttemptId attemptId)
+      throws IOException {
+    LOG.info("Application attempt {} unregistered, purging app attempt records",
+        attemptId);
+    purgeAppAttemptRecords(attemptId);
+  }
+
+  /**
+   * Actions to take when an application is completed
+   * @param id  application  ID
+   * @throws IOException problems
+   */
+  public void onApplicationCompleted(ApplicationId id)
+      throws IOException {
+    LOG.info("Application {} completed, purging application-level records",
+        id);
+    purgeRecordsAsync("/",
+        id.toString(),
+        PersistencePolicies.APPLICATION);
+  }
+
+  public void onApplicationAttemptAdded(ApplicationAttemptId appAttemptId) {
+  }
+
+  /**
+   * This is the event where the user is known, so the user directory
+   * can be created
+   * @param applicationId application  ID
+   * @param user username
+   * @throws IOException problems
+   */
+  public void onStateStoreEvent(ApplicationId applicationId, String user) throws
+      IOException {
+    initUserRegistryAsync(user);
+  }
+
+  /**
+   * Actions to take when a container finishes
+   * @param id container ID
+   * @throws IOException problems
+   */
+  public void onContainerFinished(ContainerId id) throws IOException {
+    LOG.info("Container {} finished, purging container-level records",
+        id);
+    purgeRecordsAsync("/",
+        id.toString(),
+        PersistencePolicies.CONTAINER);
+  }
+
+  /**
+   * Queue an async operation to purge all matching records under a base path.
+   * <ol>
+   *   <li>Uses a depth first search</li>
+   *   <li>A match is on both the record ID and the persistence policy</li>
+   *   <li>If a record matches then it is deleted without any child searches</li>
+   *   <li>Deletions will be asynchronous if a callback is provided</li>
+   * </ol>
+   * @param path base path
+   * @param id ID of the service record to match
+   * @param persistencePolicyMatch the persistence policy to match:
+   * no match, no delete.
+   * @return a future that returns the #of records deleted
+   */
+  @VisibleForTesting
+  public Future<Integer> purgeRecordsAsync(String path,
+      String id,
+      String persistencePolicyMatch) {
+
+    return purgeRecordsAsync(path,
+        id, persistencePolicyMatch,
+        purgeOnCompletionPolicy,
+        new DeleteCompletionCallback());
+  }
+
+  /**
+   * Queue an async operation to purge all matching records under a base path.
+   * <ol>
+   *   <li>Uses a depth first search</li>
+   *   <li>A match is on both the record ID and the persistence policy</li>
+   *   <li>If a record matches then it is deleted without any child searches</li>
+   *   <li>Deletions will be asynchronous if a callback is provided</li>
+   * </ol>
+   * @param path base path
+   * @param id ID of the service record to match
+   * @param persistencePolicyMatch the persistence policy to match:
+   * no match, no delete.
+   * @param purgePolicy how to react to children under the entry
+   * @param callback an optional callback
+   * @return a future that returns the #of records deleted
+   */
+  @VisibleForTesting
+  public Future<Integer> purgeRecordsAsync(String path,
+      String id,
+      String persistencePolicyMatch,
+      PurgePolicy purgePolicy,
+      BackgroundCallback callback) {
+    LOG.info(" records under {} with ID {} and policy {}: {}",
+        path, id, persistencePolicyMatch);
+    return submit(
+        new AsyncPurge(path,
+            new SelectByYarnPersistence(id, persistencePolicyMatch),
+            purgePolicy,
+            callback));
+  }
+
+}
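
A sketch of driving this service through the standard Hadoop service lifecycle
(init/start/stop) and the purge API above. It assumes a reachable ZooKeeper
ensemble configured for the registry; the container ID is made up and not part
of this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
import org.apache.hadoop.registry.server.integration.RMRegistryOperationsService;

import java.util.concurrent.Future;

public class PurgeExample {
  public static void main(String[] args) throws Exception {
    RMRegistryOperationsService registry =
        new RMRegistryOperationsService("rmRegistry");
    registry.init(new Configuration());  // standard service lifecycle
    registry.start();
    try {
      // Purge all records under "/" tagged with this container ID and
      // the container persistence policy.
      Future<Integer> purge = registry.purgeRecordsAsync("/",
          "container_1410000000000_0001_01_000002",  // made-up container ID
          PersistencePolicies.CONTAINER);
      System.out.println("deleted " + purge.get() + " records");
    } finally {
      registry.stop();
    }
  }
}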