You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2014/10/08 21:54:51 UTC
[4/6] YARN-913 service registry: YARN-2652 add hadoop-yarn-registry
package under hadoop-yarn
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java
new file mode 100644
index 0000000..6484d28
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistrySecurity.java
@@ -0,0 +1,996 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.impl.zk;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.util.ZKUtil;
+import org.apache.zookeeper.Environment;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Locale;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import static org.apache.hadoop.registry.client.impl.zk.ZookeeperConfigOptions.*;
+import static org.apache.hadoop.registry.client.api.RegistryConstants.*;
+
+/**
+ * Implement the registry security ... a self contained service for
+ * testability.
+ * <p>
+ * This class contains:
+ * <ol>
+ * <li>
+ * The registry security policy implementation, configuration reading, ACL
+ * setup and management
+ * </li>
+ * <li>Lots of static helper methods to aid security setup and debugging</li>
+ * </ol>
+ */
+
+public class RegistrySecurity extends AbstractService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RegistrySecurity.class);
+
+ public static final String E_UNKNOWN_AUTHENTICATION_MECHANISM =
+ "Unknown/unsupported authentication mechanism; ";
+
+ /**
+ * there's no default user to add with permissions, so it would be
+ * impossible to create nodes with unrestricted user access
+ */
+ public static final String E_NO_USER_DETERMINED_FOR_ACLS =
+ "No user for ACLs determinable from current user or registry option "
+ + KEY_REGISTRY_USER_ACCOUNTS;
+
+ /**
+ * Error raised when the registry is tagged as secure but this
+ * process doesn't have hadoop security enabled.
+ */
+ public static final String E_NO_KERBEROS =
+ "Registry security is enabled -but Hadoop security is not enabled";
+
+ /**
+ * Access policy options
+ */
+ private enum AccessPolicy {
+ anon, sasl, digest
+ }
+
+ /**
+ * Access mechanism
+ */
+ private AccessPolicy access;
+
+ /**
+ * User used for digest auth
+ */
+
+ private String digestAuthUser;
+
+ /**
+ * Password used for digest auth
+ */
+
+ private String digestAuthPassword;
+
+ /**
+ * Auth data used for digest auth
+ */
+ private byte[] digestAuthData;
+
+ /**
+ * flag set to true if the registry has security enabled.
+ */
+ private boolean secureRegistry;
+
+ /**
+ * An ACL with read-write access for anyone
+ */
+ public static final ACL ALL_READWRITE_ACCESS =
+ new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE);
+
+ /**
+ * An ACL with read access for anyone
+ */
+ public static final ACL ALL_READ_ACCESS =
+ new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE);
+
+ /**
+ * An ACL list containing the {@link #ALL_READWRITE_ACCESS} entry.
+ * It is copy on write so can be shared without worry
+ */
+ public static final List<ACL> WorldReadWriteACL;
+
+ static {
+ List<ACL> acls = new ArrayList<ACL>();
+ acls.add(ALL_READWRITE_ACCESS);
+ WorldReadWriteACL = new CopyOnWriteArrayList<ACL>(acls);
+ }
+
+ /**
+ * the list of system ACLs
+ */
+ private final List<ACL> systemACLs = new ArrayList<ACL>();
+
+ /**
+ * A list of digest ACLs which can be added to permissions
+ * —and cleared later.
+ */
+ private final List<ACL> digestACLs = new ArrayList<ACL>();
+
+ /**
+ * the default kerberos realm
+ */
+ private String kerberosRealm;
+
+ /**
+ * Client context
+ */
+ private String jaasClientContext;
+
+ /**
+ * Client identity
+ */
+ private String jaasClientIdentity;
+
+ /**
+ * Create an instance
+ * @param name service name
+ */
+ public RegistrySecurity(String name) {
+ super(name);
+ }
+
+ /**
+ * Init the service: this sets up security based on the configuration
+ * @param conf configuration
+ * @throws Exception
+ */
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ String auth = conf.getTrimmed(KEY_REGISTRY_CLIENT_AUTH,
+ REGISTRY_CLIENT_AUTH_ANONYMOUS);
+
+ // TODO JDK7 SWITCH
+ if (REGISTRY_CLIENT_AUTH_KERBEROS.equals(auth)) {
+ access = AccessPolicy.sasl;
+ } else if (REGISTRY_CLIENT_AUTH_DIGEST.equals(auth)) {
+ access = AccessPolicy.digest;
+ } else if (REGISTRY_CLIENT_AUTH_ANONYMOUS.equals(auth)) {
+ access = AccessPolicy.anon;
+ } else {
+ throw new ServiceStateException(E_UNKNOWN_AUTHENTICATION_MECHANISM
+ + "\"" + auth + "\"");
+ }
+ initSecurity();
+ }
+
+ /**
+ * Init security.
+ *
+ * After this operation, the {@link #systemACLs} list is valid.
+ * @throws IOException
+ */
+ private void initSecurity() throws IOException {
+
+ secureRegistry =
+ getConfig().getBoolean(KEY_REGISTRY_SECURE, DEFAULT_REGISTRY_SECURE);
+ systemACLs.clear();
+ if (secureRegistry) {
+ addSystemACL(ALL_READ_ACCESS);
+
+ // determine the kerberos realm from JVM and settings
+ kerberosRealm = getConfig().get(KEY_REGISTRY_KERBEROS_REALM,
+ getDefaultRealmInJVM());
+
+ // System Accounts
+ String system = getOrFail(KEY_REGISTRY_SYSTEM_ACCOUNTS,
+ DEFAULT_REGISTRY_SYSTEM_ACCOUNTS);
+
+ systemACLs.addAll(buildACLs(system, kerberosRealm, ZooDefs.Perms.ALL));
+
+ // user accounts (may be empty, but for digest one user AC must
+ // be built up
+ String user = getConfig().get(KEY_REGISTRY_USER_ACCOUNTS,
+ DEFAULT_REGISTRY_USER_ACCOUNTS);
+ List<ACL> userACLs = buildACLs(user, kerberosRealm, ZooDefs.Perms.ALL);
+
+ // add self if the current user can be determined
+ ACL self;
+ if (UserGroupInformation.isSecurityEnabled()) {
+ self = createSaslACLFromCurrentUser(ZooDefs.Perms.ALL);
+ if (self != null) {
+ userACLs.add(self);
+ }
+ }
+
+ // here check for UGI having secure on or digest + ID
+ switch (access) {
+ case sasl:
+ // secure + SASL => has to be authenticated
+ if (!UserGroupInformation.isSecurityEnabled()) {
+ throw new IOException("Kerberos required for secure registry access");
+ }
+ UserGroupInformation currentUser =
+ UserGroupInformation.getCurrentUser();
+ jaasClientContext = getOrFail(KEY_REGISTRY_CLIENT_JAAS_CONTEXT,
+ DEFAULT_REGISTRY_CLIENT_JAAS_CONTEXT);
+ jaasClientIdentity = currentUser.getShortUserName();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Auth is SASL user=\"{}\" JAAS context=\"{}\"",
+ jaasClientIdentity,
+ jaasClientContext);
+ }
+ break;
+
+ case digest:
+ String id = getOrFail(KEY_REGISTRY_CLIENT_AUTHENTICATION_ID, "");
+ String pass = getOrFail(KEY_REGISTRY_CLIENT_AUTHENTICATION_PASSWORD, "");
+ if (userACLs.isEmpty()) {
+ //
+ throw new ServiceStateException(E_NO_USER_DETERMINED_FOR_ACLS);
+ }
+ digest(id, pass);
+ ACL acl = new ACL(ZooDefs.Perms.ALL, toDigestId(id, pass));
+ userACLs.add(acl);
+ digestAuthUser = id;
+ digestAuthPassword = pass;
+ String authPair = id + ":" + pass;
+ digestAuthData = authPair.getBytes("UTF-8");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Auth is Digest ACL: {}", aclToString(acl));
+ }
+ break;
+
+ case anon:
+ // nothing is needed; account is read only.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Auth is anonymous");
+ }
+ userACLs = new ArrayList<ACL>(0);
+ break;
+ }
+ systemACLs.addAll(userACLs);
+
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Registry has no security");
+ }
+ // wide open cluster, adding system acls
+ systemACLs.addAll(WorldReadWriteACL);
+ }
+ }
+
+ /**
+ * Add another system ACL
+ * @param acl add ACL
+ */
+ public void addSystemACL(ACL acl) {
+ systemACLs.add(acl);
+ }
+
+ /**
+ * Add a digest ACL
+ * @param acl add ACL
+ */
+ public boolean addDigestACL(ACL acl) {
+ if (secureRegistry) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added ACL {}", aclToString(acl));
+ }
+ digestACLs.add(acl);
+ return true;
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring added ACL - registry is insecure{}",
+ aclToString(acl));
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Reset the digest ACL list
+ */
+ public void resetDigestACLs() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Cleared digest ACLs");
+ }
+ digestACLs.clear();
+ }
+
+ /**
+ * Flag to indicate the cluster is secure
+ * @return true if the config enabled security
+ */
+ public boolean isSecureRegistry() {
+ return secureRegistry;
+ }
+
+ /**
+ * Get the system principals
+ * @return the system principals
+ */
+ public List<ACL> getSystemACLs() {
+ Preconditions.checkNotNull(systemACLs, "registry security is unitialized");
+ return Collections.unmodifiableList(systemACLs);
+ }
+
+ /**
+ * Get all ACLs needed for a client to use when writing to the repo.
+ * That is: system ACLs, its own ACL, any digest ACLs
+ * @return the client ACLs
+ */
+ public List<ACL> getClientACLs() {
+ List<ACL> clientACLs = new ArrayList<ACL>(systemACLs);
+ clientACLs.addAll(digestACLs);
+ return clientACLs;
+ }
+
+ /**
+ * Create a SASL ACL for the user
+ * @param perms permissions
+ * @return an ACL for the current user or null if they aren't a kerberos user
+ * @throws IOException
+ */
+ public ACL createSaslACLFromCurrentUser(int perms) throws IOException {
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ if (currentUser.hasKerberosCredentials()) {
+ return createSaslACL(currentUser, perms);
+ } else {
+ return null;
+ }
+ }
+
+ /**
+ * Given a UGI, create a SASL ACL from it
+ * @param ugi UGI
+ * @param perms permissions
+ * @return a new ACL
+ */
+ public ACL createSaslACL(UserGroupInformation ugi, int perms) {
+ String userName = ugi.getUserName();
+ return new ACL(perms, new Id(SCHEME_SASL, userName));
+ }
+
+ /**
+ * Get a conf option, throw an exception if it is null/empty
+ * @param key key
+ * @param defval default value
+ * @return the value
+ * @throws IOException if missing
+ */
+ private String getOrFail(String key, String defval) throws IOException {
+ String val = getConfig().get(key, defval);
+ if (StringUtils.isEmpty(val)) {
+ throw new IOException("Missing value for configuration option " + key);
+ }
+ return val;
+ }
+
+ /**
+ * Check for an id:password tuple being valid.
+ * This test is stricter than that in {@link DigestAuthenticationProvider},
+ * which splits the string, but doesn't check the contents of each
+ * half for being non-"".
+ * @param idPasswordPair id:pass pair
+ * @return true if the pass is considered valid.
+ */
+ public boolean isValid(String idPasswordPair) {
+ String[] parts = idPasswordPair.split(":");
+ return parts.length == 2
+ && !StringUtils.isEmpty(parts[0])
+ && !StringUtils.isEmpty(parts[1]);
+ }
+
+ /**
+ * Get the derived kerberos realm.
+ * @return this is built from the JVM realm, or the configuration if it
+ * overrides it. If "", it means "don't know".
+ */
+ public String getKerberosRealm() {
+ return kerberosRealm;
+ }
+
+ /**
+ * Generate a base-64 encoded digest of the idPasswordPair pair
+ * @param idPasswordPair id:password
+ * @return a string that can be used for authentication
+ */
+ public String digest(String idPasswordPair) throws IOException {
+ if (StringUtils.isEmpty(idPasswordPair) || !isValid(idPasswordPair)) {
+ throw new IOException("Invalid id:password: " + idPasswordPair);
+ }
+ try {
+ return DigestAuthenticationProvider.generateDigest(idPasswordPair);
+ } catch (NoSuchAlgorithmException e) {
+ // unlikely since it is standard to the JVM, but maybe JCE restrictions
+ // could trigger it
+ throw new IOException(e.toString(), e);
+ }
+ }
+
+ /**
+ * Generate a base-64 encoded digest of the idPasswordPair pair
+ * @param id ID
+ * @param password pass
+ * @return a string that can be used for authentication
+ * @throws IOException
+ */
+ public String digest(String id, String password) throws IOException {
+ return digest(id + ":" + password);
+ }
+
+ /**
+ * Given a digest, create an ID from it
+ * @param digest digest
+ * @return ID
+ */
+ public Id toDigestId(String digest) {
+ return new Id(SCHEME_DIGEST, digest);
+ }
+
+ /**
+ * Create a Digest ID from an id:pass pair
+ * @param id ID
+ * @param password password
+ * @return an ID
+ * @throws IOException
+ */
+ public Id toDigestId(String id, String password) throws IOException {
+ return toDigestId(digest(id, password));
+ }
+
+ /**
+ * Split up a list of the form
+ * <code>sasl:mapred@,digest:5f55d66, sasl@yarn@EXAMPLE.COM</code>
+ * into a list of possible ACL values, trimming as needed
+ *
+ * The supplied realm is added to entries where
+ * <ol>
+ * <li>the string begins "sasl:"</li>
+ * <li>the string ends with "@"</li>
+ * </ol>
+ * No attempt is made to validate any of the acl patterns.
+ *
+ * @param aclString list of 0 or more ACLs
+ * @param realm realm to add
+ * @return a list of split and potentially patched ACL pairs.
+ *
+ */
+ public List<String> splitAclPairs(String aclString, String realm) {
+ List<String> list = Lists.newArrayList(
+ Splitter.on(',').omitEmptyStrings().trimResults()
+ .split(aclString));
+ ListIterator<String> listIterator = list.listIterator();
+ while (listIterator.hasNext()) {
+ String next = listIterator.next();
+ if (next.startsWith(SCHEME_SASL +":") && next.endsWith("@")) {
+ listIterator.set(next + realm);
+ }
+ }
+ return list;
+ }
+
+ /**
+ * Parse a string down to an ID, adding a realm if needed
+ * @param idPair id:data tuple
+ * @param realm realm to add
+ * @return the ID.
+ * @throws IllegalArgumentException if the idPair is invalid
+ */
+ public Id parse(String idPair, String realm) {
+ int firstColon = idPair.indexOf(':');
+ int lastColon = idPair.lastIndexOf(':');
+ if (firstColon == -1 || lastColon == -1 || firstColon != lastColon) {
+ throw new IllegalArgumentException(
+ "ACL '" + idPair + "' not of expected form scheme:id");
+ }
+ String scheme = idPair.substring(0, firstColon);
+ String id = idPair.substring(firstColon + 1);
+ if (id.endsWith("@")) {
+ Preconditions.checkArgument(
+ StringUtils.isNotEmpty(realm),
+ "@ suffixed account but no realm %s", id);
+ id = id + realm;
+ }
+ return new Id(scheme, id);
+ }
+
+ /**
+ * Parse the IDs, adding a realm if needed, setting the permissions
+ * @param principalList id string
+ * @param realm realm to add
+ * @param perms permissions
+ * @return the relevant ACLs
+ * @throws IOException
+ */
+ public List<ACL> buildACLs(String principalList, String realm, int perms)
+ throws IOException {
+ List<String> aclPairs = splitAclPairs(principalList, realm);
+ List<ACL> ids = new ArrayList<ACL>(aclPairs.size());
+ for (String aclPair : aclPairs) {
+ ACL newAcl = new ACL();
+ newAcl.setId(parse(aclPair, realm));
+ newAcl.setPerms(perms);
+ ids.add(newAcl);
+ }
+ return ids;
+ }
+
+ /**
+ * Parse an ACL list. This includes configuration indirection
+ * {@link ZKUtil#resolveConfIndirection(String)}
+ * @param zkAclConf configuration string
+ * @return an ACL list
+ * @throws IOException on a bad ACL parse
+ */
+ public List<ACL> parseACLs(String zkAclConf) throws IOException {
+ try {
+ return ZKUtil.parseACLs(ZKUtil.resolveConfIndirection(zkAclConf));
+ } catch (ZKUtil.BadAclFormatException e) {
+ throw new IOException("Parsing " + zkAclConf + " :" + e, e);
+ }
+ }
+
+ /**
+ * Get the appropriate Kerberos Auth module for JAAS entries
+ * for this JVM.
+ * @return a JVM-specific kerberos login module classname.
+ */
+ public static String getKerberosAuthModuleForJVM() {
+ if (System.getProperty("java.vendor").contains("IBM")) {
+ return "com.ibm.security.auth.module.Krb5LoginModule";
+ } else {
+ return "com.sun.security.auth.module.Krb5LoginModule";
+ }
+ }
+
+ /**
+ * JAAS template: {@value}
+ * Note the semicolon on the last entry
+ */
+ private static final String JAAS_ENTRY =
+ "%s { \n"
+ + " %s required\n"
+ // kerberos module
+ + " keyTab=\"%s\"\n"
+ + " principal=\"%s\"\n"
+ + " useKeyTab=true\n"
+ + " useTicketCache=false\n"
+ + " doNotPrompt=true\n"
+ + " storeKey=true;\n"
+ + "}; \n"
+ ;
+
+ /**
+ * Create a JAAS entry for insertion
+ * @param context context of the entry
+ * @param principal kerberos principal
+ * @param keytab keytab
+ * @return a context
+ */
+ public String createJAASEntry(
+ String context,
+ String principal,
+ File keytab) {
+ Preconditions.checkArgument(StringUtils.isNotEmpty(principal),
+ "invalid principal");
+ Preconditions.checkArgument(StringUtils.isNotEmpty(context),
+ "invalid context");
+ Preconditions.checkArgument(keytab != null && keytab.isFile(),
+ "Keytab null or missing: ");
+ return String.format(
+ Locale.ENGLISH,
+ JAAS_ENTRY,
+ context,
+ getKerberosAuthModuleForJVM(),
+ keytab.getAbsolutePath(),
+ principal);
+ }
+
+ /**
+ * Bind the JVM JAS setting to the specified JAAS file.
+ *
+ * <b>Important:</b> once a file has been loaded the JVM doesn't pick up
+ * changes
+ * @param jaasFile the JAAS file
+ */
+ public static void bindJVMtoJAASFile(File jaasFile) {
+ String path = jaasFile.getAbsolutePath();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Binding {} to {}", Environment.JAAS_CONF_KEY, path);
+ }
+ System.setProperty(Environment.JAAS_CONF_KEY, path);
+ }
+
+ /**
+ * Set the Zookeeper server property
+ * {@link ZookeeperConfigOptions#PROP_ZK_SERVER_SASL_CONTEXT}
+ * to the SASL context. When the ZK server starts, this is the context
+ * which it will read in
+ * @param contextName the name of the context
+ */
+ public static void bindZKToServerJAASContext(String contextName) {
+ System.setProperty(PROP_ZK_SERVER_SASL_CONTEXT, contextName);
+ }
+
+ /**
+ * Reset any system properties related to JAAS
+ */
+ public static void clearJaasSystemProperties() {
+ System.clearProperty(Environment.JAAS_CONF_KEY);
+ }
+
+ /**
+ * Resolve the context of an entry. This is an effective test of
+ * JAAS setup, because it will relay detected problems up
+ * @param context context name
+ * @return the entry
+ * @throws RuntimeException if there is no context entry found
+ */
+ public static AppConfigurationEntry[] validateContext(String context) {
+ if (context == null) {
+ throw new RuntimeException("Null context argument");
+ }
+ if (context.isEmpty()) {
+ throw new RuntimeException("Empty context argument");
+ }
+ javax.security.auth.login.Configuration configuration =
+ javax.security.auth.login.Configuration.getConfiguration();
+ AppConfigurationEntry[] entries =
+ configuration.getAppConfigurationEntry(context);
+ if (entries == null) {
+ throw new RuntimeException(
+ String.format("Entry \"%s\" not found; " +
+ "JAAS config = %s",
+ context,
+ describeProperty(Environment.JAAS_CONF_KEY) ));
+ }
+ return entries;
+ }
+
+ /**
+ * Apply the security environment to this curator instance. This
+ * may include setting up the ZK system properties for SASL
+ * @param builder curator builder
+ */
+ public void applySecurityEnvironment(CuratorFrameworkFactory.Builder builder) {
+
+ if (isSecureRegistry()) {
+ switch (access) {
+ case anon:
+ clearZKSaslClientProperties();
+ break;
+
+ case digest:
+ // no SASL
+ clearZKSaslClientProperties();
+ builder.authorization(SCHEME_DIGEST, digestAuthData);
+ break;
+
+ case sasl:
+ // bind to the current identity and context within the JAAS file
+ setZKSaslClientProperties(jaasClientIdentity, jaasClientContext);
+ }
+ }
+ }
+
+ /**
+ * Set the client properties. This forces the ZK client into
+ * failing if it can't auth.
+ * <b>Important:</b>This is JVM-wide.
+ * @param username username
+ * @param context login context
+ * @throws RuntimeException if the context cannot be found in the current
+ * JAAS context
+ */
+ public static void setZKSaslClientProperties(String username,
+ String context) {
+ RegistrySecurity.validateContext(context);
+ enableZookeeperClientSASL();
+ System.setProperty(PROP_ZK_SASL_CLIENT_USERNAME, username);
+ System.setProperty(PROP_ZK_SASL_CLIENT_CONTEXT, context);
+ }
+
+ /**
+ * Clear all the ZK SASL Client properties
+ * <b>Important:</b>This is JVM-wide
+ */
+ public static void clearZKSaslClientProperties() {
+ disableZookeeperClientSASL();
+ System.clearProperty(PROP_ZK_SASL_CLIENT_CONTEXT);
+ System.clearProperty(PROP_ZK_SASL_CLIENT_USERNAME);
+ }
+
+ /**
+ * Turn ZK SASL on
+ * <b>Important:</b>This is JVM-wide
+ */
+ protected static void enableZookeeperClientSASL() {
+ System.setProperty(PROP_ZK_ENABLE_SASL_CLIENT, "true");
+ }
+
+ /**
+ * Force disable ZK SASL bindings.
+ * <b>Important:</b>This is JVM-wide
+ */
+ public static void disableZookeeperClientSASL() {
+ System.setProperty(ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY, "false");
+ }
+
+ /**
+ * Is the system property enabling the SASL client set?
+ * @return true if the SASL client system property is set.
+ */
+ public static boolean isClientSASLEnabled() {
+ return ZooKeeperSaslClient.isEnabled();
+ }
+
+ /**
+ * Log details about the current Hadoop user at INFO.
+ * Robust against IOEs when trying to get the current user
+ */
+ public void logCurrentHadoopUser() {
+ try {
+ UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+ LOG.info("Current user = {}",currentUser);
+ UserGroupInformation realUser = currentUser.getRealUser();
+ LOG.info("Real User = {}" , realUser);
+ } catch (IOException e) {
+ LOG.warn("Failed to get current user {}, {}", e);
+ }
+ }
+
+ /**
+ * Stringify a list of ACLs for logging. Digest ACLs have their
+ * digest values stripped for security.
+ * @param acls ACL list
+ * @return a string for logs, exceptions, ...
+ */
+ public static String aclsToString(List<ACL> acls) {
+ StringBuilder builder = new StringBuilder();
+ if (acls == null) {
+ builder.append("null ACL");
+ } else {
+ builder.append('\n');
+ for (ACL acl : acls) {
+ builder.append(aclToString(acl))
+ .append(" ");
+ }
+ }
+ return builder.toString();
+ }
+
+ /**
+ * Convert an ACL to a string, with any obfuscation needed
+ * @param acl ACL
+ * @return ACL string value
+ */
+ public static String aclToString(ACL acl) {
+ return String.format(Locale.ENGLISH,
+ "0x%02x: %s",
+ acl.getPerms(),
+ idToString(acl.getId())
+ );
+ }
+
+ /**
+ * Convert an ID to a string, stripping out all but the first few characters
+ * of any digest auth hash for security reasons
+ * @param id ID
+ * @return a string description of a Zookeeper ID
+ */
+ public static String idToString(Id id) {
+ String s;
+ if (id.getScheme().equals(SCHEME_DIGEST)) {
+ String ids = id.getId();
+ int colon = ids.indexOf(':');
+ if (colon > 0) {
+ ids = ids.substring(colon + 3);
+ }
+ s = SCHEME_DIGEST + ": " + ids;
+ } else {
+ s = id.toString();
+ }
+ return s;
+ }
+
+ /**
+ * Build up low-level security diagnostics to aid debugging
+ * @return a string to use in diagnostics
+ */
+ public String buildSecurityDiagnostics() {
+ StringBuilder builder = new StringBuilder();
+ builder.append(secureRegistry ? "secure registry; "
+ : "insecure registry; ");
+ builder.append("Access policy: ").append(access);
+
+ builder.append(", System ACLs: ").append(aclsToString(systemACLs));
+ builder.append(UgiInfo.fromCurrentUser());
+ builder.append(" Kerberos Realm: ").append(kerberosRealm).append(" ; ");
+ builder.append(describeProperty(Environment.JAAS_CONF_KEY));
+ String sasl =
+ System.getProperty(PROP_ZK_ENABLE_SASL_CLIENT,
+ DEFAULT_ZK_ENABLE_SASL_CLIENT);
+ boolean saslEnabled = Boolean.valueOf(sasl);
+ builder.append(describeProperty(PROP_ZK_ENABLE_SASL_CLIENT,
+ DEFAULT_ZK_ENABLE_SASL_CLIENT));
+ if (saslEnabled) {
+ builder.append("JAAS Client Identity")
+ .append("=")
+ .append(jaasClientIdentity)
+ .append("; ");
+ builder.append(KEY_REGISTRY_CLIENT_JAAS_CONTEXT)
+ .append("=")
+ .append(jaasClientContext)
+ .append("; ");
+ builder.append(describeProperty(PROP_ZK_SASL_CLIENT_USERNAME));
+ builder.append(describeProperty(PROP_ZK_SASL_CLIENT_CONTEXT));
+ }
+ builder.append(describeProperty(PROP_ZK_ALLOW_FAILED_SASL_CLIENTS,
+ "(undefined but defaults to true)"));
+ builder.append(describeProperty(
+ PROP_ZK_SERVER_MAINTAIN_CONNECTION_DESPITE_SASL_FAILURE));
+ return builder.toString();
+ }
+
+ private static String describeProperty(String name) {
+ return describeProperty(name, "(undefined)");
+ }
+
+ private static String describeProperty(String name, String def) {
+ return "; " + name + "=" + System.getProperty(name, def);
+ }
+
+ /**
+ * Get the default kerberos realm —returning "" if there
+ * is no realm or other problem
+ * @return the default realm of the system if it
+ * could be determined
+ */
+ public static String getDefaultRealmInJVM() {
+ try {
+ return KerberosUtil.getDefaultRealm();
+ // JDK7
+ } catch (ClassNotFoundException ignored) {
+ // ignored
+ } catch (NoSuchMethodException ignored) {
+ // ignored
+ } catch (IllegalAccessException ignored) {
+ // ignored
+ } catch (InvocationTargetException ignored) {
+ // ignored
+ }
+ return "";
+ }
+
+ /**
+ * Create an ACL For a user.
+ * @param ugi User identity
+ * @return the ACL For the specified user. Ifthe username doesn't end
+ * in "@" then the realm is added
+ */
+ public ACL createACLForUser(UserGroupInformation ugi, int perms) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating ACL For ", new UgiInfo(ugi));
+ }
+ if (!secureRegistry) {
+ return ALL_READWRITE_ACCESS;
+ } else {
+ return createACLfromUsername(ugi.getUserName(), perms);
+ }
+ }
+
+ /**
+ * Given a user name (short or long), create a SASL ACL
+ * @param username user name; if it doesn't contain an "@" symbol, the
+ * service's kerberos realm is added
+ * @param perms permissions
+ * @return an ACL for the user
+ */
+ public ACL createACLfromUsername(String username, int perms) {
+ if (!username.contains("@")) {
+ username = username + "@" + kerberosRealm;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Appending kerberos realm to make {}", username);
+ }
+ }
+ return new ACL(perms, new Id(SCHEME_SASL, username));
+ }
+
+ /**
+ * On demand string-ifier for UGI with extra details
+ */
+ public static class UgiInfo {
+
+ public static UgiInfo fromCurrentUser() {
+ try {
+ return new UgiInfo(UserGroupInformation.getCurrentUser());
+ } catch (IOException e) {
+ LOG.info("Failed to get current user {}", e, e);
+ return new UgiInfo(null);
+ }
+ }
+
+ private final UserGroupInformation ugi;
+
+ public UgiInfo(UserGroupInformation ugi) {
+ this.ugi = ugi;
+ }
+
+ @Override
+ public String toString() {
+ if (ugi==null) {
+ return "(null ugi)";
+ }
+ StringBuilder builder = new StringBuilder();
+ builder.append(ugi.getUserName()).append(": ");
+ builder.append(ugi.toString());
+ builder.append(" hasKerberosCredentials=").append(
+ ugi.hasKerberosCredentials());
+ builder.append(" isFromKeytab=").append(ugi.isFromKeytab());
+ builder.append(" kerberos is enabled in Hadoop =").append(UserGroupInformation.isSecurityEnabled());
+ return builder.toString();
+ }
+
+ }
+
+ /**
+ * on-demand stringifier for a list of ACLs
+ */
+ public static class AclListInfo {
+ public final List<ACL> acls;
+
+ public AclListInfo(List<ACL> acls) {
+ this.acls = acls;
+ }
+
+ @Override
+ public String toString() {
+ return aclsToString(acls);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZKPathDumper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZKPathDumper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZKPathDumper.java
new file mode 100644
index 0000000..3c4a730
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZKPathDumper.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.impl.zk;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.util.List;
+
+/**
+ * This class dumps a registry tree to a string.
+ * It does this in the <code>toString()</code> method, so it
+ * can be used in a log statement -the operation
+ * will only take place if the method is evaluated.
+ *
+ */
+@VisibleForTesting
+public class ZKPathDumper {
+
+ public static final int INDENT = 2;
+ private final CuratorFramework curator;
+ private final String root;
+ private final boolean verbose;
+
+ /**
+ * Create a path dumper -but do not dump the path until asked
+ * @param curator curator instance
+ * @param root root
+ * @param verbose verbose flag - includes more details (such as ACLs)
+ */
+ public ZKPathDumper(CuratorFramework curator,
+ String root,
+ boolean verbose) {
+ Preconditions.checkArgument(curator != null);
+ Preconditions.checkArgument(root != null);
+ this.curator = curator;
+ this.root = root;
+ this.verbose = verbose;
+ }
+
+ /**
+ * Trigger the recursive registry dump.
+ * @return a string view of the registry
+ */
+ @Override
+ public String toString() {
+ StringBuilder builder = new StringBuilder();
+ builder.append("ZK tree for ").append(root).append('\n');
+ expand(builder, root, 1);
+ return builder.toString();
+ }
+
+ /**
+ * Recursively expand the path into the supplied string builder, increasing
+ * the indentation by {@link #INDENT} as it proceeds (depth first) down
+ * the tree
+ * @param builder string build to append to
+ * @param path path to examine
+ * @param indent current indentation
+ */
+ private void expand(StringBuilder builder,
+ String path,
+ int indent) {
+ try {
+ GetChildrenBuilder childrenBuilder = curator.getChildren();
+ List<String> children = childrenBuilder.forPath(path);
+ for (String child : children) {
+ String childPath = path + "/" + child;
+ String body;
+ Stat stat = curator.checkExists().forPath(childPath);
+ StringBuilder bodyBuilder = new StringBuilder(256);
+ bodyBuilder.append(" [")
+ .append(stat.getDataLength())
+ .append("]");
+ if (stat.getEphemeralOwner() > 0) {
+ bodyBuilder.append("*");
+ }
+ if (verbose) {
+ // verbose: extract ACLs
+ builder.append(" -- ");
+ List<ACL> acls =
+ curator.getACL().forPath(childPath);
+ for (ACL acl : acls) {
+ builder.append(RegistrySecurity.aclToString(acl));
+ builder.append(" ");
+ }
+ }
+ body = bodyBuilder.toString();
+ // print each child
+ append(builder, indent, ' ');
+ builder.append('/').append(child);
+ builder.append(body);
+ builder.append('\n');
+ // recurse
+ expand(builder, childPath, indent + INDENT);
+ }
+ } catch (Exception e) {
+ builder.append(e.toString()).append("\n");
+ }
+ }
+
+ /**
+ * Append the specified indentation to a builder
+ * @param builder string build to append to
+ * @param indent current indentation
+ * @param c charactor to use for indentation
+ */
+ private void append(StringBuilder builder, int indent, char c) {
+ for (int i = 0; i < indent; i++) {
+ builder.append(c);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZookeeperConfigOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZookeeperConfigOptions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZookeeperConfigOptions.java
new file mode 100644
index 0000000..711e27c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/ZookeeperConfigOptions.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.impl.zk;
+
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.server.ZooKeeperSaslServer;
+
+/**
+ * Configuration options which are internal to Zookeeper,
+ * as well as some other ZK constants
+ * <p>
+ * Zookeeper options are passed via system properties prior to the ZK
+ * Methods/classes being invoked. This implies that:
+ * <ol>
+ * <li>There can only be one instance of a ZK client or service class
+ * in a single JVM —else their configuration options will conflict.</li>
+ * <li>It is safest to set these properties immediately before
+ * invoking ZK operations.</li>
+ * </ol>
+ *
+ */
+public interface ZookeeperConfigOptions {
+ /**
+ * Enable SASL secure clients: {@value}.
+ * This is usually set to true, with ZK set to fall back to
+ * non-SASL authentication if the SASL auth fails
+ * by the property
+ * {@link #PROP_ZK_SERVER_MAINTAIN_CONNECTION_DESPITE_SASL_FAILURE}.
+ * <p>
+ * As a result, clients will default to attempting SASL-authentication,
+ * but revert to classic authentication/anonymous access on failure.
+ */
+ String PROP_ZK_ENABLE_SASL_CLIENT =
+ ZooKeeperSaslClient.ENABLE_CLIENT_SASL_KEY;
+
+ /**
+ * Default flag for the ZK client: {@value}.
+ */
+ String DEFAULT_ZK_ENABLE_SASL_CLIENT = "true";
+
+ /**
+ * System property for the JAAS client context : {@value}.
+ *
+ * For SASL authentication to work, this must point to a
+ * context within the
+ *
+ * <p>
+ * Default value is derived from
+ * {@link ZooKeeperSaslClient#LOGIN_CONTEXT_NAME_KEY}
+ */
+ String PROP_ZK_SASL_CLIENT_CONTEXT =
+ ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY;
+
+ /**
+ * The SASL client username: {@value}.
+ * <p>
+ * Set this to the <i>short</i> name of the client, e.g, "user",
+ * not <code>user/host</code>, or <code>user/host@REALM</code>
+ */
+ String PROP_ZK_SASL_CLIENT_USERNAME = "zookeeper.sasl.client.username";
+
+ /**
+ * The SASL Server context, referring to a context in the JVM's
+ * JAAS context file: {@value}
+ * <p>
+ */
+ String PROP_ZK_SERVER_SASL_CONTEXT =
+ ZooKeeperSaslServer.LOGIN_CONTEXT_NAME_KEY;
+
+ /**
+ * Should ZK Server allow failed SASL clients to downgrade to classic
+ * authentication on a SASL auth failure: {@value}.
+ */
+ String PROP_ZK_SERVER_MAINTAIN_CONNECTION_DESPITE_SASL_FAILURE =
+ "zookeeper.maintain_connection_despite_sasl_failure";
+
+ /**
+ * should the ZK Server Allow failed SASL clients: {@value}.
+ */
+ String PROP_ZK_ALLOW_FAILED_SASL_CLIENTS =
+ "zookeeper.allowSaslFailedClients";
+
+ /**
+ * Kerberos realm of the server: {@value}.
+ */
+ String PROP_ZK_SERVER_REALM = "zookeeper.server.realm";
+
+ /**
+ * Path to a kinit binary: {@value}.
+ * Defaults to <code>"/usr/bin/kinit"</code>
+ */
+ String PROP_ZK_KINIT_PATH = "zookeeper.kinit";
+
+ /**
+ * ID scheme for SASL: {@value}.
+ */
+ String SCHEME_SASL = "sasl";
+
+ /**
+ * ID scheme for digest auth: {@value}.
+ */
+ String SCHEME_DIGEST = "digest";
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/package-info.java
new file mode 100644
index 0000000..f7ae983
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/package-info.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Core Zookeeper support.
+ * <p>
+ * This package contains the low-level bindings to Curator and Zookeeper,
+ * including everything related to registry security.
+ * <p>
+ * The class {@link org.apache.hadoop.registry.client.impl.zk.CuratorService}
+ * is a YARN service which offers access to a Zookeeper instance via
+ * Apache Curator.
+ * <p>
+ * The {@link org.apache.hadoop.registry.client.impl.zk.RegistrySecurity}
+ * implements the security support in the registry, though a set of
+ * static methods and as a YARN service.
+ * <p>
+ * To work with ZK, system properties need to be set before invoking
+ * some operations/instantiating some objects. The definitions of these
+ * are kept in {@link org.apache.hadoop.registry.client.impl.zk.ZookeeperConfigOptions}.
+ *
+ *
+ */
+package org.apache.hadoop.registry.client.impl.zk;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
new file mode 100644
index 0000000..192819c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/AddressTypes.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.types;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Enum of address types -as integers.
+ * Why integers and not enums? Cross platform serialization as JSON
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface AddressTypes {
+
+ /**
+ * hostname/FQDN and port pair: {@value}.
+ * The host/domain name and port are set as separate strings in the address
+ * list, e.g.
+ * <pre>
+ * ["namenode.example.org", "50070"]
+ * </pre>
+ */
+ public static final String ADDRESS_HOSTNAME_AND_PORT = "host/port";
+
+
+ /**
+ * Path <code>/a/b/c</code> style: {@value}.
+ * The entire path is encoded in a single entry
+ *
+ * <pre>
+ * ["/users/example/dataset"]
+ * </pre>
+ */
+ public static final String ADDRESS_PATH = "path";
+
+
+
+ /**
+ * URI entries: {@value}.
+ * <pre>
+ * ["http://example.org"]
+ * </pre>
+ */
+ public static final String ADDRESS_URI = "uri";
+
+ /**
+ * Zookeeper addresses as a triple : {@value}.
+ * <p>
+ * These are provide as a 3 element tuple of: hostname, port
+ * and optionally path (depending on the application)
+ * <p>
+ * A single element would be
+ * <pre>
+ * ["zk1","2181","/registry"]
+ * </pre>
+ * An endpoint with multiple elements would list them as
+ * <pre>
+ * [
+ * ["zk1","2181","/registry"]
+ * ["zk2","1600","/registry"]
+ * ]
+ * </pre>
+ *
+ * the third element in each entry , the path, MUST be the same in each entry.
+ * A client reading the addresses of an endpoint is free to pick any
+ * of the set, so they must be the same.
+ *
+ */
+ public static final String ADDRESS_ZOOKEEPER = "zktriple";
+
+ /**
+ * Any other address: {@value}.
+ */
+ public static final String ADDRESS_OTHER = "";
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
new file mode 100644
index 0000000..51418d9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/Endpoint.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.types;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Description of a single service/component endpoint.
+ * It is designed to be marshalled as JSON.
+ * <p>
+ * Every endpoint can have more than one address entry, such as
+ * a list of URLs to a replicated service, or a (hostname, port)
+ * pair. Each of these address entries is represented as a string list,
+ * as that is the only reliably marshallable form of a tuple JSON can represent.
+ *
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public final class Endpoint implements Cloneable {
+
+ /**
+ * API implemented at the end of the binding
+ */
+ public String api;
+
+ /**
+ * Type of address. The standard types are defined in
+ * {@link AddressTypes}
+ */
+ public String addressType;
+
+ /**
+ * Protocol type. Some standard types are defined in
+ * {@link ProtocolTypes}
+ */
+ public String protocolType;
+
+ /**
+ * a list of address tuples —tuples whose format depends on the address type
+ */
+ public List<List<String>> addresses;
+
+ /**
+ * Create an empty instance.
+ */
+ public Endpoint() {
+ }
+
+ /**
+ * Create an endpoint from another endpoint.
+ * This is a deep clone with a new list of addresses.
+ * @param that the endpoint to copy from
+ */
+ public Endpoint(Endpoint that) {
+ this.api = that.api;
+ this.addressType = that.addressType;
+ this.protocolType = that.protocolType;
+ this.addresses = new ArrayList<List<String>>(that.addresses.size());
+ for (List<String> address : addresses) {
+ List<String> addr2 = new ArrayList<String>(address.size());
+ Collections.copy(address, addr2);
+ }
+ }
+
+ /**
+ * Build an endpoint with a list of addresses
+ * @param api API name
+ * @param addressType address type
+ * @param protocolType protocol type
+ * @param addrs addresses
+ */
+ public Endpoint(String api,
+ String addressType,
+ String protocolType,
+ List<List<String>> addrs) {
+ this.api = api;
+ this.addressType = addressType;
+ this.protocolType = protocolType;
+ this.addresses = new ArrayList<List<String>>();
+ if (addrs != null) {
+ addresses.addAll(addrs);
+ }
+ }
+
+ /**
+ * Build an endpoint from a list of URIs; each URI
+ * is ASCII-encoded and added to the list of addresses.
+ * @param api API name
+ * @param protocolType protocol type
+ * @param uris URIs to convert to a list of tup;les
+ */
+ public Endpoint(String api,
+ String protocolType,
+ URI... uris) {
+ this.api = api;
+ this.addressType = AddressTypes.ADDRESS_URI;
+
+ this.protocolType = protocolType;
+ List<List<String>> addrs = new ArrayList<List<String>>(uris.length);
+ for (URI uri : uris) {
+ addrs.add(RegistryTypeUtils.tuple(uri.toString()));
+ }
+ this.addresses = addrs;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder("Endpoint{");
+ sb.append("api='").append(api).append('\'');
+ sb.append(", addressType='").append(addressType).append('\'');
+ sb.append(", protocolType='").append(protocolType).append('\'');
+
+ sb.append(", addresses=");
+ if (addresses != null) {
+ sb.append("[ ");
+ for (List<String> address : addresses) {
+ sb.append("[ ");
+ if (address == null) {
+ sb.append("NULL entry in address list");
+ } else {
+ for (String elt : address) {
+ sb.append('"').append(elt).append("\" ");
+ }
+ }
+ sb.append("] ");
+ };
+ sb.append("] ");
+ } else {
+ sb.append("(null) ");
+ }
+ sb.append('}');
+ return sb.toString();
+ }
+
+ /**
+ * Validate the record by checking for null fields and other invalid
+ * conditions
+ * @throws NullPointerException if a field is null when it
+ * MUST be set.
+ * @throws RuntimeException on invalid entries
+ */
+ public void validate() {
+ Preconditions.checkNotNull(api, "null API field");
+ Preconditions.checkNotNull(addressType, "null addressType field");
+ Preconditions.checkNotNull(protocolType, "null protocolType field");
+ Preconditions.checkNotNull(addresses, "null addresses field");
+ for (List<String> address : addresses) {
+ Preconditions.checkNotNull(address, "null element in address");
+ }
+ }
+
+ /**
+ * Shallow clone: the lists of addresses are shared
+ * @return a cloned instance
+ * @throws CloneNotSupportedException
+ */
+ @Override
+ protected Object clone() throws CloneNotSupportedException {
+ return super.clone();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
new file mode 100644
index 0000000..f225cf0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ProtocolTypes.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.types;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * some common protocol types
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ProtocolTypes {
+
+ /**
+ * Addresses are URIs of Hadoop Filesystem paths: {@value}.
+ */
+ String PROTOCOL_FILESYSTEM = "hadoop/filesystem";
+
+ /**
+ * Classic Hadoop IPC : {@value}.
+ */
+ String PROTOCOL_HADOOP_IPC = "hadoop/IPC";
+
+ /**
+ * Hadoop protocol buffers IPC: {@value}.
+ */
+ String PROTOCOL_HADOOP_IPC_PROTOBUF = "hadoop/protobuf";
+
+ /**
+ * Corba IIOP: {@value}.
+ */
+ String PROTOCOL_IIOP = "IIOP";
+
+ /**
+ * REST: {@value}.
+ */
+ String PROTOCOL_REST = "REST";
+
+ /**
+ * Java RMI: {@value}.
+ */
+ String PROTOCOL_RMI = "RMI";
+
+ /**
+ * SunOS RPC, as used by NFS and similar: {@value}.
+ */
+ String PROTOCOL_SUN_RPC = "sunrpc";
+
+ /**
+ * Thrift-based protocols: {@value}.
+ */
+ String PROTOCOL_THRIFT = "thrift";
+
+ /**
+ * Custom TCP protocol: {@value}.
+ */
+ String PROTOCOL_TCP = "tcp";
+
+ /**
+ * Custom UPC-based protocol : {@value}.
+ */
+ String PROTOCOL_UDP = "udp";
+
+ /**
+ * Default value —the protocol is unknown : "{@value}"
+ */
+ String PROTOCOL_UNKNOWN = "";
+
+ /**
+ * Web page: {@value}.
+ *
+ * This protocol implies that the URLs are designed for
+ * people to view via web browsers.
+ */
+ String PROTOCOL_WEBUI = "webui";
+
+ /**
+ * Web Services: {@value}.
+ */
+ String PROTOCOL_WSAPI = "WS-*";
+
+ /**
+ * A zookeeper binding: {@value}.
+ */
+ String PROTOCOL_ZOOKEEPER_BINDING = "zookeeper";
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/RegistryPathStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/RegistryPathStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/RegistryPathStatus.java
new file mode 100644
index 0000000..59bcadc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/RegistryPathStatus.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.types;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+/**
+ * Output of a <code>RegistryOperations.stat()</code> call
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+@JsonIgnoreProperties(ignoreUnknown = true)
+public final class RegistryPathStatus {
+
+ /**
+ * Short path in the registry to this entry
+ */
+ public final String path;
+
+ /**
+ * Timestamp
+ */
+ public final long time;
+
+ /**
+ * Entry size in bytes, as returned by the storage infrastructure.
+ * In zookeeper, even "empty" nodes have a non-zero size.
+ */
+ public final long size;
+
+ /**
+ * Number of child nodes
+ */
+ public final int children;
+
+ /**
+ * Construct an instance
+ * @param path full path
+ * @param time time
+ * @param size entry size
+ * @param children number of children
+ */
+ public RegistryPathStatus(
+ @JsonProperty("path") String path,
+ @JsonProperty("time") long time,
+ @JsonProperty("size") long size,
+ @JsonProperty("children") int children) {
+ this.path = path;
+ this.time = time;
+ this.size = size;
+ this.children = children;
+ }
+
+ /**
+ * Equality operator checks size, time and path of the entries.
+ * It does <i>not</i> check {@link #children}.
+ * @param other the other entry
+ * @return true if the entries are considered equal.
+ */
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+ if (other == null || getClass() != other.getClass()) {
+ return false;
+ }
+
+ RegistryPathStatus status = (RegistryPathStatus) other;
+
+ if (size != status.size) {
+ return false;
+ }
+ if (time != status.time) {
+ return false;
+ }
+ if (path != null ? !path.equals(status.path) : status.path != null) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * The hash code is derived from the path.
+ * @return hash code for storing the path in maps.
+ */
+ @Override
+ public int hashCode() {
+ return path != null ? path.hashCode() : 0;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb =
+ new StringBuilder("RegistryPathStatus{");
+ sb.append("path='").append(path).append('\'');
+ sb.append(", time=").append(time);
+ sb.append(", size=").append(size);
+ sb.append(", children=").append(children);
+ sb.append('}');
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
new file mode 100644
index 0000000..378127f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecord.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.types;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.annotate.JsonAnyGetter;
+import org.codehaus.jackson.annotate.JsonAnySetter;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * JSON-marshallable description of a single component.
+ * It supports the deserialization of unknown attributes, but does
+ * not support their creation.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class ServiceRecord implements Cloneable {
+
+ /**
+ * Description string
+ */
+ public String description;
+
+ /**
+ * map to handle unknown attributes.
+ */
+ private Map<String, String> attributes = new HashMap<String, String>(4);
+
+ /**
+ * List of endpoints intended for use to external callers
+ */
+ public List<Endpoint> external = new ArrayList<Endpoint>();
+
+ /**
+ * List of endpoints for use <i>within</i> an application.
+ */
+ public List<Endpoint> internal = new ArrayList<Endpoint>();
+
+ /**
+ * Create a service record with no ID, description or registration time.
+ * Endpoint lists are set to empty lists.
+ */
+ public ServiceRecord() {
+ }
+
+ /**
+ * Deep cloning constructor
+ * @param that service record source
+ */
+ public ServiceRecord(ServiceRecord that) {
+ this.description = that.description;
+ // others
+ Map<String, String> thatAttrs = that.attributes;
+ for (Map.Entry<String, String> entry : thatAttrs.entrySet()) {
+ attributes.put(entry.getKey(), entry.getValue());
+ }
+ // endpoints
+ List<Endpoint> src = that.internal;
+ if (src != null) {
+ internal = new ArrayList<Endpoint>(src.size());
+ for (Endpoint endpoint : src) {
+ internal.add(new Endpoint(endpoint));
+ }
+ }
+ src = that.external;
+ if (src != null) {
+ external = new ArrayList<Endpoint>(src.size());
+ for (Endpoint endpoint : src) {
+ external.add(new Endpoint(endpoint));
+ }
+ }
+ }
+
+ /**
+ * Add an external endpoint
+ * @param endpoint endpoint to set
+ */
+ public void addExternalEndpoint(Endpoint endpoint) {
+ Preconditions.checkArgument(endpoint != null);
+ endpoint.validate();
+ external.add(endpoint);
+ }
+
+ /**
+ * Add an internal endpoint
+ * @param endpoint endpoint to set
+ */
+ public void addInternalEndpoint(Endpoint endpoint) {
+ Preconditions.checkArgument(endpoint != null);
+ endpoint.validate();
+ internal.add(endpoint);
+ }
+
+ /**
+ * Look up an internal endpoint
+ * @param api API
+ * @return the endpoint or null if there was no match
+ */
+ public Endpoint getInternalEndpoint(String api) {
+ return findByAPI(internal, api);
+ }
+
+ /**
+ * Look up an external endpoint
+ * @param api API
+ * @return the endpoint or null if there was no match
+ */
+ public Endpoint getExternalEndpoint(String api) {
+ return findByAPI(external, api);
+ }
+
+ /**
+ * Handle unknown attributes by storing them in the
+ * {@link #attributes} map
+ * @param key attribute name
+ * @param value attribute value.
+ */
+ @JsonAnySetter
+ public void set(String key, Object value) {
+ attributes.put(key, value.toString());
+ }
+
+ /**
+ * The map of "other" attributes set when parsing. These
+ * are not included in the JSON value of this record when it
+ * is generated.
+ * @return a map of any unknown attributes in the deserialized JSON.
+ */
+ @JsonAnyGetter
+ public Map<String, String> attributes() {
+ return attributes;
+ }
+
+ /**
+ * Get the "other" attribute with a specific key
+ * @param key key to look up
+ * @return the value or null
+ */
+ public String get(String key) {
+ return attributes.get(key);
+ }
+
+ /**
+ * Get the "other" attribute with a specific key.
+ * @param key key to look up
+ * @param defVal default value
+ * @return the value as a string,
+ * or <code>defval</code> if the value was not present
+ */
+ public String get(String key, String defVal) {
+ String val = attributes.get(key);
+ return val != null ? val: defVal;
+ }
+
+ /**
+ * Find an endpoint by its API
+ * @param list list
+ * @param api api name
+ * @return the endpoint or null if there was no match
+ */
+ private Endpoint findByAPI(List<Endpoint> list, String api) {
+ for (Endpoint endpoint : list) {
+ if (endpoint.api.equals(api)) {
+ return endpoint;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb =
+ new StringBuilder("ServiceRecord{");
+ sb.append("description='").append(description).append('\'');
+ sb.append("; external endpoints: {");
+ for (Endpoint endpoint : external) {
+ sb.append(endpoint).append("; ");
+ }
+ sb.append("}; internal endpoints: {");
+ for (Endpoint endpoint : internal) {
+ sb.append(endpoint != null ? endpoint.toString() : "NULL ENDPOINT");
+ sb.append("; ");
+ }
+ sb.append('}');
+
+ if (!attributes.isEmpty()) {
+ sb.append(", attributes: {");
+ for (Map.Entry<String, String> attr : attributes.entrySet()) {
+ sb.append("\"").append(attr.getKey()).append("\"=\"")
+ .append(attr.getValue()).append("\" ");
+ }
+ } else {
+
+ sb.append(", attributes: {");
+ }
+ sb.append('}');
+
+ sb.append('}');
+ return sb.toString();
+ }
+
+ /**
+ * Shallow clone: all endpoints will be shared across instances
+ * @return a clone of the instance
+ * @throws CloneNotSupportedException
+ */
+ @Override
+ protected Object clone() throws CloneNotSupportedException {
+ return super.clone();
+ }
+
+ /**
+ * Validate the record by checking for null fields and other invalid
+ * conditions
+ * @throws NullPointerException if a field is null when it
+ * MUST be set.
+ * @throws RuntimeException on invalid entries
+ */
+ public void validate() {
+ for (Endpoint endpoint : external) {
+ Preconditions.checkNotNull("null endpoint", endpoint);
+ endpoint.validate();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java
new file mode 100644
index 0000000..2f75dba
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/ServiceRecordHeader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.types;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Service record header; access to the byte array kept private
+ * to avoid findbugs warnings of mutability
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ServiceRecordHeader {
+ /**
+ * Header of a service record: "jsonservicerec"
+ * By making this over 12 bytes long, we can auto-determine which entries
+ * in a listing are too short to contain a record without getting their data
+ */
+ private static final byte[] RECORD_HEADER = {
+ 'j', 's', 'o', 'n',
+ 's', 'e', 'r', 'v', 'i', 'c', 'e',
+ 'r', 'e', 'c'
+ };
+
+ /**
+ * Get the length of the record header
+ * @return the header length
+ */
+ public static int getLength() {
+ return RECORD_HEADER.length;
+ }
+
+ /**
+ * Get a clone of the record header
+ * @return the new record header.
+ */
+ public static byte[] getData() {
+ byte[] h = new byte[RECORD_HEADER.length];
+ System.arraycopy(RECORD_HEADER, 0, h, 0, RECORD_HEADER.length);
+ return h;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/package-info.java
new file mode 100644
index 0000000..1c926be
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/package-info.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains all the data types which can be saved to the registry
+ * and/or marshalled to and from JSON.
+ * <p>
+ * The core datatypes, {@link org.apache.hadoop.registry.client.types.ServiceRecord},
+ * and {@link org.apache.hadoop.registry.client.types.Endpoint} are
+ * what is used to describe services and their protocol endpoints in the registry.
+ * <p>
+ * Some adjacent interfaces exist to list attributes of the fields:
+ * <ul>
+ * <li>{@link org.apache.hadoop.registry.client.types.AddressTypes}</li>
+ * <li>{@link org.apache.hadoop.registry.client.types.yarn.PersistencePolicies}</li>
+ * <li>{@link org.apache.hadoop.registry.client.types.ProtocolTypes}</li>
+ * </ul>
+ *
+ * The {@link org.apache.hadoop.registry.client.types.RegistryPathStatus}
+ * class is not saved to the registry —it is the status of a registry
+ * entry that can be retrieved from the API call. It is still
+ * designed to be marshalled to and from JSON, as it can be served up
+ * from REST front ends to the registry.
+ *
+ */
+package org.apache.hadoop.registry.client.types;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/PersistencePolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/PersistencePolicies.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/PersistencePolicies.java
new file mode 100644
index 0000000..e4c7272
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/PersistencePolicies.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.types.yarn;
+
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+
+/**
+ * Persistence policies for {@link ServiceRecord}
+ */
+
+public interface PersistencePolicies {
+
+ /**
+ * The record persists until removed manually: {@value}.
+ */
+ String PERMANENT = "permanent";
+
+ /**
+ * Remove when the YARN application defined in the id field
+ * terminates: {@value}.
+ */
+ String APPLICATION = "application";
+
+ /**
+ * Remove when the current YARN application attempt ID finishes: {@value}.
+ */
+ String APPLICATION_ATTEMPT = "application-attempt";
+
+ /**
+ * Remove when the YARN container in the ID field finishes: {@value}
+ */
+ String CONTAINER = "container";
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
new file mode 100644
index 0000000..7b78932
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/types/yarn/YarnRegistryAttributes.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.types.yarn;
+
+/**
+ * YARN specific attributes in the registry
+ */
+public class YarnRegistryAttributes {
+
+ /**
+ * ID. For containers: container ID. For application instances, application ID.
+ */
+ public static final String YARN_ID = "yarn:id";
+ public static final String YARN_PERSISTENCE = "yarn:persistence";
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/RMRegistryOperationsService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/RMRegistryOperationsService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/RMRegistryOperationsService.java
new file mode 100644
index 0000000..e11890f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/integration/RMRegistryOperationsService.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.server.integration;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.registry.client.impl.zk.RegistryBindingSource;
+import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies;
+import org.apache.hadoop.registry.server.services.DeleteCompletionCallback;
+import org.apache.hadoop.registry.server.services.RegistryAdminService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.Future;
+
+/**
+ * Handle RM events by updating the registry
+ * <p>
+ * These actions are all implemented as event handlers to operations
+ * which come from the RM.
+ * <p>
+ * This service is expected to be executed by a user with the permissions
+ * to manipulate the entire registry,
+ */
+@InterfaceAudience.LimitedPrivate("YARN")
+@InterfaceStability.Evolving
+public class RMRegistryOperationsService extends RegistryAdminService {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RMRegistryOperationsService.class);
+
+ private PurgePolicy purgeOnCompletionPolicy = PurgePolicy.PurgeAll;
+
+ public RMRegistryOperationsService(String name) {
+ this(name, null);
+ }
+
+ public RMRegistryOperationsService(String name,
+ RegistryBindingSource bindingSource) {
+ super(name, bindingSource);
+ }
+
+
+ /**
+ * Extend the parent service initialization by verifying that the
+ * service knows —in a secure cluster— the realm in which it is executing.
+ * It needs this to properly build up the user names and hence their
+ * access rights.
+ *
+ * @param conf configuration of the service
+ * @throws Exception
+ */
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+
+ verifyRealmValidity();
+ }
+
+ public PurgePolicy getPurgeOnCompletionPolicy() {
+ return purgeOnCompletionPolicy;
+ }
+
+ public void setPurgeOnCompletionPolicy(PurgePolicy purgeOnCompletionPolicy) {
+ this.purgeOnCompletionPolicy = purgeOnCompletionPolicy;
+ }
+
+ public void onApplicationAttemptRegistered(ApplicationAttemptId attemptId,
+ String host, int rpcport, String trackingurl) throws IOException {
+
+ }
+
+ public void onApplicationLaunched(ApplicationId id) throws IOException {
+
+ }
+
+ /**
+ * Actions to take as an AM registers itself with the RM.
+ * @param attemptId attempt ID
+ * @throws IOException problems
+ */
+ public void onApplicationMasterRegistered(ApplicationAttemptId attemptId) throws
+ IOException {
+ }
+
+ /**
+ * Actions to take when the AM container is completed
+ * @param containerId container ID
+ * @throws IOException problems
+ */
+ public void onAMContainerFinished(ContainerId containerId) throws
+ IOException {
+ LOG.info("AM Container {} finished, purging application attempt records",
+ containerId);
+
+ // remove all application attempt entries
+ purgeAppAttemptRecords(containerId.getApplicationAttemptId());
+
+ // also treat as a container finish to remove container
+ // level records for the AM container
+ onContainerFinished(containerId);
+ }
+
+ /**
+ * remove all application attempt entries
+ * @param attemptId attempt ID
+ */
+ protected void purgeAppAttemptRecords(ApplicationAttemptId attemptId) {
+ purgeRecordsAsync("/",
+ attemptId.toString(),
+ PersistencePolicies.APPLICATION_ATTEMPT);
+ }
+
+ /**
+ * Actions to take when an application attempt is completed
+ * @param attemptId application ID
+ * @throws IOException problems
+ */
+ public void onApplicationAttemptUnregistered(ApplicationAttemptId attemptId)
+ throws IOException {
+ LOG.info("Application attempt {} unregistered, purging app attempt records",
+ attemptId);
+ purgeAppAttemptRecords(attemptId);
+ }
+
+ /**
+ * Actions to take when an application is completed
+ * @param id application ID
+ * @throws IOException problems
+ */
+ public void onApplicationCompleted(ApplicationId id)
+ throws IOException {
+ LOG.info("Application {} completed, purging application-level records",
+ id);
+ purgeRecordsAsync("/",
+ id.toString(),
+ PersistencePolicies.APPLICATION);
+ }
+
+ public void onApplicationAttemptAdded(ApplicationAttemptId appAttemptId) {
+ }
+
+ /**
+ * This is the event where the user is known, so the user directory
+ * can be created
+ * @param applicationId application ID
+ * @param user username
+ * @throws IOException problems
+ */
+ public void onStateStoreEvent(ApplicationId applicationId, String user) throws
+ IOException {
+ initUserRegistryAsync(user);
+ }
+
+ /**
+ * Actions to take when the AM container is completed
+ * @param id container ID
+ * @throws IOException problems
+ */
+ public void onContainerFinished(ContainerId id) throws IOException {
+ LOG.info("Container {} finished, purging container-level records",
+ id);
+ purgeRecordsAsync("/",
+ id.toString(),
+ PersistencePolicies.CONTAINER);
+ }
+
+ /**
+ * Queue an async operation to purge all matching records under a base path.
+ * <ol>
+ * <li>Uses a depth first search</li>
+ * <li>A match is on ID and persistence policy, or, if policy==-1, any match</li>
+ * <li>If a record matches then it is deleted without any child searches</li>
+ * <li>Deletions will be asynchronous if a callback is provided</li>
+ * </ol>
+ * @param path base path
+ * @param id ID for service record.id
+ * @param persistencePolicyMatch ID for the persistence policy to match:
+ * no match, no delete.
+ * @return a future that returns the #of records deleted
+ */
+ @VisibleForTesting
+ public Future<Integer> purgeRecordsAsync(String path,
+ String id,
+ String persistencePolicyMatch) {
+
+ return purgeRecordsAsync(path,
+ id, persistencePolicyMatch,
+ purgeOnCompletionPolicy,
+ new DeleteCompletionCallback());
+ }
+
+ /**
+ * Queue an async operation to purge all matching records under a base path.
+ * <ol>
+ * <li>Uses a depth first search</li>
+ * <li>A match is on ID and persistence policy, or, if policy==-1, any match</li>
+ * <li>If a record matches then it is deleted without any child searches</li>
+ * <li>Deletions will be asynchronous if a callback is provided</li>
+ * </ol>
+ * @param path base path
+ * @param id ID for service record.id
+ * @param persistencePolicyMatch ID for the persistence policy to match:
+ * no match, no delete.
+ * @param purgePolicy how to react to children under the entry
+ * @param callback an optional callback
+ * @return a future that returns the #of records deleted
+ */
+ @VisibleForTesting
+ public Future<Integer> purgeRecordsAsync(String path,
+ String id,
+ String persistencePolicyMatch,
+ PurgePolicy purgePolicy,
+ BackgroundCallback callback) {
+ LOG.info(" records under {} with ID {} and policy {}: {}",
+ path, id, persistencePolicyMatch);
+ return submit(
+ new AsyncPurge(path,
+ new SelectByYarnPersistence(id, persistencePolicyMatch),
+ purgePolicy,
+ callback));
+ }
+
+}