Posted to common-commits@hadoop.apache.org by st...@apache.org on 2014/10/08 21:54:52 UTC

[5/6] YARN-913 service registry: YARN-2652 add hadoop-yarn-registry package under hadoop-yarn

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java
new file mode 100644
index 0000000..b4254a3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryTypeUtils.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.binding;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
+import org.apache.hadoop.registry.client.types.AddressTypes;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ProtocolTypes;
+
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Static methods to work with registry types, primarily endpoints and the
+ * list representation of addresses.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RegistryTypeUtils {
+
+  /**
+   * Create a URL endpoint from a list of URIs
+   * @param api implemented API
+   * @param protocolType protocol type
+   * @param uris URIs
+   * @return a new endpoint
+   */
+  public static Endpoint urlEndpoint(String api,
+      String protocolType,
+      URI... uris) {
+    return new Endpoint(api, protocolType, uris);
+  }
+
+  /**
+   * Create a REST endpoint from a list of URIs
+   * @param api implemented API
+   * @param uris URIs
+   * @return a new endpoint
+   */
+  public static Endpoint restEndpoint(String api,
+      URI... uris) {
+    return urlEndpoint(api, ProtocolTypes.PROTOCOL_REST, uris);
+  }
+
+  /**
+   * Create a Web UI endpoint from a list of URIs
+   * @param api implemented API
+   * @param uris URIs
+   * @return a new endpoint
+   */
+  public static Endpoint webEndpoint(String api,
+      URI... uris) {
+    return urlEndpoint(api, ProtocolTypes.PROTOCOL_WEBUI, uris);
+  }
+
+  /**
+   * Create an internet address endpoint from a hostname and port
+   * @param api implemented API
+   * @param protocolType protocol type
+   * @param hostname hostname/FQDN
+   * @param port port
+   * @return a new endpoint
+   */
+
+  public static Endpoint inetAddrEndpoint(String api,
+      String protocolType,
+      String hostname,
+      int port) {
+    Preconditions.checkArgument(api != null, "null API");
+    Preconditions.checkArgument(protocolType != null, "null protocolType");
+    Preconditions.checkArgument(hostname != null, "null hostname");
+    return new Endpoint(api,
+        AddressTypes.ADDRESS_HOSTNAME_AND_PORT,
+        protocolType,
+        tuplelist(hostname, Integer.toString(port)));
+  }
+
+  /**
+   * Create an IPC endpoint
+   * @param api API
+   * @param protobuf flag to indicate whether or not the IPC uses protocol
+   * buffers
+   * @param address the address as a tuple of (hostname, port)
+   * @return the new endpoint
+   */
+  public static Endpoint ipcEndpoint(String api,
+      boolean protobuf, List<String> address) {
+    ArrayList<List<String>> addressList = new ArrayList<List<String>>();
+    if (address != null) {
+      addressList.add(address);
+    }
+    return new Endpoint(api,
+        AddressTypes.ADDRESS_HOSTNAME_AND_PORT,
+        protobuf ? ProtocolTypes.PROTOCOL_HADOOP_IPC_PROTOBUF
+                 : ProtocolTypes.PROTOCOL_HADOOP_IPC,
+        addressList);
+  }
+
+  /**
+   * Create a single-element list of tuples from the input;
+   * that is, an input ("a","b","c") is converted into a list
+   * in the form [["a","b","c"]]
+   * @param t1 tuple elements
+   * @return a list containing a single tuple
+   */
+  public static List<List<String>> tuplelist(String... t1) {
+    List<List<String>> outer = new ArrayList<List<String>>();
+    outer.add(tuple(t1));
+    return outer;
+  }
+
+  /**
+   * Create a tuple from the input;
+   * that is, an input ("a","b","c") is converted into a list
+   * in the form ["a","b","c"]
+   * @param t1 tuple elements
+   * @return a single tuple as a list
+   */
+  public static List<String> tuple(String... t1) {
+    return Arrays.asList(t1);
+  }
+
+  /**
+   * Create a tuple from the input, converting all elements to Strings
+   * in the process; that is, an input ("a", 7, true) is converted into
+   * a list in the form ["a","7","true"]
+   * @param t1 tuple elements
+   * @return a single tuple as a list
+   */
+  public static List<String> tuple(Object... t1) {
+    List<String> l = new ArrayList<String>(t1.length);
+    for (Object t : t1) {
+      l.add(t.toString());
+    }
+    return l;
+  }
+
+  /**
+   * Convert a socket address pair into a string tuple, (host, port).
+   * TODO JDK7: move to InetAddress.getHostString() to avoid DNS lookups.
+   * @param address an address
+   * @return an element for the address list
+   */
+  public static List<String> marshall(InetSocketAddress address) {
+    return tuple(address.getHostName(), address.getPort());
+  }
+
+  /**
+   * Require a specific address type on an endpoint
+   * @param required required type
+   * @param epr endpoint
+   * @throws InvalidRecordException if the type is wrong
+   */
+  public static void requireAddressType(String required, Endpoint epr) throws
+      InvalidRecordException {
+    if (!required.equals(epr.addressType)) {
+      throw new InvalidRecordException(
+          epr.toString(),
+          "Address type of " + epr.addressType
+          + " does not match required type of "
+          + required);
+    }
+  }
+
+  /**
+   * Get the addresses of a URI-typed endpoint
+   * @param epr endpoint
+   * @return the URIs in the address list, one per address entry. Null if the
+   * endpoint itself is null
+   * @throws InvalidRecordException if the type is wrong, there are no addresses
+   * or the payload is ill-formatted
+   */
+  public static List<String> retrieveAddressesUriType(Endpoint epr)
+      throws InvalidRecordException {
+    if (epr == null) {
+      return null;
+    }
+    requireAddressType(AddressTypes.ADDRESS_URI, epr);
+    List<List<String>> addresses = epr.addresses;
+    if (addresses.size() < 1) {
+      throw new InvalidRecordException(epr.toString(),
+          "No addresses in endpoint");
+    }
+    List<String> results = new ArrayList<String>(addresses.size());
+    for (List<String> address : addresses) {
+      if (address.size() != 1) {
+        throw new InvalidRecordException(epr.toString(),
+            "Address payload invalid: wrong element count: " +
+            address.size());
+      }
+      results.add(address.get(0));
+    }
+    return results;
+  }
+
+  /**
+   * Get the address URLs. Guaranteed to return at least one address.
+   * @param epr endpoint
+   * @return the addresses as URLs
+   * @throws InvalidRecordException if the type is wrong, there are no addresses
+   * or the payload is ill-formatted
+   * @throws MalformedURLException if an address can't be turned into a URL
+   */
+  public static List<URL> retrieveAddressURLs(Endpoint epr)
+      throws InvalidRecordException, MalformedURLException {
+    if (epr == null) {
+      throw new InvalidRecordException("", "Null endpoint");
+    }
+    List<String> addresses = retrieveAddressesUriType(epr);
+    List<URL> results = new ArrayList<URL>(addresses.size());
+    for (String address : addresses) {
+      results.add(new URL(address));
+    }
+    return results;
+  }
+}
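
For context, a minimal usage sketch of the helpers above, not part of this patch: the API names, host and port values are made-up illustrations.

import java.net.URI;
import java.net.URL;
import java.util.List;

import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
import org.apache.hadoop.registry.client.types.Endpoint;

public class EndpointExample {
  public static void main(String[] args) throws Exception {
    // a web UI endpoint backed by a single URI
    Endpoint web = RegistryTypeUtils.webEndpoint(
        "classpath:org.example.myapp.ui",
        URI.create("http://host1:8080"));

    // a (hostname, port) endpoint for a protobuf-based Hadoop IPC service;
    // tuple() converts the port to a String for the address list
    Endpoint ipc = RegistryTypeUtils.ipcEndpoint(
        "classpath:org.example.myapp.ipc",
        true,
        RegistryTypeUtils.tuple("host1", 8032));

    // read the web addresses back as URLs
    List<URL> urls = RegistryTypeUtils.retrieveAddressURLs(web);
    System.out.println(urls + " " + ipc);
  }
}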

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
new file mode 100644
index 0000000..3b28a02
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/RegistryUtils.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.binding;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
+import org.apache.hadoop.registry.client.exceptions.NoRecordException;
+import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants;
+import org.apache.hadoop.registry.client.types.RegistryPathStatus;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.registry.client.binding.RegistryPathUtils.*;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility methods for working with a registry.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RegistryUtils {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RegistryUtils.class);
+
+  /**
+   * Build the user path; switches to the system path if the user is "".
+   * It also cross-converts the username to ASCII via punycode
+   * @param shortname username or ""
+   * @return the path to the user
+   */
+  public static String homePathForUser(String shortname) {
+    Preconditions.checkArgument(shortname != null, "null user");
+
+    // catch recursion
+    if (shortname.startsWith(RegistryConstants.PATH_USERS)) {
+      return shortname;
+    }
+    if (shortname.isEmpty()) {
+      return RegistryConstants.PATH_SYSTEM_SERVICES;
+    }
+    return RegistryPathUtils.join(RegistryConstants.PATH_USERS,
+        encodeForRegistry(shortname));
+  }
+
+  /**
+   * Create the path for a service class under a user
+   * @param user username or ""
+   * @param serviceClass service class
+   * @return a full path
+   */
+  public static String serviceclassPath(String user,
+      String serviceClass) {
+    String services = join(homePathForUser(user),
+        RegistryConstants.PATH_USER_SERVICES);
+    return join(services,
+        serviceClass);
+  }
+
+  /**
+   * Create a path to a service under a user & service class
+   * @param user username or ""
+   * @param serviceClass service class
+   * @param serviceName service name unique for that user & service class
+   * @return a full path
+   */
+  public static String servicePath(String user,
+      String serviceClass,
+      String serviceName) {
+
+    return join(
+        serviceclassPath(user, serviceClass),
+        serviceName);
+  }
+
+  /**
+   * Create a path for listing components under a service
+   * @param user username or ""
+   * @param serviceClass service class
+   * @param serviceName service name unique for that user & service class
+   * @return a full path
+   */
+  public static String componentListPath(String user,
+      String serviceClass, String serviceName) {
+
+    return join(servicePath(user, serviceClass, serviceName),
+        RegistryConstants.SUBPATH_COMPONENTS);
+  }
+
+  /**
+   * Create the path to a service record for a component
+   * @param user username or ""
+   * @param serviceClass service class
+   * @param serviceName service name unique for that user & service class
+   * @param componentName unique name/ID of the component
+   * @return a full path
+   */
+  public static String componentPath(String user,
+      String serviceClass, String serviceName, String componentName) {
+
+    return join(
+        componentListPath(user, serviceClass, serviceName),
+        componentName);
+  }
+
+  /**
+   * List service records directly under a path
+   * @param registryOperations registry operations instance
+   * @param path path to list
+   * @return a mapping of the service records that were resolved, indexed
+   * by their full path
+   * @throws IOException for any IO failure while listing or resolving
+   */
+  public static Map<String, ServiceRecord> listServiceRecords(
+      RegistryOperations registryOperations,
+      String path) throws IOException {
+    Map<String, RegistryPathStatus> children =
+        statChildren(registryOperations, path);
+    return extractServiceRecords(registryOperations,
+        path,
+        children.values());
+  }
+
+  /**
+   * List children of a directory and retrieve their
+   * {@link RegistryPathStatus} values.
+   * <p>
+   * This is not an atomic operation; a child may be deleted
+   * during the iteration through the child entries. If this happens,
+   * the <code>PathNotFoundException</code> is caught and that child
+   * entry omitted.
+   *
+   * @param registryOperations registry operations instance
+   * @param path path to list
+   * @return a possibly empty map of child entries listed by
+   * their short name.
+   * @throws PathNotFoundException path is not in the registry.
+   * @throws InvalidPathnameException the path is invalid.
+   * @throws IOException Any other IO Exception
+   */
+  public static Map<String, RegistryPathStatus> statChildren(
+      RegistryOperations registryOperations,
+      String path)
+      throws PathNotFoundException,
+      InvalidPathnameException,
+      IOException {
+    List<String> childNames = registryOperations.list(path);
+    Map<String, RegistryPathStatus> results =
+        new HashMap<String, RegistryPathStatus>();
+    for (String childName : childNames) {
+      String child = join(path, childName);
+      try {
+        RegistryPathStatus stat = registryOperations.stat(child);
+        results.put(childName, stat);
+      } catch (PathNotFoundException pnfe) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("stat failed on {}: moved? {}", child, pnfe, pnfe);
+        }
+        // and continue
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Get the home path of the current user.
+   * <p>
+   *  In an insecure cluster, the environment variable
+   *  <code>HADOOP_USER_NAME</code> is queried <i>first</i>.
+   * <p>
+   * This means that in a YARN container where the creator set this
+   * environment variable to propagate their identity, the defined
+   * user name is used in preference to the actual user.
+   * <p>
+   * In a secure cluster, the kerberos identity of the current user is used.
+   * @return a path for the current user's home dir.
+   * @throws RuntimeException if the current user identity cannot be determined
+   * from the OS/kerberos.
+   */
+  public static String homePathForCurrentUser() {
+    String shortUserName = currentUsernameUnencoded();
+    return homePathForUser(shortUserName);
+  }
+
+  /**
+   * Get the current username, before any encoding has been applied.
+   * @return the value of the HADOOP_USER_NAME environment variable on an
+   * insecure cluster, otherwise the kerberos/OS identity of the current user.
+   */
+  private static String currentUsernameUnencoded() {
+    String env_hadoop_username = System.getenv(
+        RegistryInternalConstants.HADOOP_USER_NAME);
+    return getCurrentUsernameUnencoded(env_hadoop_username);
+  }
+
+  /**
+   * Get the current username, using the value of the parameter
+   * <code>env_hadoop_username</code> if it is set on an insecure cluster.
+   * This ensures that the username propagates correctly across processes
+   * started by YARN.
+   * <p>
+   * This method is primarily made visible for testing.
+   * @param env_hadoop_username the environment variable
+   * @return the selected username
+   * @throws RuntimeException if there is a problem getting the short user
+   * name of the current user.
+   */
+  @VisibleForTesting
+  public static String getCurrentUsernameUnencoded(String env_hadoop_username) {
+    String shortUserName = null;
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      shortUserName = env_hadoop_username;
+    }
+    if (StringUtils.isEmpty(shortUserName)) {
+      try {
+        shortUserName = UserGroupInformation.getCurrentUser().getShortUserName();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return shortUserName;
+  }
+
+  /**
+   * Get the current username, encoded for use in registry paths
+   * <p>
+   *  In an insecure cluster, the environment variable
+   *  <code>HADOOP_USER_NAME</code> is queried <i>first</i>.
+   * <p>
+   * This means that in a YARN container where the creator set this
+   * environment variable to propagate their identity, the defined
+   * user name is used in preference to the actual user.
+   * <p>
+   * In a secure cluster, the kerberos identity of the current user is used.
+   * @return the encoded shortname of the current user
+   * @throws RuntimeException if the current user identity cannot be determined
+   * from the OS/kerberos.
+   *
+   */
+  public static String currentUser() {
+    String shortUserName = currentUsernameUnencoded();
+    return encodeForRegistry(shortUserName);
+  }
+
+  /**
+   * Extract all service records from a collection of stat results; this
+   * skips entries that are too short or that do not contain a service record
+   * @param operations operation support for fetches
+   * @param parentpath path of the parent of all the entries
+   * @param stats Collection of stat results
+   * @return a possibly empty map of fullpath:record.
+   * @throws IOException for any IO Operation that wasn't ignored.
+   */
+  public static Map<String, ServiceRecord> extractServiceRecords(
+      RegistryOperations operations,
+      String parentpath,
+      Collection<RegistryPathStatus> stats) throws IOException {
+    Map<String, ServiceRecord> results = new HashMap<String, ServiceRecord>(stats.size());
+    for (RegistryPathStatus stat : stats) {
+      if (stat.size > ServiceRecordHeader.getLength()) {
+        // maybe has data
+        String path = join(parentpath, stat.path);
+        try {
+          ServiceRecord serviceRecord = operations.resolve(path);
+          results.put(path, serviceRecord);
+        } catch (EOFException ignored) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("data too short for {}", path);
+          }
+        } catch (InvalidRecordException record) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Invalid record at {}", path);
+          }
+        } catch (NoRecordException record) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("No record at {}", path);
+          }
+        }
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Extract all service records from a map of stat results; this
+   * non-atomic action skips entries that are too short or that do not
+   * contain a service record.
+   * <p>
+   * @param operations operation support for fetches
+   * @param parentpath path of the parent of all the entries
+   * @param stats a map of child name to {@link RegistryPathStatus} entries
+   * @return a possibly empty map of fullpath:record.
+   * @throws IOException for any IO Operation that wasn't ignored.
+   */
+  public static Map<String, ServiceRecord> extractServiceRecords(
+      RegistryOperations operations,
+      String parentpath,
+      Map<String, RegistryPathStatus> stats) throws IOException {
+    return extractServiceRecords(operations, parentpath, stats.values());
+  }
+
+
+  /**
+   * Stat the children of a path, then extract all service records found
+   * beneath it; this non-atomic action skips entries that are too short
+   * or that do not contain a service record.
+   * <p>
+   * @param operations operation support for fetches
+   * @param parentpath path of the parent of all the entries
+   * @return a possibly empty map of fullpath:record.
+   * @throws IOException for any IO Operation that wasn't ignored.
+   */
+  public static Map<String, ServiceRecord> extractServiceRecords(
+      RegistryOperations operations,
+      String parentpath) throws IOException {
+    return extractServiceRecords(operations,
+        parentpath,
+        statChildren(operations, parentpath).values());
+  }
+
+
+
+  /**
+   * JSON marshalling support for service records
+   */
+  public static class ServiceRecordMarshal extends JsonSerDeser<ServiceRecord> {
+    public ServiceRecordMarshal() {
+      super(ServiceRecord.class, ServiceRecordHeader.getData());
+    }
+  }
+}
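
A minimal sketch, again not part of this patch, of composing paths and listing the records beneath them; the user, service class and instance names are made up, and "operations" is assumed to be a started RegistryOperations instance.

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.registry.client.types.ServiceRecord;

public class ListingExample {
  public static void listComponents(RegistryOperations operations)
      throws IOException {
    // path to the components of one service instance; the exact layout
    // is defined by RegistryConstants
    String components = RegistryUtils.componentListPath(
        "hbase", "org-apache-hbase", "cluster-1");

    // resolve every record directly under that path, keyed by full path
    Map<String, ServiceRecord> records =
        RegistryUtils.listServiceRecords(operations, components);
    for (Map.Entry<String, ServiceRecord> entry : records.entrySet()) {
      System.out.println(entry.getKey() + " -> " + entry.getValue());
    }
  }
}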

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/package-info.java
new file mode 100644
index 0000000..f99aa71
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/binding/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Registry binding utility classes.
+ */
+package org.apache.hadoop.registry.client.binding;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/AuthenticationFailedException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/AuthenticationFailedException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/AuthenticationFailedException.java
new file mode 100644
index 0000000..aadb7fc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/AuthenticationFailedException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.exceptions;
+
+/**
+ * Exception raised when client access wasn't authenticated.
+ * That is: the credentials provided were incomplete or invalid.
+ */
+public class AuthenticationFailedException extends RegistryIOException {
+  public AuthenticationFailedException(String path, Throwable cause) {
+    super(path, cause);
+  }
+
+  public AuthenticationFailedException(String path, String error) {
+    super(path, error);
+  }
+
+  public AuthenticationFailedException(String path,
+      String error,
+      Throwable cause) {
+    super(path, error, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/InvalidPathnameException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/InvalidPathnameException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/InvalidPathnameException.java
new file mode 100644
index 0000000..c984f41
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/InvalidPathnameException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A path name was invalid. This is raised when a path string has
+ * characters in it that are not permitted.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class InvalidPathnameException extends RegistryIOException {
+  public InvalidPathnameException(String path, String message) {
+    super(path, message);
+  }
+
+  public InvalidPathnameException(String path,
+      String message,
+      Throwable cause) {
+    super(path, message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/InvalidRecordException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/InvalidRecordException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/InvalidRecordException.java
new file mode 100644
index 0000000..e4f545e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/InvalidRecordException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Raised if an attempt to parse a record failed.
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class InvalidRecordException extends RegistryIOException {
+
+  public InvalidRecordException(String path, String error) {
+    super(path, error);
+  }
+
+  public InvalidRecordException(String path,
+      String error,
+      Throwable cause) {
+    super(path, error, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoChildrenForEphemeralsException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoChildrenForEphemeralsException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoChildrenForEphemeralsException.java
new file mode 100644
index 0000000..24070a5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoChildrenForEphemeralsException.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This is a manifestation of the Zookeeper restrictions about
+ * what nodes may act as parents.
+ *
+ * Children are not allowed under ephemeral nodes. This is an aspect
+ * of ZK which isn't directly exposed to the registry API. It may
+ * surface if the registry is manipulated outside of the registry API.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class NoChildrenForEphemeralsException extends RegistryIOException {
+  public NoChildrenForEphemeralsException(String path, Throwable cause) {
+    super(path, cause);
+  }
+
+  public NoChildrenForEphemeralsException(String path, String error) {
+    super(path, error);
+  }
+
+  public NoChildrenForEphemeralsException(String path,
+      String error,
+      Throwable cause) {
+    super(path, error, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoPathPermissionsException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoPathPermissionsException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoPathPermissionsException.java
new file mode 100644
index 0000000..ce84f5b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoPathPermissionsException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.exceptions;
+
+import org.apache.hadoop.fs.PathIOException;
+
+/**
+ * Raised on path permission exceptions.
+ * <p>
+ * The caller does not have the permissions needed to manipulate the path.
+ */
+public class NoPathPermissionsException extends RegistryIOException {
+  public NoPathPermissionsException(String path, Throwable cause) {
+    super(path, cause);
+  }
+
+  public NoPathPermissionsException(String path, String error) {
+    super(path, error);
+  }
+
+  public NoPathPermissionsException(String path, String error, Throwable cause) {
+    super(path, error, cause);
+  }
+
+  public NoPathPermissionsException(String message,
+      PathIOException cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java
new file mode 100644
index 0000000..160433f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/NoRecordException.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.registry.client.types.ServiceRecordHeader;
+
+/**
+ * Raised if there is no {@link ServiceRecord} resolved at the end
+ * of the specified path, for reasons such as:
+ * <ul>
+ *   <li>There wasn't enough data to contain a Service Record.</li>
+ *   <li>The start of the data did not match the {@link ServiceRecordHeader}
+ *   header.</li>
+ * </ul>
+ *
+ * There may be valid data of some form at the end of the path, but it does
+ * not appear to be a Service Record.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class NoRecordException extends RegistryIOException {
+
+  public NoRecordException(String path, String error) {
+    super(path, error);
+  }
+
+  public NoRecordException(String path,
+      String error,
+      Throwable cause) {
+    super(path, error, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/RegistryIOException.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/RegistryIOException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/RegistryIOException.java
new file mode 100644
index 0000000..ca966db
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/RegistryIOException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.PathIOException;
+
+/**
+ * Base exception for registry operations.
+ * <p>
+ * These exceptions include the path of the failing operation wherever possible;
+ * this can be retrieved via {@link PathIOException#getPath()}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RegistryIOException extends PathIOException {
+
+  /**
+   * Build an exception from any other Path IO Exception.
+   * This propagates the path of the original exception
+   * @param message more specific text
+   * @param cause cause
+   */
+  public RegistryIOException(String message, PathIOException cause) {
+    super(cause.getPath() != null ? cause.getPath().toString() : "",
+        message,
+        cause);
+  }
+
+  public RegistryIOException(String path, Throwable cause) {
+    super(path, cause);
+  }
+
+  public RegistryIOException(String path, String error) {
+    super(path, error);
+  }
+
+  public RegistryIOException(String path, String error, Throwable cause) {
+    super(path, error, cause);
+  }
+}
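
Because every registry exception extends PathIOException, callers can report the failing path uniformly. A small sketch, not part of this patch, with an illustrative helper name:

import java.io.IOException;

import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.exceptions.RegistryIOException;
import org.apache.hadoop.registry.client.types.ServiceRecord;

public class ErrorHandlingExample {
  public static ServiceRecord resolveOrNull(RegistryOperations operations,
      String path) throws IOException {
    try {
      return operations.resolve(path);
    } catch (RegistryIOException e) {
      // getPath() comes from PathIOException
      System.err.println("Registry operation failed at " + e.getPath()
          + ": " + e);
      return null;
    }
  }
}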

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/package-info.java
new file mode 100644
index 0000000..7d9c8ad
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/exceptions/package-info.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Registry Service Exceptions
+ * <p>
+ * These are the Registry-specific exceptions that may be raised during
+ * Registry operations.
+ * <p>
+ * Other exceptions may be raised, especially <code>IOExceptions</code>
+ * triggered by network problems, and <code>IllegalArgumentException</code>
+ * exceptions that may be raised if invalid (often null) arguments are passed
+ * to a method call.
+ * <p>
+ *   All exceptions in this package are derived from
+ *   {@link org.apache.hadoop.registry.client.exceptions.RegistryIOException}
+ */
+package org.apache.hadoop.registry.client.exceptions;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/RegistryOperationsClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/RegistryOperationsClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/RegistryOperationsClient.java
new file mode 100644
index 0000000..db03936
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/RegistryOperationsClient.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.registry.client.impl.zk.RegistryBindingSource;
+import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
+
+
+/**
+ * This is the client service for applications to work with the registry.
+ *
+ * It does not set up the root paths for the registry, is bonded
+ * to a user, and can be set to use SASL, anonymous or id:pass auth.
+ *
+ * For SASL, the client must be operating in the context of an authed user.
+ *
+ * For id:pass the client must have the relevant id & password; SASL is
+ * not used even if the client has credentials.
+ *
+ * For anonymous access, nothing is used.
+ *
+ * Any SASL-authed client also has the ability to add one or more
+ * authentication id:pass pairs on all future writes, and to reset them later.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RegistryOperationsClient extends RegistryOperationsService {
+
+  public RegistryOperationsClient(String name) {
+    super(name);
+  }
+
+  public RegistryOperationsClient(String name,
+      RegistryBindingSource bindingSource) {
+    super(name, bindingSource);
+  }
+}
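
A minimal sketch, not part of this patch, of driving the client through the standard YARN service lifecycle and resolving a record; the registry path is illustrative and the ZK connection settings are taken from the Configuration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.registry.client.impl.RegistryOperationsClient;
import org.apache.hadoop.registry.client.types.ServiceRecord;

public class ClientExample {
  public static void main(String[] args) throws Exception {
    RegistryOperationsClient client = new RegistryOperationsClient("example");
    client.init(new Configuration());   // picks up registry/ZK settings
    client.start();
    try {
      ServiceRecord record =
          client.resolve("/users/example/org-example-app/instance-1");
      System.out.println(record);
    } finally {
      client.stop();
    }
  }
}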

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/package-info.java
new file mode 100644
index 0000000..d85b6a7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Registry client services
+ * <p>
+ * These are classes which follow the YARN lifecycle and which implement
+ * the {@link org.apache.hadoop.registry.client.api.RegistryOperations}
+ * API.
+ */
+package org.apache.hadoop.registry.client.impl;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/BindingInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/BindingInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/BindingInformation.java
new file mode 100644
index 0000000..8ae003d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/BindingInformation.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.impl.zk;
+
+import org.apache.curator.ensemble.EnsembleProvider;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Binding information provided by a {@link RegistryBindingSource}
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class BindingInformation {
+
+  /**
+   * The Curator Ensemble Provider
+   */
+  public EnsembleProvider ensembleProvider;
+
+  /**
+   * Any information that may be useful for diagnostics
+   */
+  public String description;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
new file mode 100644
index 0000000..a0e6365
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java
@@ -0,0 +1,769 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.impl.zk;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.curator.ensemble.EnsembleProvider;
+import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CreateBuilder;
+import org.apache.curator.framework.api.DeleteBuilder;
+import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.curator.retry.BoundedExponentialBackoffRetry;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.exceptions.AuthenticationFailedException;
+import org.apache.hadoop.registry.client.exceptions.NoChildrenForEphemeralsException;
+import org.apache.hadoop.registry.client.exceptions.NoPathPermissionsException;
+import org.apache.hadoop.registry.client.exceptions.RegistryIOException;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * This service binds to Zookeeper via Apache Curator. It is more
+ * generic than just the YARN service registry; it does not implement
+ * any of the Registry Operations API.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class CuratorService extends CompositeService
+    implements RegistryConstants, RegistryBindingSource {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CuratorService.class);
+
+  /**
+   * the Curator binding
+   */
+  private CuratorFramework curator;
+
+  /**
+   * Path to the registry root
+   */
+  private String registryRoot;
+
+  /**
+   * Supplied binding source. This defaults to being this
+   * service itself.
+   */
+  private final RegistryBindingSource bindingSource;
+
+  /**
+   * Security service
+   */
+  private RegistrySecurity registrySecurity;
+
+  /**
+   * the connection binding text for messages
+   */
+  private String connectionDescription;
+
+  /**
+   * Security connection diagnostics
+   */
+  private String securityConnectionDiagnostics = "";
+
+  /**
+   * Provider of curator "ensemble"; offers a basis for
+   * more flexible bonding in future.
+   */
+  private EnsembleProvider ensembleProvider;
+
+  /**
+   * Construct the service.
+   * @param name service name
+   * @param bindingSource source of binding information.
+   * If null: use this instance
+   */
+  public CuratorService(String name, RegistryBindingSource bindingSource) {
+    super(name);
+    if (bindingSource != null) {
+      this.bindingSource = bindingSource;
+    } else {
+      this.bindingSource = this;
+    }
+  }
+
+  /**
+   * Create an instance using this service as the binding source (i.e. take
+   * the ZK binding information from the service configuration)
+   * @param name service name
+   */
+  public CuratorService(String name) {
+    this(name, null);
+  }
+
+  /**
+   * Init the service.
+   * This is where the security bindings are set up
+   * @param conf configuration of the service
+   * @throws Exception
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+
+    registryRoot = conf.getTrimmed(KEY_REGISTRY_ZK_ROOT,
+        DEFAULT_ZK_REGISTRY_ROOT);
+
+    // create and add the registry security service
+    registrySecurity = new RegistrySecurity("registry security");
+    addService(registrySecurity);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating Registry with root {}", registryRoot);
+    }
+
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Start the service.
+   * This is where the curator instance is started.
+   * @throws Exception on any failure to start
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+
+    // create the curator; rely on the registry security code
+    // to set up the JVM context and curator
+    curator = createCurator();
+  }
+
+  /**
+   * Close the ZK connection if it is open
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    IOUtils.closeStream(curator);
+    super.serviceStop();
+  }
+
+  /**
+   * Internal check that a service is in the live state
+   * @throws ServiceStateException if not
+   */
+  private void checkServiceLive() throws ServiceStateException {
+    if (!isInState(STATE.STARTED)) {
+      throw new ServiceStateException(
+          "Service " + getName() + " is in wrong state: "
+          + getServiceState());
+    }
+  }
+
+  /**
+   * Flag to indicate whether or not the registry is secure.
+   * Valid once the service is initialized.
+   * @return true if the registry is running in secure mode
+   */
+  public boolean isSecure() {
+    return registrySecurity.isSecureRegistry();
+  }
+
+  /**
+   * Get the registry security helper
+   * @return the registry security helper
+   */
+  protected RegistrySecurity getRegistrySecurity() {
+    return registrySecurity;
+  }
+
+  /**
+   * Build the security diagnostics string
+   * @return a string for diagnostics
+   */
+  protected String buildSecurityDiagnostics() {
+    // build up the security connection diags
+    if (!isSecure()) {
+      return "security disabled";
+    } else {
+      StringBuilder builder = new StringBuilder();
+      builder.append("secure cluster; ");
+      builder.append(registrySecurity.buildSecurityDiagnostics());
+      return builder.toString();
+    }
+  }
+
+  /**
+   * Create a new Curator instance off the root path, using the
+   * options provided in the service configuration to set timeouts and
+   * the retry policy.
+   * @return the newly created Curator instance
+   * @throws IOException on any failure to create the instance
+   */
+  private CuratorFramework createCurator() throws IOException {
+    Configuration conf = getConfig();
+    createEnsembleProvider();
+    int sessionTimeout = conf.getInt(KEY_REGISTRY_ZK_SESSION_TIMEOUT,
+        DEFAULT_ZK_SESSION_TIMEOUT);
+    int connectionTimeout = conf.getInt(KEY_REGISTRY_ZK_CONNECTION_TIMEOUT,
+        DEFAULT_ZK_CONNECTION_TIMEOUT);
+    int retryTimes = conf.getInt(KEY_REGISTRY_ZK_RETRY_TIMES,
+        DEFAULT_ZK_RETRY_TIMES);
+    int retryInterval = conf.getInt(KEY_REGISTRY_ZK_RETRY_INTERVAL,
+        DEFAULT_ZK_RETRY_INTERVAL);
+    int retryCeiling = conf.getInt(KEY_REGISTRY_ZK_RETRY_CEILING,
+        DEFAULT_ZK_RETRY_CEILING);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating CuratorService with connection {}",
+          connectionDescription);
+    }
+    CuratorFramework framework;
+
+    synchronized (CuratorService.class) {
+      // set the security options
+
+      //log them
+      securityConnectionDiagnostics = buildSecurityDiagnostics();
+
+      // build up the curator itself
+      CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
+      builder.ensembleProvider(ensembleProvider)
+       .connectionTimeoutMs(connectionTimeout)
+       .sessionTimeoutMs(sessionTimeout)
+
+       .retryPolicy(new BoundedExponentialBackoffRetry(retryInterval,
+           retryCeiling,
+           retryTimes));
+
+      // set up the builder AND any JVM context
+      registrySecurity.applySecurityEnvironment(builder);
+
+      framework = builder.build();
+      framework.start();
+    }
+
+    return framework;
+  }
+
+  @Override
+  public String toString() {
+    return super.toString()
+           + bindingDiagnosticDetails();
+  }
+
+  /**
+   * Get the binding diagnostics
+   * @return a diagnostics string valid after the service is started.
+   */
+  public String bindingDiagnosticDetails() {
+    return " Connection=\"" + connectionDescription + "\""
+           + " root=\"" + registryRoot + "\""
+           + " " + securityConnectionDiagnostics;
+  }
+
+  /**
+   * Create a full path from the registry root and the supplied subdir
+   * @param path path of operation
+   * @return an absolute path
+   * @throws IllegalArgumentException if the path is invalid
+   */
+  protected String createFullPath(String path) throws IOException {
+    return RegistryPathUtils.createFullPath(registryRoot, path);
+  }
+
+  /**
+   * Get the registry binding source ... this can be used to
+   * create new ensemble providers
+   * @return the registry binding source in use
+   */
+  public RegistryBindingSource getBindingSource() {
+    return bindingSource;
+  }
+
+  /**
+   * Create the ensemble provider for this registry by invoking
+   * {@link RegistryBindingSource#supplyBindingInformation()} on
+   * the provider stored in {@link #bindingSource}.
+   * This sets {@link #ensembleProvider} to the supplied provider and
+   * {@link #connectionDescription} to the binding description, for use
+   * in toString() and logging.
+   */
+  protected void createEnsembleProvider() {
+    BindingInformation binding = bindingSource.supplyBindingInformation();
+    connectionDescription = binding.description
+                            + " " + securityConnectionDiagnostics;
+    ensembleProvider = binding.ensembleProvider;
+  }
+
+  /**
+   * Supply the binding information.
+   * This implementation returns a fixed ensemble bound to
+   * the quorum supplied by {@link #buildConnectionString()}
+   * @return the binding information
+   */
+  @Override
+  public BindingInformation supplyBindingInformation() {
+    BindingInformation binding = new BindingInformation();
+    String connectString = buildConnectionString();
+    binding.ensembleProvider = new FixedEnsembleProvider(connectString);
+    binding.description =
+        "fixed ZK quorum \"" + connectString + "\"";
+    return binding;
+  }
+
+  /**
+   * Override point: get the connection string used to connect to
+   * the ZK service
+   * @return a registry quorum
+   */
+  protected String buildConnectionString() {
+    return getConfig().getTrimmed(KEY_REGISTRY_ZK_QUORUM,
+        DEFAULT_REGISTRY_ZK_QUORUM);
+  }
+
+  /**
+   * Create an IOE when an operation fails
+   * @param path path of operation
+   * @param operation operation attempted
+   * @param exception the exception caught
+   * @return an IOE to throw that contains the path and operation details.
+   */
+  protected IOException operationFailure(String path,
+      String operation,
+      Exception exception) {
+    return operationFailure(path, operation, exception, null);
+  }
+
+  /**
+   * Create an IOE when an operation fails
+   * @param path path of operation
+   * @param operation operation attempted
+   * @param exception the exception caught
+   * @param acls the ACLs used in the operation; may be null
+   * @return an IOE to throw that contains the path and operation details.
+   */
+  protected IOException operationFailure(String path,
+      String operation,
+      Exception exception,
+      List<ACL> acls) {
+    IOException ioe;
+    String aclList = "[" + RegistrySecurity.aclsToString(acls) + "]";
+    if (exception instanceof KeeperException.NoNodeException) {
+      ioe = new PathNotFoundException(path);
+    } else if (exception instanceof KeeperException.NodeExistsException) {
+      ioe = new FileAlreadyExistsException(path);
+    } else if (exception instanceof KeeperException.NoAuthException) {
+      ioe = new NoPathPermissionsException(path,
+          "Not authorized to access path; ACLs: " + aclList);
+    } else if (exception instanceof KeeperException.NotEmptyException) {
+      ioe = new PathIsNotEmptyDirectoryException(path);
+    } else if (exception instanceof KeeperException.AuthFailedException) {
+      ioe = new AuthenticationFailedException(path,
+          "Authentication Failed: " + exception, exception);
+    } else if (exception instanceof KeeperException.NoChildrenForEphemeralsException) {
+      ioe = new NoChildrenForEphemeralsException(path,
+          "Cannot create a path under an ephemeral node: " + exception,
+          exception);
+    } else if (exception instanceof KeeperException.InvalidACLException) {
+      // this is a security exception of a kind
+      // include the ACLs to help the diagnostics
+      StringBuilder builder = new StringBuilder();
+      builder.append("Path access failure ").append(aclList);
+      builder.append(" ");
+      builder.append(securityConnectionDiagnostics);
+      ioe = new NoPathPermissionsException(path, builder.toString());
+    } else {
+      ioe = new RegistryIOException(path,
+          "Failure of " + operation + " on " + path + ": " +
+          exception.toString(),
+          exception);
+    }
+    if (ioe.getCause() == null) {
+      ioe.initCause(exception);
+    }
+    return ioe;
+  }
+
+  /**
+   * Create a path if it does not exist.
+   * The check is poll + create; there is a risk that another process
+   * may create the same path before the create() operation is executed or
+   * propagated to the ZK node polled.
+   *
+   * @param path path to create
+   * @param mode creation mode
+   * @param acl ACL for the path; used when creating a new entry
+   * @param createParents flag to trigger parent creation
+   * @return true iff the path was created
+   * @throws IOException
+   */
+  @VisibleForTesting
+  public boolean maybeCreate(String path,
+      CreateMode mode,
+      List<ACL> acl,
+      boolean createParents) throws IOException {
+    return zkMkPath(path, mode, createParents, acl);
+  }
+
+  /**
+   * Stat the entry at a path
+   * @param path path of operation
+   * @return a curator stat entry
+   * @throws IOException on a failure
+   * @throws PathNotFoundException if the path was not found
+   */
+  public Stat zkStat(String path) throws IOException {
+    checkServiceLive();
+    String fullpath = createFullPath(path);
+    Stat stat;
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Stat {}", fullpath);
+      }
+      stat = curator.checkExists().forPath(fullpath);
+    } catch (Exception e) {
+      throw operationFailure(fullpath, "read()", e);
+    }
+    if (stat == null) {
+      throw new PathNotFoundException(path);
+    }
+    return stat;
+  }
+
+  /**
+   * Get the ACLs of a path
+   * @param path path of operation
+   * @return a possibly empty list of ACLs
+   * @throws IOException
+   */
+  public List<ACL> zkGetACLS(String path) throws IOException {
+    checkServiceLive();
+    String fullpath = createFullPath(path);
+    List<ACL> acls;
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("GetACLS {}", fullpath);
+      }
+      acls = curator.getACL().forPath(fullpath);
+    } catch (Exception e) {
+      throw operationFailure(fullpath, "read()", e);
+    }
+    if (acls == null) {
+      throw new PathNotFoundException(path);
+    }
+    return acls;
+  }
+
+  /**
+   * Probe for a path existing
+   * @param path path of operation
+   * @return true if the path was visible from the ZK server
+   * queried.
+   * @throws IOException on any exception other than
+   * {@link PathNotFoundException}
+   */
+  public boolean zkPathExists(String path) throws IOException {
+    checkServiceLive();
+    try {
+      return zkStat(path) != null;
+    } catch (PathNotFoundException e) {
+      return false;
+    } catch (IOException e) {
+      throw e;
+    }
+  }
+
+  /**
+   * Verify that a path exists
+   * @param path path of operation
+   * @return the path, if it exists
+   * @throws PathNotFoundException if the path is absent
+   * @throws IOException
+   */
+  public String zkPathMustExist(String path) throws IOException {
+    zkStat(path);
+    return path;
+  }
+
+  /**
+   * Create a directory. It is not an error if it already exists.
+   * @param path path to create
+   * @param mode mode for the path
+   * @param createParents flag to trigger parent creation
+   * @param acls ACL for the path
+   * @return true if the path was created; false if it already existed
+   * @throws IOException any problem
+   */
+  public boolean zkMkPath(String path,
+      CreateMode mode,
+      boolean createParents,
+      List<ACL> acls)
+      throws IOException {
+    checkServiceLive();
+    path = createFullPath(path);
+    if (acls == null || acls.isEmpty()) {
+      throw new NoPathPermissionsException(path, "Empty ACL list");
+    }
+
+    try {
+      RegistrySecurity.AclListInfo aclInfo =
+          new RegistrySecurity.AclListInfo(acls);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Creating path {} with mode {} and ACL {}",
+            path, mode, aclInfo);
+      }
+      CreateBuilder createBuilder = curator.create();
+      createBuilder.withMode(mode).withACL(acls);
+      if (createParents) {
+        createBuilder.creatingParentsIfNeeded();
+      }
+      createBuilder.forPath(path);
+
+    } catch (KeeperException.NodeExistsException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("path already present: {}", path, e);
+      }
+      return false;
+    } catch (Exception e) {
+      throw operationFailure(path, "mkdir() ", e, acls);
+    }
+    return true;
+  }
+
+  /**
+   * Recursively create the parent path of an entry
+   * @param path path to create
+   * @param acl ACL for path
+   * @throws IOException any problem
+   */
+  public void zkMkParentPath(String path,
+      List<ACL> acl) throws
+      IOException {
+    // create the parent path, including any absent ancestors
+
+    zkMkPath(RegistryPathUtils.parentOf(path),
+        CreateMode.PERSISTENT, true, acl);
+  }
+
+  /**
+   * Create a path with the given data. An empty byte array is used for a
+   * path without data.
+   * @param path path of operation
+   * @param mode creation mode
+   * @param data initial data
+   * @param acls ACL for the path
+   * @throws IOException
+   */
+  public void zkCreate(String path,
+      CreateMode mode,
+      byte[] data,
+      List<ACL> acls) throws IOException {
+    Preconditions.checkArgument(data != null, "null data");
+    checkServiceLive();
+    String fullpath = createFullPath(path);
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Creating {} with {} bytes of data and ACL {}",
+            fullpath, data.length,
+            new RegistrySecurity.AclListInfo(acls));
+      }
+      curator.create().withMode(mode).withACL(acls).forPath(fullpath, data);
+    } catch (Exception e) {
+      throw operationFailure(fullpath, "create()", e, acls);
+    }
+  }
+
+  /**
+   * Update the data for a path
+   * @param path path of operation
+   * @param data new data
+   * @throws IOException
+   */
+  public void zkUpdate(String path, byte[] data) throws IOException {
+    Preconditions.checkArgument(data != null, "null data");
+    checkServiceLive();
+    path = createFullPath(path);
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Updating {} with {} bytes", path, data.length);
+      }
+      curator.setData().forPath(path, data);
+    } catch (Exception e) {
+      throw operationFailure(path, "update()", e);
+    }
+  }
+
+  /**
+   * Create or update an entry
+   * @param path path
+   * @param mode creation mode, used when creating a new entry
+   * @param data data
+   * @param acl ACL for the path; used when creating a new entry
+   * @param overwrite enable overwrite
+   * @throws IOException
+   * @return true if the entry was created, false if it was simply updated.
+   */
+  public boolean zkSet(String path,
+      CreateMode mode,
+      byte[] data,
+      List<ACL> acl, boolean overwrite) throws IOException {
+    Preconditions.checkArgument(data != null, "null data");
+    checkServiceLive();
+    if (!zkPathExists(path)) {
+      zkCreate(path, mode, data, acl);
+      return true;
+    } else {
+      if (overwrite) {
+        zkUpdate(path, data);
+        return false;
+      } else {
+        throw new FileAlreadyExistsException(path);
+      }
+    }
+  }
+
+  /**
+   * Delete a directory/directory tree.
+   * It is not an error to delete a path that does not exist
+   * @param path path of operation
+   * @param recursive flag to trigger recursive deletion
+   * @param backgroundCallback optional callback; if set, the operation
+   * becomes an asynchronous/background operation.
+   * @throws IOException on problems other than no-such-path
+   */
+  public void zkDelete(String path,
+      boolean recursive,
+      BackgroundCallback backgroundCallback) throws IOException {
+    checkServiceLive();
+    String fullpath = createFullPath(path);
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Deleting {}", fullpath);
+      }
+      DeleteBuilder delete = curator.delete();
+      if (recursive) {
+        delete.deletingChildrenIfNeeded();
+      }
+      if (backgroundCallback != null) {
+        delete.inBackground(backgroundCallback);
+      }
+      delete.forPath(fullpath);
+    } catch (KeeperException.NoNodeException e) {
+      // not an error
+    } catch (Exception e) {
+      throw operationFailure(fullpath, "delete()", e);
+    }
+  }
+
+  /**
+   * List all children of a path
+   * @param path path of operation
+   * @return a possibly empty list of children
+   * @throws IOException
+   */
+  public List<String> zkList(String path) throws IOException {
+    checkServiceLive();
+    String fullpath = createFullPath(path);
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("ls {}", fullpath);
+      }
+      GetChildrenBuilder builder = curator.getChildren();
+      List<String> children = builder.forPath(fullpath);
+      return children;
+    } catch (Exception e) {
+      throw operationFailure(path, "ls()", e);
+    }
+  }
+
+  /**
+   * Read data on a path
+   * @param path path of operation
+   * @return the data
+   * @throws IOException read failure
+   */
+  public byte[] zkRead(String path) throws IOException {
+    checkServiceLive();
+    String fullpath = createFullPath(path);
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Reading {}", fullpath);
+      }
+      return curator.getData().forPath(fullpath);
+    } catch (Exception e) {
+      throw operationFailure(fullpath, "read()", e);
+    }
+  }
+
+  /**
+   * Return a path dumper instance which can do a full dump
+   * of the registry tree in its <code>toString()</code>
+   * operation
+   * @param verbose verbose flag - includes more details (such as ACLs)
+   * @return a class to dump the registry
+   */
+  public ZKPathDumper dumpPath(boolean verbose) {
+    return new ZKPathDumper(curator, registryRoot, verbose);
+  }
+
+  /**
+   * Add a new write access entry for all future write operations.
+   * @param id ID to use
+   * @param pass password
+   * @throws IOException on any failure to build the digest
+   */
+  public boolean addWriteAccessor(String id, String pass) throws IOException {
+    RegistrySecurity security = getRegistrySecurity();
+    ACL digestACL = new ACL(ZooDefs.Perms.ALL,
+        security.toDigestId(security.digest(id, pass)));
+    return security.addDigestACL(digestACL);
+  }
+
+  /**
+   * Clear all write accessors
+   */
+  public void clearWriteAccessors() {
+    getRegistrySecurity().resetDigestACLs();
+  }
+
+
+  /**
+   * Diagnostics method to dump a registry robustly.
+   * Any exception raised is swallowed.
+   * @param verbose verbose path dump
+   * @return the registry tree, or an empty string if the dump failed
+   */
+  protected String dumpRegistryRobustly(boolean verbose) {
+    try {
+      ZKPathDumper pathDumper = dumpPath(verbose);
+      return pathDumper.toString();
+    } catch (Exception e) {
+      // ignore
+      LOG.debug("Ignoring exception:  {}", e);
+    }
+    return "";
+  }
+}
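
A minimal usage sketch of the zk* primitives above (not part of this patch;
the paths, payload and wide-open ACL are illustrative assumptions, and a
reachable ZK quorum from the default registry configuration is assumed):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.registry.client.impl.zk.CuratorService;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.data.ACL;

    public class CuratorServiceSketch {
      public static void main(String[] args) throws Exception {
        CuratorService curatorService = new CuratorService("curator-sketch");
        curatorService.init(new Configuration()); // reads the registry ZK settings
        curatorService.start();                   // creates and starts the Curator client
        try {
          List<ACL> acls = new ArrayList<ACL>();
          acls.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE));
          // create a persistent node (and any parents) under the registry root
          curatorService.zkMkPath("/sketch", CreateMode.PERSISTENT, true, acls);
          // write an entry, then read it back
          curatorService.zkCreate("/sketch/entry", CreateMode.PERSISTENT,
              "hello".getBytes("UTF-8"), acls);
          byte[] data = curatorService.zkRead("/sketch/entry");
          System.out.println("read " + data.length + " bytes");
        } finally {
          curatorService.stop();
        }
      }
    }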

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryBindingSource.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryBindingSource.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryBindingSource.java
new file mode 100644
index 0000000..bab4742
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryBindingSource.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.impl.zk;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Interface to be implemented by sources of registry binding information
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface RegistryBindingSource {
+
+  /**
+   * Supply the binding information for this registry
+   * @return the binding information data
+   */
+  BindingInformation supplyBindingInformation();
+}
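
As an illustration only (the class below is not in this patch), a custom
binding source can supply a fixed quorum in the same way that the default
self-binding in CuratorService.supplyBindingInformation() does:

    import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
    import org.apache.hadoop.registry.client.impl.zk.BindingInformation;
    import org.apache.hadoop.registry.client.impl.zk.RegistryBindingSource;

    public class FixedQuorumBindingSource implements RegistryBindingSource {
      private final String quorum; // e.g. "host1:2181,host2:2181"

      public FixedQuorumBindingSource(String quorum) {
        this.quorum = quorum;
      }

      @Override
      public BindingInformation supplyBindingInformation() {
        BindingInformation binding = new BindingInformation();
        binding.ensembleProvider = new FixedEnsembleProvider(quorum);
        binding.description = "fixed ZK quorum \"" + quorum + "\"";
        return binding;
      }
    }

Such a source can then be passed to the CuratorService or
RegistryOperationsService constructors in place of the default self-binding.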

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryInternalConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryInternalConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryInternalConstants.java
new file mode 100644
index 0000000..f04673a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryInternalConstants.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.impl.zk;
+
+import org.apache.zookeeper.ZooDefs;
+
+/**
+ * Internal constants for the registry.
+ *
+ * These are values which are not visible to users.
+ */
+public interface RegistryInternalConstants {
+
+  /**
+   * Pattern of a single entry in the registry path: {@value}.
+   * <p>
+   * This is what constitutes a valid hostname according to current RFCs:
+   * the first and last characters must be lowercase alphanumeric, with
+   * alphanumerics and hyphens allowed in between.
+   * <p>
+   * No upper limit is placed on the size of an entry.
+   */
+  String VALID_PATH_ENTRY_PATTERN =
+      "([a-z0-9]|[a-z0-9][a-z0-9\\-]*[a-z0-9])";
+
+  /**
+   * Permissions for readers: {@value}.
+   */
+  int PERMISSIONS_REGISTRY_READERS = ZooDefs.Perms.READ;
+
+  /**
+   * Permissions for system services: {@value}
+   */
+  int PERMISSIONS_REGISTRY_SYSTEM_SERVICES = ZooDefs.Perms.ALL;
+
+  /**
+   * Permissions for a user's root entry: {@value}.
+   * All permissions except ADMIN (ACL access) on a node.
+   */
+  int PERMISSIONS_REGISTRY_USER_ROOT =
+      ZooDefs.Perms.READ | ZooDefs.Perms.WRITE | ZooDefs.Perms.CREATE |
+      ZooDefs.Perms.DELETE;
+
+  /**
+   * Name of the SASL authentication provider which has to be registered
+   * with the ZK server to enable <code>sasl:</code> ACL entries: {@value}.
+   *
+   * Without this, callers can connect via SASL, but
+   * they cannot use it in ACLs.
+   */
+  String SASLAUTHENTICATION_PROVIDER =
+      "org.apache.zookeeper.server.auth.SASLAuthenticationProvider";
+
+  /**
+   * String to use as the prefix when declaring a new auth provider: {@value}.
+   */
+  String ZOOKEEPER_AUTH_PROVIDER = "zookeeper.authProvider";
+
+  /**
+   * This is the Hadoop environment variable which propagates the identity
+   * of a user in an insecure cluster.
+   */
+  String HADOOP_USER_NAME = "HADOOP_USER_NAME";
+}
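
A sketch of how these permission masks map onto ZooKeeper ACL entries (the
SASL principal below is a made-up example, not something defined in this file):

    import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.data.ACL;
    import org.apache.zookeeper.data.Id;

    public class RegistryAclSketch {
      // world-readable entry for registry readers
      static final ACL READER_ACL = new ACL(
          RegistryInternalConstants.PERMISSIONS_REGISTRY_READERS,
          ZooDefs.Ids.ANYONE_ID_UNSAFE);

      // a user's root entry: everything except ADMIN, granted to a SASL identity
      static final ACL USER_ROOT_ACL = new ACL(
          RegistryInternalConstants.PERMISSIONS_REGISTRY_USER_ROOT,
          new Id("sasl", "alice@EXAMPLE.COM"));
    }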

http://git-wip-us.apache.org/repos/asf/hadoop/blob/088ae9c5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
new file mode 100644
index 0000000..c54c205
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/RegistryOperationsService.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.registry.client.impl.zk;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.registry.client.api.BindFlags;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+import org.apache.hadoop.registry.client.exceptions.InvalidPathnameException;
+import org.apache.hadoop.registry.client.types.RegistryPathStatus;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * The Registry operations service.
+ * <p>
+ * This service implements the {@link RegistryOperations}
+ * API by mapping the commands to ZooKeeper operations, and translating
+ * results and exceptions back into those specified by the API.
+ * <p>
+ * Factory methods should hide the detail that this has been implemented via
+ * the {@link CuratorService} by returning it cast to that
+ * {@link RegistryOperations} interface, rather than this implementation class.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class RegistryOperationsService extends CuratorService
+  implements RegistryOperations {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RegistryOperationsService.class);
+
+  private final RegistryUtils.ServiceRecordMarshal serviceRecordMarshal
+      = new RegistryUtils.ServiceRecordMarshal();
+
+  public RegistryOperationsService(String name) {
+    this(name, null);
+  }
+
+  public RegistryOperationsService() {
+    this("RegistryOperationsService");
+  }
+
+  public RegistryOperationsService(String name,
+      RegistryBindingSource bindingSource) {
+    super(name, bindingSource);
+  }
+
+  /**
+   * Get the aggregate set of ACLs the client should use
+   * to create directories
+   * @return the ACL list
+   */
+  public List<ACL> getClientAcls() {
+    return getRegistrySecurity().getClientACLs();
+  }
+
+  /**
+   * Validate a path ... this includes checking that its elements are DNS-valid
+   * @param path path to validate
+   * @throws InvalidPathnameException if a path is considered invalid
+   */
+  protected void validatePath(String path) throws InvalidPathnameException {
+    RegistryPathUtils.validateElementsAsDNS(path);
+  }
+
+  @Override
+  public boolean mknode(String path, boolean createParents) throws IOException {
+    validatePath(path);
+    return zkMkPath(path, CreateMode.PERSISTENT, createParents, getClientAcls());
+  }
+
+  @Override
+  public void bind(String path,
+      ServiceRecord record,
+      int flags) throws IOException {
+    Preconditions.checkArgument(record != null, "null record");
+    validatePath(path);
+    LOG.info("Bound at {} : {}", path, record);
+
+    CreateMode mode = CreateMode.PERSISTENT;
+    byte[] bytes = serviceRecordMarshal.toByteswithHeader(record);
+    zkSet(path, mode, bytes, getClientAcls(),
+        ((flags & BindFlags.OVERWRITE) != 0));
+  }
+
+  @Override
+  public ServiceRecord resolve(String path) throws IOException {
+    byte[] bytes = zkRead(path);
+    return serviceRecordMarshal.fromBytesWithHeader(path, bytes);
+  }
+
+  @Override
+  public boolean exists(String path) throws IOException {
+    validatePath(path);
+    return zkPathExists(path);
+  }
+
+  @Override
+  public RegistryPathStatus stat(String path) throws IOException {
+    validatePath(path);
+    Stat stat = zkStat(path);
+
+    String name = RegistryPathUtils.lastPathEntry(path);
+    RegistryPathStatus status = new RegistryPathStatus(
+        name,
+        stat.getCtime(),
+        stat.getDataLength(),
+        stat.getNumChildren());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Stat {} => {}", path, status);
+    }
+    return status;
+  }
+
+  @Override
+  public List<String> list(String path) throws IOException {
+    validatePath(path);
+    return zkList(path);
+  }
+
+  @Override
+  public void delete(String path, boolean recursive) throws IOException {
+    validatePath(path);
+    zkDelete(path, recursive, null);
+  }
+
+}
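
A minimal end-to-end sketch of the operations above (illustrative only: the
paths are invented, the ServiceRecord is left unpopulated, and a running ZK
quorum from the default configuration is assumed). As the class javadoc
notes, production callers would normally obtain the instance from a factory
as a RegistryOperations rather than constructing this class directly:

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.registry.client.api.BindFlags;
    import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
    import org.apache.hadoop.registry.client.types.ServiceRecord;

    public class RegistryOperationsSketch {
      public static void main(String[] args) throws Exception {
        RegistryOperationsService registry = new RegistryOperationsService();
        registry.init(new Configuration());
        registry.start();
        try {
          String base = "/users/example/services/org-example-app";
          registry.mknode(base, true);                 // mkdir -p equivalent
          registry.bind(base + "/instance-1", new ServiceRecord(),
              BindFlags.OVERWRITE);                    // write a service record
          ServiceRecord resolved = registry.resolve(base + "/instance-1");
          List<String> children = registry.list(base);
          System.out.println(children.size() + " children; record: " + resolved);
          registry.delete(base, true);                 // recursive delete
        } finally {
          registry.stop();
        }
      }
    }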