You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/12/13 22:53:13 UTC

[21/74] [abbrv] hadoop git commit: YARN-5461. Initial code ported from slider-core module. (jianhe)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UgiJerseyBinding.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UgiJerseyBinding.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UgiJerseyBinding.java
new file mode 100644
index 0000000..bf71861
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UgiJerseyBinding.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.restclient;
+
+import com.google.common.base.Preconditions;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.slider.core.exceptions.ExceptionConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+/**
+ * Class to bond to a Jersey client, for UGI integration and SPNEGO.
+ * <p>
+ *   Usage: create an instance, then either pass the handler provided by
+ *   {@link #getHandler()} in to the constructor of a Jersey
+ *   <code>Client</code>, or call {@link #createJerseyClient()} to get a
+ *   client which is already bonded to that handler.
+ * <p>
+ * see <a href="https://jersey.java.net/apidocs/1.17/jersey/com/sun/jersey/client/urlconnection/HttpURLConnectionFactory.html">Jersey docs</a>
+ */
+public class UgiJerseyBinding implements
+    HttpURLConnectionFactory {
+  private static final Logger log =
+      LoggerFactory.getLogger(UgiJerseyBinding.class);
+
+  /** Opens the URL connections which are handed back to Jersey. */
+  private final UrlConnectionOperations operations;
+
+  /** Jersey client handler which uses this instance as its factory. */
+  private final URLConnectionClientHandler handler;
+
+  /**
+   * Construct an instance.
+   * @param operations operations instance; must not be null
+   * @throws IllegalArgumentException if <code>operations</code> is null
+   */
+  @SuppressWarnings("ThisEscapedInObjectConstruction")
+  public UgiJerseyBinding(UrlConnectionOperations operations) {
+    Preconditions.checkArgument(operations != null, "Null operations");
+    this.operations = operations;
+    this.handler = new URLConnectionClientHandler(this);
+  }
+
+  /**
+   * Create an instance off the configuration. The SPNEGO policy
+   * is derived from the current UGI settings.
+   * @param conf config
+   */
+  public UgiJerseyBinding(Configuration conf) {
+    this(new UrlConnectionOperations(conf));
+  }
+
+  /**
+   * Get a URL connection by delegating to the operations instance.
+   * @param url URL to connect to
+   * @return the connection
+   * @throws IOException any problem. {@link AuthenticationException}
+   * errors are wrapped in an <code>IOException</code>
+   */
+  @Override
+  public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+    HttpURLConnection connection;
+    try {
+      // open a connection handling status codes and so redirections
+      // but as it opens a connection, it's less useful than you think.
+      connection = operations.openConnection(url);
+    } catch (AuthenticationException authFailure) {
+      throw new IOException(authFailure);
+    }
+    return connection;
+  }
+
+  /**
+   * Get the operations instance this binding delegates to.
+   * @return the operations instance passed to, or created by, the constructor
+   */
+  public UrlConnectionOperations getOperations() {
+    return this.operations;
+  }
+
+  /**
+   * Get the Jersey client handler bound to this instance.
+   * @return the handler created in the constructor
+   */
+  public URLConnectionClientHandler getHandler() {
+    return this.handler;
+  }
+
+  /**
+   * Get the SPNEGO flag (as found in the operations instance).
+   * @return the spnego policy
+   */
+  public boolean isUseSpnego() {
+    return this.operations.isUseSpnego();
+  }
+
+
+  /**
+   * Uprate error codes 400 and up into faults;
+   * the conversion itself is delegated to the exception converter.
+   * <p>
+   * see {@link ExceptionConverter#convertJerseyException(String, String, UniformInterfaceException)}
+   * @param verb HTTP verb of the failed operation
+   * @param url URL of the failed operation
+   * @param ex the Jersey exception to convert
+   * @return the exception built by the converter, for the caller to throw
+   * @throws IOException declared in the signature; nothing in this method
+   * throws it directly
+   */
+  public static IOException uprateFaults(HttpVerb verb, String url,
+      UniformInterfaceException ex)
+      throws IOException {
+    String verbName = verb.getVerb();
+    return ExceptionConverter.convertJerseyException(verbName, url, ex);
+  }
+
+  /**
+   * Create the standard Jersey client config: a default client config
+   * with the JSON POJO mapping feature switched on.
+   * @return the recommended Jersey Client config
+   */
+  public ClientConfig createJerseyClientConfig() {
+    ClientConfig config = new DefaultClientConfig();
+    config.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, true);
+    return config;
+  }
+
+  /**
+   * Create a jersey client bonded to this handler, using the
+   * supplied client config.
+   * @param clientConfig client configuration
+   * @return a new client instance to use
+   */
+  public Client createJerseyClient(ClientConfig clientConfig) {
+    URLConnectionClientHandler clientHandler = getHandler();
+    return new Client(clientHandler, clientConfig);
+  }
+
+  /**
+   * Create a jersey client bonded to this handler, using the
+   * client config created with {@link #createJerseyClientConfig()}.
+   * @return a new client instance to use
+   */
+  public Client createJerseyClient() {
+    ClientConfig defaultConfig = createJerseyClientConfig();
+    return createJerseyClient(defaultConfig);
+  }
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UrlConnectionOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UrlConnectionOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UrlConnectionOperations.java
new file mode 100644
index 0000000..20ef198
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UrlConnectionOperations.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.restclient;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.yarn.webapp.ForbiddenException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+/**
+ * Operations on the JDK UrlConnection class.
+ *
+ */
+public class UrlConnectionOperations extends Configured  {
+  private static final Logger log =
+      LoggerFactory.getLogger(UrlConnectionOperations.class);
+
+  private SliderURLConnectionFactory connectionFactory;
+
+  private boolean useSpnego = false;
+
+  /**
+   * Create an instance off the configuration. The SPNEGO policy
+   * is derived from the current UGI settings.
+   * @param conf config
+   */
+  public UrlConnectionOperations(Configuration conf) {
+    super(conf);
+    connectionFactory = SliderURLConnectionFactory.newInstance(conf);
+    if (UserGroupInformation.isSecurityEnabled()) {
+      log.debug("SPNEGO is enabled");
+      setUseSpnego(true);
+    }
+  }
+
+
+  public boolean isUseSpnego() {
+    return useSpnego;
+  }
+
+  public void setUseSpnego(boolean useSpnego) {
+    this.useSpnego = useSpnego;
+  }
+
+  /**
+   * Opens a url with cache disabled, redirect handled in 
+   * (JDK) implementation.
+   *
+   * @param url to open
+   * @return URLConnection
+   * @throws IOException
+   * @throws AuthenticationException authentication failure
+   */
+  public HttpURLConnection openConnection(URL url) throws
+      IOException,
+      AuthenticationException {
+    Preconditions.checkArgument(url.getPort() != 0, "no port");
+    return (HttpURLConnection) connectionFactory.openConnection(url, useSpnego);
+  }
+
+  public HttpOperationResponse execGet(URL url) throws
+      IOException,
+      AuthenticationException {
+    return execHttpOperation(HttpVerb.GET, url, null, "");
+  }
+
+  public HttpOperationResponse execHttpOperation(HttpVerb verb,
+      URL url,
+      byte[] payload,
+      String contentType)
+      throws IOException, AuthenticationException {
+    HttpURLConnection conn = null;
+    HttpOperationResponse outcome = new HttpOperationResponse();
+    int resultCode;
+    byte[] body = null;
+    log.debug("{} {} spnego={}", verb, url, useSpnego);
+
+    boolean doOutput = verb.hasUploadBody();
+    if (doOutput) {
+      Preconditions.checkArgument(payload !=null,
+          "Null payload on a verb which expects one");
+    }
+    try {
+      conn = openConnection(url);
+      conn.setRequestMethod(verb.getVerb());
+      conn.setDoOutput(doOutput);
+      if (doOutput) {
+        conn.setRequestProperty("Content-Type", contentType);
+      }
+
+      // now do the connection
+      conn.connect();
+      
+      if (doOutput) {
+        OutputStream output = conn.getOutputStream();
+        IOUtils.write(payload, output);
+        output.close();
+      }
+      
+      resultCode = conn.getResponseCode();
+      outcome.lastModified = conn.getLastModified();
+      outcome.contentType = conn.getContentType();
+      outcome.headers = conn.getHeaderFields();
+      InputStream stream = conn.getErrorStream();
+      if (stream == null) {
+        stream = conn.getInputStream();
+      }
+      if (stream != null) {
+        // read into a buffer.
+        body = IOUtils.toByteArray(stream);
+      } else {
+        // no body: 
+        log.debug("No body in response");
+
+      }
+    } catch (SSLException e) {
+      throw e;
+    } catch (IOException e) {
+      throw NetUtils.wrapException(url.toString(),
+          url.getPort(), "localhost", 0, e);
+
+    } catch (AuthenticationException e) {
+      throw new AuthenticationException("From " + url + ": " + e, e);
+
+    } finally {
+      if (conn != null) {
+        conn.disconnect();
+      }
+    }
+    uprateFaults(HttpVerb.GET, url.toString(), resultCode, "", body);
+    outcome.responseCode = resultCode;
+    outcome.data = body;
+    return outcome;
+  }
+
+  /**
+   * Uprate error codes 400 and up into faults; 
+   * 404 is converted to a {@link NotFoundException},
+   * 401 to {@link ForbiddenException}
+   *
+   * @param verb HTTP Verb used
+   * @param url URL as string
+   * @param resultCode response from the request
+   * @param bodyAsString
+   *@param body optional body of the request  @throws IOException if the result was considered a failure
+   */
+  public static void uprateFaults(HttpVerb verb, String url,
+      int resultCode, String bodyAsString, byte[] body)
+      throws IOException {
+
+    if (resultCode < 400) {
+      //success
+      return;
+    }
+    String msg = verb.toString() +" "+ url;
+    if (resultCode == 404) {
+      throw new NotFoundException(msg);
+    }
+    if (resultCode == 401) {
+      throw new ForbiddenException(msg);
+    }
+    // all other error codes
+    
+    // get a string respnse
+    if (bodyAsString == null) {
+      if (body != null && body.length > 0) {
+        bodyAsString = new String(body);
+      } else {
+        bodyAsString = "";
+      }
+    }
+    String message =  msg +
+                     " failed with exit code " + resultCode
+                     + ", body length " + bodyAsString.length()
+                     + ":\n" + bodyAsString;
+    log.error(message);
+    throw new IOException(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
new file mode 100644
index 0000000..ca49888
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class BlockingZKWatcher implements Watcher {
+  
+  protected static final Logger log =
+    LoggerFactory.getLogger(BlockingZKWatcher.class);
+  private final AtomicBoolean connectedFlag = new AtomicBoolean(false);
+
+  @Override
+  public void process(WatchedEvent event) {
+    log.info("ZK binding callback received");
+    connectedFlag.set(true);
+    synchronized (connectedFlag) {
+      try {
+        connectedFlag.notify();
+      } catch (Exception e) {
+        log.warn("failed while waiting for notification", e);
+      }
+    }
+  }
+
+  /**
+   * Wait for a flag to go true
+   * @param timeout timeout in millis
+   */
+
+  public void waitForZKConnection(int timeout)
+      throws InterruptedException, ConnectException {
+    synchronized (connectedFlag) {
+      if (!connectedFlag.get()) {
+        log.info("waiting for ZK event");
+        //wait a bit
+        connectedFlag.wait(timeout);
+      }
+    }
+    if (!connectedFlag.get()) {
+      throw new ConnectException("Unable to connect to ZK quorum");
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java
new file mode 100644
index 0000000..c8b3adb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+/**
+ * This is a version of the HBase ZK cluster cut out to be standalone.
+ *
+ * <i>Important: keep this Java6 language level for now</i>
+ */
+public class MiniZooKeeperCluster extends AbstractService {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      MiniZooKeeperCluster.class);
+
+  private static final int TICK_TIME = 2000;
+  private static final int CONNECTION_TIMEOUT = 30000;
+  public static final int MAX_CLIENT_CONNECTIONS = 1000;
+
+  private boolean started;
+
+  /** The default port. If zero, we use a random port. */
+  private int defaultClientPort = 0;
+
+  /** Client port of the currently active ZK server. */
+  private int clientPort;
+
+  private final List<NIOServerCnxnFactory> standaloneServerFactoryList;
+  private final List<ZooKeeperServer> zooKeeperServers;
+  private final List<Integer> clientPortList;
+
+  private int activeZKServerIndex;
+  private int tickTime = 0;
+  private File baseDir;
+  private final int numZooKeeperServers;
+  private String zkQuorum = "";
+
+  /**
+   * Create a cluster; no server is started until the service is started.
+   * @param numZooKeeperServers number of ZK servers to start
+   */
+  public MiniZooKeeperCluster(int numZooKeeperServers) {
+    super("MiniZooKeeperCluster");
+    this.numZooKeeperServers = numZooKeeperServers;
+    this.started = false;
+    activeZKServerIndex = -1;
+    zooKeeperServers = new ArrayList<ZooKeeperServer>();
+    clientPortList = new ArrayList<Integer>();
+    standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
+  }
+
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Set a fixed client port for the first server.
+   * @param clientPort port to use; must be positive
+   * @throws IllegalArgumentException if the port is zero or negative
+   */
+  public void setDefaultClientPort(int clientPort) {
+    if (clientPort <= 0) {
+      throw new IllegalArgumentException("Invalid default ZK client port: "
+                                         + clientPort);
+    }
+    this.defaultClientPort = clientPort;
+  }
+
+  /**
+   * Selects a ZK client port. Returns the default port if specified.
+   * Otherwise, returns a random port. The random port is selected from the
+   * range 49152 (0xc000) to 65279 (0xc000 + 0x3f00 - 1), which lies inside
+   * the 49152 to 65535 range. These ports cannot be registered with IANA
+   * and are intended for dynamic allocation (see http://bit.ly/dynports).
+   */
+  private int selectClientPort(Random r) {
+    if (defaultClientPort > 0) {
+      return defaultClientPort;
+    }
+    return 0xc000 + r.nextInt(0x3f00);
+  }
+
+  /**
+   * Set the tick time for servers started later.
+   * @param tickTime tick time; values of zero or less select the built-in
+   * default when the servers are started
+   */
+  public void setTickTime(int tickTime) {
+    this.tickTime = tickTime;
+  }
+
+  /**
+   * @return the number of live servers minus one
+   */
+  public int getBackupZooKeeperServerNum() {
+    return zooKeeperServers.size() - 1;
+  }
+
+  /**
+   * @return the number of live servers
+   */
+  public int getZooKeeperServerNum() {
+    return zooKeeperServers.size();
+  }
+
+  // / XXX: From o.a.zk.t.ClientBase
+  private static void setupTestEnv() {
+    // during the tests we run with 100K prealloc in the logs.
+    // on windows systems prealloc of 64M was seen to take ~15seconds
+    // resulting in test failure (client timeout on first session).
+    // set env and directly in order to handle static init/gc issues
+    System.setProperty("zookeeper.preAllocSize", "100");
+    FileTxnLog.setPreallocSize(100 * 1024);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    startup();
+  }
+
+  /**
+   * Start all the ZK servers, each in its own directory under a fresh
+   * temporary base directory, and make the first one the active server.
+   * <p>
+   * Without a default client port, an occupied port is replaced by another
+   * randomly selected one. With a default client port, an occupied port is
+   * a failure.
+   *
+   * @return client port of the active server, or -1 if the cluster was
+   *         configured with no servers
+   * @throws BindException if a default client port was set and a server
+   *         could not bind to its port
+   * @throws IOException if a directory could not be set up or a server
+   *         did not come up in time
+   * @throws InterruptedException if interrupted while waiting for a server
+   */
+  private int startup() throws IOException,
+      InterruptedException {
+    if (numZooKeeperServers <= 0) {
+      return -1;
+    }
+
+    setupTestEnv();
+    // set before any server is created: serviceStop() does nothing
+    // unless this flag is set
+    started = true;
+    baseDir = File.createTempFile("zookeeper", ".dir");
+    recreateDir(baseDir);
+
+    StringBuilder quorumList = new StringBuilder();
+    Random rnd = new Random();
+    int tentativePort = selectClientPort(rnd);
+
+    // running all the ZK servers
+    for (int i = 0; i < numZooKeeperServers; i++) {
+      File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
+      recreateDir(dir);
+      int tickTimeToUse;
+      if (this.tickTime > 0) {
+        tickTimeToUse = this.tickTime;
+      } else {
+        tickTimeToUse = TICK_TIME;
+      }
+      ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
+      NIOServerCnxnFactory standaloneServerFactory;
+      while (true) {
+        try {
+          standaloneServerFactory = new NIOServerCnxnFactory();
+          standaloneServerFactory.configure(
+              new InetSocketAddress(tentativePort),
+              MAX_CLIENT_CONNECTIONS
+          );
+        } catch (BindException e) {
+          LOG.debug("Failed binding ZK Server to client port: " +
+                    tentativePort, e);
+          // We're told to use some port but it's occupied: fail loudly
+          // rather than leave a "started" service with no server behind it
+          if (defaultClientPort > 0) {
+            BindException failure = new BindException(
+                "Failed binding ZK Server to client port: " + tentativePort);
+            failure.initCause(e);
+            throw failure;
+          }
+          // This port is already in use, try to use another.
+          tentativePort = selectClientPort(rnd);
+          continue;
+        }
+        break;
+      }
+
+      // Start up this ZK server
+      standaloneServerFactory.startup(server);
+      if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) {
+        throw new IOException("Waiting for startup of standalone server");
+      }
+
+      // We have selected this port as a client port.
+      clientPortList.add(tentativePort);
+      standaloneServerFactoryList.add(standaloneServerFactory);
+      zooKeeperServers.add(server);
+      if (quorumList.length() > 0) {
+        quorumList.append(",");
+      }
+      quorumList.append("localhost:").append(tentativePort);
+      tentativePort++; //for the next server
+    }
+
+    // set the first one to be active ZK; Others are backups
+    activeZKServerIndex = 0;
+
+    clientPort = clientPortList.get(activeZKServerIndex);
+    zkQuorum = quorumList.toString();
+    LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
+             "on client port: " + clientPort);
+    return clientPort;
+  }
+
+  /**
+   * Delete a directory (or file) if present, then create it as an
+   * empty directory.
+   * @param dir directory to recreate
+   * @throws IOException if the old entry could not be deleted or the
+   *         directory could not be created
+   */
+  private void recreateDir(File dir) throws IOException {
+    if (dir.exists()) {
+      if (!FileUtil.fullyDelete(dir)) {
+        throw new IOException("Could not delete zk base directory: " + dir);
+      }
+    }
+    boolean available;
+    try {
+      // mkdirs() reports failure through its return value, not an exception
+      available = dir.mkdirs() || dir.isDirectory();
+    } catch (SecurityException e) {
+      throw new IOException("creating dir: " + dir, e);
+    }
+    if (!available) {
+      throw new IOException("Could not create zk directory: " + dir);
+    }
+  }
+
+  /**
+   * Delete the basedir and everything under it.
+   */
+  private void deleteBaseDir() {
+    if (baseDir != null) {
+      // the base dir holds the per-server directories, so a plain
+      // File.delete() of the (non-empty) directory could never succeed
+      if (!FileUtil.fullyDelete(baseDir)) {
+        LOG.warn("Could not delete zk base directory: " + baseDir);
+      }
+      baseDir = null;
+    }
+
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+
+    if (!started) {
+      return;
+    }
+    started = false;
+
+    try {
+      // shut down all the zk servers
+      for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
+        NIOServerCnxnFactory standaloneServerFactory =
+            standaloneServerFactoryList.get(i);
+        int port = clientPortList.get(i);
+
+        standaloneServerFactory.shutdown();
+        if (!waitForServerDown(port, CONNECTION_TIMEOUT)) {
+          throw new IOException("Waiting for shutdown of standalone server");
+        }
+      }
+      for (ZooKeeperServer zkServer : zooKeeperServers) {
+        //explicitly close ZKDatabase since ZookeeperServer does not close them
+        zkServer.getZKDatabase().close();
+      }
+    } finally {
+      // clear everything
+      activeZKServerIndex = 0;
+      standaloneServerFactoryList.clear();
+      clientPortList.clear();
+      zooKeeperServers.clear();
+    }
+
+    LOG.info("Shutdown MiniZK cluster with all ZK servers");
+  }
+
+  /**
+   * Kill the active ZK server; the next server in the list, if there is
+   * one, becomes the active server and its port becomes the value of
+   * {@link #getClientPort()}. The quorum string is not updated.
+   *
+   * @return clientPort return clientPort if there is another ZK backup can run
+   *         when killing the current active; return -1, if there is no backups
+   *         or no server is running.
+   * @throws IOException if the server did not shut down in time
+   * @throws InterruptedException if interrupted while waiting for shutdown
+   */
+  public int killCurrentActiveZooKeeperServer() throws IOException,
+      InterruptedException {
+    // the emptiness check covers a repeated call after the last
+    // server has already been killed
+    if (!started || activeZKServerIndex < 0
+        || standaloneServerFactoryList.isEmpty()) {
+      return -1;
+    }
+
+    // Shutdown the current active one
+    NIOServerCnxnFactory standaloneServerFactory =
+        standaloneServerFactoryList.get(activeZKServerIndex);
+    int killedPort = clientPortList.get(activeZKServerIndex);
+
+    standaloneServerFactory.shutdown();
+    if (!waitForServerDown(killedPort, CONNECTION_TIMEOUT)) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+
+    zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
+
+    // remove the current active zk server
+    standaloneServerFactoryList.remove(activeZKServerIndex);
+    clientPortList.remove(activeZKServerIndex);
+    zooKeeperServers.remove(activeZKServerIndex);
+    LOG.info("Kill the current active ZK servers in the cluster " +
+             "on client port: " + killedPort);
+
+    if (standaloneServerFactoryList.size() == 0) {
+      // there is no backup servers;
+      return -1;
+    }
+    // the server now at the active index takes over; update the field
+    // so that getClientPort() no longer reports the dead server
+    clientPort = clientPortList.get(activeZKServerIndex);
+    LOG.info("Activate a backup zk server in the cluster " +
+             "on client port: " + clientPort);
+    // return the next back zk server's port
+    return clientPort;
+  }
+
+  /**
+   * Kill one back up ZK servers: the one directly after the active server.
+   * A no-op if the cluster is not started or has no backup server.
+   * @throws IOException if the server did not shut down in time
+   * @throws InterruptedException if interrupted while waiting for shutdown
+   */
+  public void killOneBackupZooKeeperServer() throws IOException,
+      InterruptedException {
+    if (!started || activeZKServerIndex < 0 ||
+        standaloneServerFactoryList.size() <= 1) {
+      return;
+    }
+
+    int backupZKServerIndex = activeZKServerIndex + 1;
+    // Shutdown the backup next to the active server
+    NIOServerCnxnFactory standaloneServerFactory =
+        standaloneServerFactoryList.get(backupZKServerIndex);
+    int backupPort = clientPortList.get(backupZKServerIndex);
+
+    standaloneServerFactory.shutdown();
+    if (!waitForServerDown(backupPort, CONNECTION_TIMEOUT)) {
+      throw new IOException("Waiting for shutdown of standalone server");
+    }
+
+    zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
+
+    // remove this backup zk server
+    standaloneServerFactoryList.remove(backupZKServerIndex);
+    clientPortList.remove(backupZKServerIndex);
+    zooKeeperServers.remove(backupZKServerIndex);
+    LOG.info("Kill one backup ZK servers in the cluster " +
+             "on client port: " + backupPort);
+  }
+
+  /**
+   * Poll a localhost port every 250 ms until a connection attempt fails.
+   * @param port port to probe
+   * @param timeout time limit in millis
+   * @return true once a probe fails with an IOException; false if probes
+   *         were still succeeding when the time limit passed
+   */
+  // XXX: From o.a.zk.t.ClientBase
+  private static boolean waitForServerDown(int port, long timeout) throws
+      InterruptedException {
+    long start = System.currentTimeMillis();
+    while (true) {
+      try {
+        Socket sock = null;
+        try {
+          sock = new Socket("localhost", port);
+          OutputStream outstream = sock.getOutputStream();
+          outstream.write("stat".getBytes());
+          outstream.flush();
+        } finally {
+          IOUtils.closeSocket(sock);
+        }
+      } catch (IOException e) {
+        // connection refused or broken: the server is gone
+        return true;
+      }
+
+      if (System.currentTimeMillis() > start + timeout) {
+        break;
+      }
+      Thread.sleep(250);
+    }
+    return false;
+  }
+
+  /**
+   * Poll a localhost port every 250 ms with the "stat" command until the
+   * reply starts with the ZooKeeper version banner.
+   * @param port port to probe
+   * @param timeout time limit in millis
+   * @return true once the banner is seen; false if the time limit passed
+   */
+  // XXX: From o.a.zk.t.ClientBase
+  private static boolean waitForServerUp(int port, long timeout) throws
+      InterruptedException {
+    long start = System.currentTimeMillis();
+    while (true) {
+      try {
+        Socket sock = null;
+        sock = new Socket("localhost", port);
+        BufferedReader reader = null;
+        try {
+          OutputStream outstream = sock.getOutputStream();
+          outstream.write("stat".getBytes());
+          outstream.flush();
+
+          Reader isr = new InputStreamReader(sock.getInputStream());
+          reader = new BufferedReader(isr);
+          String line = reader.readLine();
+          if (line != null && line.startsWith("Zookeeper version:")) {
+            return true;
+          }
+        } finally {
+          IOUtils.closeSocket(sock);
+          IOUtils.closeStream(reader);
+        }
+      } catch (IOException e) {
+        // ignore as this is expected
+        LOG.debug("server localhost:" + port + " not up " + e);
+      }
+
+      if (System.currentTimeMillis() > start + timeout) {
+        break;
+      }
+      Thread.sleep(250);
+    }
+    return false;
+  }
+
+  /**
+   * @return client port of the active server; 0 until the cluster has
+   *         been started
+   */
+  public int getClientPort() {
+    return clientPort;
+  }
+
+  /**
+   * @return comma-separated <code>localhost:port</code> list built when
+   *         the cluster was started; empty until then
+   */
+  public String getZkQuorum() {
+    return zkQuorum;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java
new file mode 100644
index 0000000..045b72c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Relays ZK watcher events to a closure.
+ * <p>
+ * This abstract class declares no methods of its own: it implements
+ * {@link Watcher} without overriding anything, so concrete subclasses are
+ * the ones that supply the event handling.
+ */
+public abstract class ZKCallback implements Watcher {
+
+  /**
+   * No-argument constructor; performs no initialization.
+   */
+  public ZKCallback() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
new file mode 100644
index 0000000..ca41e4b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * Thin wrapper around a {@link ZooKeeper} client bound to one user and one
+ * cluster name.
+ * <p>
+ * The instance registers itself as the client's watcher. On the first event
+ * it receives it optionally creates the Slider user path, and it forwards
+ * every event to the handler supplied at construction time (if any).
+ * <p>
+ * The client is not created by the constructor: {@link #init()} must be
+ * called before any operation that talks to ZooKeeper.
+ */
+public class ZKIntegration implements Watcher, Closeable {
+
+  /**
+   * Base path for services
+   */
+  public static String ZK_SERVICES = "services";
+  /**
+   * Base path for all Slider references
+   */
+  public static String ZK_SLIDER = "slider";
+  /**
+   * Name of the node under which per-user entries are created
+   */
+  public static String ZK_USERS = "users";
+  /**
+   * Absolute path of the Slider node, built from {@link #ZK_SERVICES}
+   * and {@link #ZK_SLIDER}
+   */
+  public static String SVC_SLIDER = "/" + ZK_SERVICES + "/" + ZK_SLIDER;
+  /**
+   * Absolute path of the users node below {@link #SVC_SLIDER}
+   */
+  public static String SVC_SLIDER_USERS = SVC_SLIDER + "/" + ZK_USERS;
+
+  /**
+   * The elements of {@link #SVC_SLIDER_USERS} in root-first order, in the
+   * form consumed by {@link #mkPath(List, List, CreateMode)}
+   */
+  public static final List<String> ZK_USERS_PATH_LIST = new ArrayList<String>();
+  static {
+    ZK_USERS_PATH_LIST.add(ZK_SERVICES);
+    ZK_USERS_PATH_LIST.add(ZK_SLIDER);
+    ZK_USERS_PATH_LIST.add(ZK_USERS);
+  }
+
+  /**
+   * Default session timeout handed to the ZooKeeper client
+   */
+  public static int SESSION_TIMEOUT = 30000;
+  protected static final Logger log =
+    LoggerFactory.getLogger(ZKIntegration.class);
+  /**
+   * The client; null until {@link #init()} is called and again after
+   * {@link #close()}
+   */
+  private ZooKeeper zookeeper;
+  private final String username;
+  private final String clustername;
+  private final String userPath;
+  private int sessionTimeout = SESSION_TIMEOUT;
+  /**
+   * Set once the first watcher event has been seen, so that the user path
+   * creation in {@link #maybeInit()} is attempted at most once
+   */
+  private final AtomicBoolean toInit = new AtomicBoolean(false);
+  private final boolean createClusterPath;
+  private final Watcher watchEventHandler;
+  private final String zkConnection;
+  private final boolean canBeReadOnly;
+
+  /**
+   * Store the binding information. No connection is made here.
+   * @param zkConnection ZooKeeper connection string
+   * @param username user whose path is built under {@link #SVC_SLIDER_USERS}
+   * @param clustername cluster name, used by {@link #getClusterPath()}
+   * @param canBeReadOnly passed through to the ZooKeeper client constructor
+   * @param createClusterPath if true, the user path is created on the first
+   * watcher event
+   * @param watchEventHandler handler to forward events to; may be null
+   * @param sessionTimeout session timeout passed to the ZooKeeper client
+   * @throws IOException declared for callers; not thrown by this constructor
+   */
+  protected ZKIntegration(String zkConnection,
+                          String username,
+                          String clustername,
+                          boolean canBeReadOnly,
+                          boolean createClusterPath,
+                          Watcher watchEventHandler,
+                          int sessionTimeout
+  ) throws IOException {
+    this.username = username;
+    this.clustername = clustername;
+    this.watchEventHandler = watchEventHandler;
+    this.zkConnection = zkConnection;
+    this.canBeReadOnly = canBeReadOnly;
+    this.createClusterPath = createClusterPath;
+    this.sessionTimeout = sessionTimeout;
+    this.userPath = mkSliderUserPath(username);
+  }
+
+  /**
+   * Create the ZooKeeper client, registering this instance as its watcher.
+   * Must be called at most once per instance.
+   * @throws IOException if the client cannot be created
+   */
+  public void init() throws IOException {
+    assert zookeeper == null;
+    log.debug("Binding ZK client to {}", zkConnection);
+    zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this, canBeReadOnly);
+  }
+
+  /**
+   * Create an instance bonded to the specific closure.
+   * Note that the two boolean arguments are taken here in the opposite
+   * order to the constructor; they are forwarded by name.
+   * @param zkConnection ZooKeeper connection string
+   * @param username user name
+   * @param clustername cluster name
+   * @param createClusterPath if true, the user path is created on the first
+   * watcher event
+   * @param canBeReadOnly passed through to the ZooKeeper client constructor
+   * @param watchEventHandler handler to forward events to; may be null
+   * @param sessionTimeout session timeout passed to the ZooKeeper client
+   * @return the new instance; {@link #init()} has not been called on it
+   * @throws IOException declared by the constructor
+   */
+  public static ZKIntegration newInstance(String zkConnection,
+      String username,
+      String clustername,
+      boolean createClusterPath,
+      boolean canBeReadOnly,
+      Watcher watchEventHandler,
+      int sessionTimeout) throws IOException {
+
+    return new ZKIntegration(zkConnection,
+                             username,
+                             clustername,
+                             canBeReadOnly,
+                             createClusterPath,
+                             watchEventHandler,
+                             sessionTimeout);
+  }
+
+
+  /**
+   * Close the ZooKeeper client if there is one. Safe to call repeatedly.
+   * If the calling thread is interrupted while closing, the interrupt
+   * status is restored rather than discarded.
+   * @throws IOException declared by {@link Closeable}; not thrown here
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    if (zookeeper != null) {
+      try {
+        zookeeper.close();
+      } catch (InterruptedException e) {
+        // do not lose the interrupt: the caller may be shutting down
+        Thread.currentThread().interrupt();
+        log.debug("Interrupted while closing ZK client", e);
+      } finally {
+        // drop the reference even if close() failed, so a later
+        // close() is a no-op
+        zookeeper = null;
+      }
+    }
+  }
+
+  /**
+   * @return the connection string this instance was created with
+   */
+  public String getConnectionString() {
+    return zkConnection;
+  }
+
+  /**
+   * @return the path built by {@link #mkClusterPath(String, String)} for
+   * this instance's user and cluster name
+   */
+  public String getClusterPath() {
+    return mkClusterPath(username, clustername);
+  }
+
+  /**
+   * @return true if there is a client and its state reports connected;
+   * false if the client has not been created or has been closed
+   */
+  public boolean getConnected() {
+    // read the field once: close() may null it concurrently
+    ZooKeeper zk = zookeeper;
+    return zk != null && zk.getState().isConnected();
+  }
+
+  /**
+   * @return true if there is a client and its state reports alive;
+   * false if the client has not been created or has been closed
+   */
+  public boolean getAlive() {
+    // read the field once: close() may null it concurrently
+    ZooKeeper zk = zookeeper;
+    return zk != null && zk.getState().isAlive();
+  }
+
+  /**
+   * Get the state of the client. Requires {@link #init()} to have been
+   * called and {@link #close()} not to have been.
+   * @return the client state
+   */
+  public ZooKeeper.States getState() {
+    return zookeeper.getState();
+  }
+
+  /**
+   * Stat the cluster path.
+   * @return the stat of {@link #getClusterPath()}, or null if there is no
+   * such node
+   * @throws KeeperException ZooKeeper failure
+   * @throws InterruptedException interrupted while waiting
+   */
+  public Stat getClusterStat() throws KeeperException, InterruptedException {
+    return stat(getClusterPath());
+  }
+
+  /**
+   * Probe for a node.
+   * @param path path to look for
+   * @return true if {@link #stat(String)} returned a non-null value
+   * @throws KeeperException ZooKeeper failure
+   * @throws InterruptedException interrupted while waiting
+   */
+  public boolean exists(String path) throws
+                                     KeeperException,
+                                     InterruptedException {
+    return stat(path) != null;
+  }
+
+  /**
+   * Stat a node without setting a watch.
+   * @param path path to look for
+   * @return the result of the client's {@code exists()} call
+   * @throws KeeperException ZooKeeper failure
+   * @throws InterruptedException interrupted while waiting
+   */
+  public Stat stat(String path) throws KeeperException, InterruptedException {
+    return zookeeper.exists(path, false);
+  }
+
+  @Override
+  public String toString() {
+    return "ZK integration bound @  " + zkConnection + ": " + zookeeper;
+  }
+
+  /**
+   * Event handler to notify of state events.
+   * Runs the one-off initialization, logging rather than propagating any
+   * failure, then forwards the event to the registered handler if there
+   * is one.
+   * @param event the event received
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    log.debug("{}", event);
+    try {
+      maybeInit();
+    } catch (Exception e) {
+      log.error("Failed to init", e);
+    }
+    if (watchEventHandler != null) {
+      watchEventHandler.process(event);
+    }
+  }
+
+  /**
+   * On the first call only, and only if path creation was requested,
+   * create the users path and then this user's node beneath it.
+   * The flag is set on the first call whether or not creation succeeds,
+   * so a failed attempt is not retried.
+   */
+  private void maybeInit() throws KeeperException, InterruptedException {
+    if (!toInit.getAndSet(true) && createClusterPath) {
+      log.debug("initing");
+      //create the user path
+      mkPath(ZK_USERS_PATH_LIST, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      //create the specific user
+      createPath(userPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    }
+  }
+
+  /**
+   * Create a path under a parent, don't care if it already exists.
+   * As the path isn't returned when the node already exists, this isn't
+   * the way to create sequentially numbered nodes.
+   * @param parent parent dir. Must have a trailing / if entry is not null,
+   * as the two are simply concatenated
+   * @param entry entry; can be null, in which case nothing is appended
+   * @param acl ACL list; must not be null or empty
+   * @param createMode creation mode
+   * @return the path if created; null if the node already existed
+   * @throws KeeperException any ZooKeeper failure other than "node exists"
+   * @throws InterruptedException interrupted while waiting
+   */
+  public String createPath(String parent,
+                           String entry,
+                           List<ACL> acl,
+                           CreateMode createMode) throws KeeperException, InterruptedException {
+    //initial create of full path
+    assert acl != null;
+    assert !acl.isEmpty();
+    assert parent != null;
+    String path = parent;
+    if (entry != null) {
+      path = path + entry;
+    }
+    try {
+      log.debug("Creating ZK path {}", path);
+      return zookeeper.create(path, null, acl, createMode);
+    } catch (KeeperException.NodeExistsException ignored) {
+      //node already there
+      log.debug("node already present:{}",path);
+      return null;
+    }
+  }
+
+  /**
+   * Create each element of a path in turn, starting from the root.
+   * Elements that already exist are left alone.
+   * @param paths path elements, root-first, without separators
+   * @param acl acl list
+   * @param createMode create mode applied to every element
+   * @throws KeeperException ZooKeeper failure
+   * @throws InterruptedException interrupted while waiting
+   */
+  public void mkPath(List<String> paths,
+                     List<ACL> acl,
+                     CreateMode createMode) throws KeeperException, InterruptedException {
+    String history = "/";
+    for (String entry : paths) {
+      createPath(history, entry, acl, createMode);
+      history = history + entry + "/";
+    }
+  }
+
+  /**
+   * Blocking enumeration of the children of this user's path.
+   * @return an unordered list of clusters under a user
+   * @throws KeeperException ZooKeeper failure
+   * @throws InterruptedException interrupted while waiting
+   */
+  public List<String> getClusters() throws KeeperException, InterruptedException {
+    return zookeeper.getChildren(userPath, null);
+  }
+
+  /**
+   * Delete a node, whatever its version; does not throw an exception if
+   * the path is not found.
+   * @param path path to delete
+   * @return true if the path could be deleted, false if there was no node
+   * to delete
+   * @throws InterruptedException interrupted while waiting
+   * @throws KeeperException any ZooKeeper failure other than "no node"
+   */
+  public boolean delete(String path) throws
+                                     InterruptedException,
+                                     KeeperException {
+    try {
+      // log first, so the message is emitted for failed attempts too
+      log.debug("Deleting {}", path);
+      zookeeper.delete(path, -1);
+      return true;
+    } catch (KeeperException.NoNodeException ignored) {
+      return false;
+    }
+  }
+
+  /**
+   * Recursively delete a node, children first; does not throw an exception
+   * if any node does not exist.
+   * @param path path to delete
+   * @return false if the node was missing when its children were listed;
+   * true otherwise
+   * @throws KeeperException any ZooKeeper failure other than "no node"
+   * @throws InterruptedException interrupted while waiting
+   */
+  public boolean deleteRecursive(String path) throws KeeperException, InterruptedException {
+
+    try {
+      List<String> children = zookeeper.getChildren(path, false);
+      for (String child : children) {
+        deleteRecursive(path + "/" + child);
+      }
+      delete(path);
+    } catch (KeeperException.NoNodeException ignored) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Build the path to a cluster; exists once the cluster has come up.
+   * Even before that, a ZK watcher could wait for it.
+   * @param username user
+   * @param clustername name of the cluster
+   * @return the user path followed by "/" and the cluster name
+   */
+  public static String mkClusterPath(String username, String clustername) {
+    return mkSliderUserPath(username) + "/" + clustername;
+  }
+
+  /**
+   * Build the path to a user's node under {@link #SVC_SLIDER_USERS}.
+   * @param username user
+   * @return {@link #SVC_SLIDER_USERS} followed by "/" and the user name
+   */
+  public static String mkSliderUserPath(String username) {
+    return SVC_SLIDER_USERS + "/" + username;
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKPathBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKPathBuilder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKPathBuilder.java
new file mode 100644
index 0000000..b088568
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKPathBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import java.util.Locale;
+
+/**
+ * Holder for the ZooKeeper quorum strings and the application and registry
+ * paths of one application instance. Both paths are computed from the
+ * application, user and cluster names at construction time and can be
+ * overridden afterwards through the setters.
+ */
+public final class ZKPathBuilder {
+
+  private final String username;
+  private final String appname;
+  private final String clustername;
+  private final String quorum;
+  private final String appQuorum;
+
+  private String appPath;
+  private String registryPath;
+
+  /**
+   * Record the names and quorums, then derive the initial paths.
+   * @param username user name
+   * @param appname application name
+   * @param clustername cluster name
+   * @param quorum value returned by {@link #getQuorum()}
+   * @param appQuorum value returned by {@link #getAppQuorum()}
+   */
+  public ZKPathBuilder(String username,
+      String appname,
+      String clustername,
+      String quorum,
+      String appQuorum) {
+    this.username = username;
+    this.appname = appname;
+    this.clustername = clustername;
+    this.quorum = quorum;
+    this.appQuorum = appQuorum;
+    // the names must be in place before the paths are derived from them
+    this.appPath = buildAppPath();
+    this.registryPath = buildRegistryPath();
+  }
+
+  /**
+   * Compute the default application path from the names. The stored
+   * application path is neither read nor changed.
+   * @return the path "/yarnapps_" + app + "_" + user + "_" + cluster
+   */
+  public String buildAppPath() {
+    return expand("/yarnapps_%s_%s_%s");
+  }
+
+  /**
+   * Compute the default registry path from the names. The stored
+   * registry path is neither read nor changed.
+   * @return the path "/services_" + app + "_" + user + "_" + cluster
+   */
+  public String buildRegistryPath() {
+    return expand("/services_%s_%s_%s");
+  }
+
+  /**
+   * Fill a three-slot template with application, user and cluster name,
+   * in that order.
+   */
+  private String expand(String template) {
+    return String.format(Locale.ENGLISH, template,
+        appname, username, clustername);
+  }
+
+  /**
+   * @return the quorum supplied at construction
+   */
+  public String getQuorum() {
+    return this.quorum;
+  }
+
+  /**
+   * @return the application quorum supplied at construction
+   */
+  public String getAppQuorum() {
+    return this.appQuorum;
+  }
+
+  /**
+   * @return the current application path
+   */
+  public String getAppPath() {
+    return this.appPath;
+  }
+
+  /**
+   * @param appPath replacement application path
+   */
+  public void setAppPath(String appPath) {
+    this.appPath = appPath;
+  }
+
+  /**
+   * @return the current registry path
+   */
+  public String getRegistryPath() {
+    return this.registryPath;
+  }
+
+  /**
+   * @param registryPath replacement registry path
+   */
+  public void setRegistryPath(String registryPath) {
+    this.registryPath = registryPath;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZookeeperUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZookeeperUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZookeeperUtils.java
new file mode 100644
index 0000000..cc1b2c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZookeeperUtils.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import com.google.common.net.HostAndPort;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.exceptions.BadConfigException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Static helpers for parsing and building ZooKeeper quorum strings of the
+ * form {@code host1:port1,host2:port2,...}.
+ */
+public class ZookeeperUtils {
+  /**
+   * Port applied by {@link #splitToHostsAndPorts(String)} to entries that
+   * do not name one
+   */
+  public static final int DEFAULT_PORT = 2181;
+
+  /**
+   * Build a connection string in which every host carries the same port.
+   * Entries are trimmed and blank entries are skipped, so that each host
+   * in the result, including the last one, is followed by
+   * {@code :port}.
+   * @param zkHosts comma separated list of hosts without ports
+   * @param port port to attach to every host
+   * @return a string of the form {@code h1:port,h2:port}; empty if there
+   * are no non-blank hosts
+   */
+  public static String buildConnectionString(String zkHosts, int port) {
+    String zkPort = Integer.toString(port);
+    StringBuilder quorum = new StringBuilder();
+    //parse the hosts
+    for (String host : zkHosts.split(",", 0)) {
+      String trimmed = host.trim();
+      if (trimmed.isEmpty()) {
+        continue;
+      }
+      if (quorum.length() > 0) {
+        quorum.append(',');
+      }
+      quorum.append(trimmed).append(':').append(zkPort);
+    }
+    return quorum.toString();
+  }
+
+  /**
+   * Take a quorum list and split it to (trimmed) pairs
+   * @param hostPortQuorumList list of form h1:port, h2:port2,...
+   * @return a possibly empty list of values between commas. They may not be
+   * valid hostname:port pairs
+   */
+  public static List<String> splitToPairs(String hostPortQuorumList) {
+    // split the address list; the helper may hand back null
+    String[] strings = StringUtils.getStrings(hostPortQuorumList);
+    if (strings == null) {
+      return new ArrayList<String>(0);
+    }
+    List<String> tuples = new ArrayList<String>(strings.length);
+    for (String s : strings) {
+      tuples.add(s.trim());
+    }
+    return tuples;
+  }
+
+  /**
+   * Split a quorum list into a list of hostnames and ports. Entries
+   * without a port are given {@link #DEFAULT_PORT}.
+   * @param hostPortQuorumList split to a list of hosts and ports
+   * @return a possibly empty list of values
+   */
+  public static List<HostAndPort> splitToHostsAndPorts(String hostPortQuorumList) {
+    List<String> pairs = splitToPairs(hostPortQuorumList);
+    List<HostAndPort> list = new ArrayList<HostAndPort>(pairs.size());
+    for (String pair : pairs) {
+      list.add(HostAndPort.fromString(pair).withDefaultPort(DEFAULT_PORT));
+    }
+    return list;
+  }
+
+  /**
+   * Build up to a hosts only list
+   * @param hostAndPorts entries to take the host text from
+   * @return a comma separated list of the hosts only; empty if the input
+   * list is empty
+   */
+  public static String buildHostsOnlyList(List<HostAndPort> hostAndPorts) {
+    StringBuilder sb = new StringBuilder();
+    for (HostAndPort hostAndPort : hostAndPorts) {
+      if (sb.length() > 0) {
+        sb.append(",");
+      }
+      sb.append(hostAndPort.getHostText());
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Render one quorum entry, appending a port if the entry has none.
+   * @param hostAndPort entry to render
+   * @param defaultPort port to append when the entry has no port
+   * @return the entry as a string, always carrying a port
+   */
+  public static String buildQuorumEntry(HostAndPort hostAndPort,
+    int defaultPort) {
+    String s = hostAndPort.toString();
+    if (hostAndPort.hasPort()) {
+      return s;
+    } else {
+      return s + ":" + defaultPort;
+    }
+  }
+
+  /**
+   * Build a quorum list, injecting a ":defaultPort" ref if needed on
+   * any entry without one
+   * @param hostAndPorts entries to render
+   * @param defaultPort port for entries that have none
+   * @return the entries joined with commas
+   */
+  public static String buildQuorum(List<HostAndPort> hostAndPorts, int defaultPort) {
+    List<String> entries = new ArrayList<String>(hostAndPorts.size());
+    for (HostAndPort hostAndPort : hostAndPorts) {
+      entries.add(buildQuorumEntry(hostAndPort, defaultPort));
+    }
+    return SliderUtils.join(entries, ",", false);
+  }
+
+  /**
+   * Reduce a quorum string to its hosts.
+   * @param quorum quorum string
+   * @return a comma separated list of the hosts only
+   * @throws BadConfigException if the quorum has no entries
+   */
+  public static String convertToHostsOnlyList(String quorum) throws
+      BadConfigException {
+    List<HostAndPort> hostAndPorts = splitToHostsAndPortsStrictly(quorum);
+    return ZookeeperUtils.buildHostsOnlyList(hostAndPorts);
+  }
+
+  /**
+   * Split a quorum string, rejecting an empty result.
+   * @param quorum quorum string
+   * @return a non-empty list of hosts and ports
+   * @throws BadConfigException if the quorum has no entries
+   */
+  public static List<HostAndPort> splitToHostsAndPortsStrictly(String quorum) throws
+      BadConfigException {
+    List<HostAndPort> hostAndPorts =
+        ZookeeperUtils.splitToHostsAndPorts(quorum);
+    if (hostAndPorts.isEmpty()) {
+      throw new BadConfigException("empty zookeeper quorum");
+    }
+    return hostAndPorts;
+  }
+
+  /**
+   * Get the port of the first quorum entry.
+   * @param quorum quorum string
+   * @param defVal value passed to the entry's port-or-default lookup
+   * @return the port of the first entry
+   * @throws BadConfigException if the quorum has no entries
+   */
+  public static int getFirstPort(String quorum, int defVal) throws
+      BadConfigException {
+    List<HostAndPort> hostAndPorts = splitToHostsAndPortsStrictly(quorum);
+    return hostAndPorts.get(0).getPortOrDefault(defVal);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
new file mode 100644
index 0000000..510de5d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.slider.common.tools.SliderFileSystem;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTreeOperations;
+import org.apache.slider.core.conf.MapOperations;
+import org.apache.slider.core.exceptions.BadClusterStateException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.launch.AbstractLauncher;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.slider.api.ResourceKeys.COMPONENT_INSTANCES;
+import static org.apache.slider.api.ResourceKeys.DEF_YARN_CORES;
+import static org.apache.slider.api.ResourceKeys.DEF_YARN_MEMORY;
+import static org.apache.slider.api.ResourceKeys.YARN_CORES;
+import static org.apache.slider.api.ResourceKeys.YARN_MEMORY;
+
+/**
+ * Base class for the client side of a provider. Subclasses supply the
+ * provider name and role list; the remaining methods have default
+ * implementations which subclasses may override.
+ */
+public abstract class AbstractClientProvider extends Configured {
+  private static final Logger log =
+    LoggerFactory.getLogger(AbstractClientProvider.class);
+  protected static final ProviderUtils providerUtils =
+    new ProviderUtils(log);
+
+  /** Resource directory of the providers, without a leading slash. */
+  public static final String PROVIDER_RESOURCE_BASE =
+    "org/apache/slider/providers/";
+  /** {@link #PROVIDER_RESOURCE_BASE} with a leading slash. */
+  public static final String PROVIDER_RESOURCE_BASE_ROOT =
+    "/" + PROVIDER_RESOURCE_BASE;
+
+  /**
+   * @param conf configuration handed to {@link Configured}
+   */
+  public AbstractClientProvider(Configuration conf) {
+    super(conf);
+  }
+
+  /**
+   * @return the provider name
+   */
+  public abstract String getName();
+
+  /**
+   * @return the roles of this provider
+   */
+  public abstract List<ProviderRole> getRoles();
+
+  /**
+   * Check an instance definition against the provider's roles.
+   * For every role whose group has a component entry in the resources,
+   * an instance count must be present, and the memory and core settings
+   * are run through the provider utilities' resource-requirement lookup.
+   * Roles without a component entry are skipped.
+   * @param instanceDefinition instance definition
+   * @param fs filesystem; not used by this base implementation
+   * @throws SliderException if the configuration is not valid
+   */
+  public void validateInstanceDefinition(AggregateConf instanceDefinition, SliderFileSystem fs) throws
+      SliderException {
+
+    List<ProviderRole> providerRoles = getRoles();
+    ConfTreeOperations resources =
+      instanceDefinition.getResourceOperations();
+    for (ProviderRole providerRole : providerRoles) {
+      MapOperations component = resources.getComponent(providerRole.group);
+      if (component == null) {
+        // nothing declared for this role: nothing to check
+        continue;
+      }
+      if (component.get(COMPONENT_INSTANCES) == null) {
+        String message = "No instance count provided for " + providerRole.name;
+        log.error("{} with \n{}", message, resources.toString());
+        throw new BadClusterStateException(message);
+      }
+      String memory = component.get(YARN_MEMORY);
+      String vcores = component.get(YARN_CORES);
+
+      // results are discarded; the calls are made for their checks only
+      providerUtils.getRoleResourceRequirement(
+          memory, DEF_YARN_MEMORY, Integer.MAX_VALUE);
+      providerUtils.getRoleResourceRequirement(
+          vcores, DEF_YARN_CORES, Integer.MAX_VALUE);
+    }
+  }
+
+
+  /**
+   * Hook for provider-side changes to a configuration.
+   * This base implementation does nothing.
+   * @param aggregateConf config to patch
+   * @throws IOException IO problems
+   * @throws SliderException Slider-specific issues
+   */
+  public void prepareInstanceConfiguration(AggregateConf aggregateConf)
+      throws SliderException, IOException {
+    //default: do nothing
+  }
+
+
+  /**
+   * Prepare the AM settings for launch.
+   * This base implementation does nothing and reads none of its arguments.
+   * @param fileSystem filesystem
+   * @param serviceConf configuration of the client
+   * @param launcher launcher to set up
+   * @param instanceDescription instance description being launched
+   * @param snapshotConfDirPath not used by this base implementation
+   * @param generatedConfDirPath not used by this base implementation
+   * @param clientConfExtras not used by this base implementation
+   * @param libdir not used by this base implementation
+   * @param tempPath not used by this base implementation
+   * @param miniClusterTestRun flag set to true on a mini cluster run
+   * @throws IOException declared for overriding implementations
+   * @throws SliderException declared for overriding implementations
+   */
+  public void prepareAMAndConfigForLaunch(SliderFileSystem fileSystem,
+      Configuration serviceConf,
+      AbstractLauncher launcher,
+      AggregateConf instanceDescription,
+      Path snapshotConfDirPath,
+      Path generatedConfDirPath,
+      Configuration clientConfExtras,
+      String libdir,
+      Path tempPath,
+      boolean miniClusterTestRun)
+    throws IOException, SliderException {
+    //default: do nothing
+  }
+
+  /**
+   * Load templates from resources and merge them into the instance,
+   * without overwriting values already there. A null argument means
+   * "no such template" and that section is left alone. The templates are
+   * processed in the order internal, resources, application configuration.
+   * @param instanceConf instance to patch
+   * @param internalTemplate resource to merge into the internal section
+   * @param resourceTemplate resource to merge into the resources section
+   * @param appConfTemplate resource to merge into the app conf section
+   * @throws IOException any IO problems
+   */
+  protected void mergeTemplates(AggregateConf instanceConf,
+                                String internalTemplate,
+                                String resourceTemplate,
+                                String appConfTemplate) throws IOException {
+    if (internalTemplate != null) {
+      ConfTreeOperations internal =
+        ConfTreeOperations.fromResource(internalTemplate);
+      instanceConf.getInternalOperations()
+                  .mergeWithoutOverwrite(internal.confTree);
+    }
+
+    if (resourceTemplate != null) {
+      ConfTreeOperations resources =
+        ConfTreeOperations.fromResource(resourceTemplate);
+      instanceConf.getResourceOperations()
+                  .mergeWithoutOverwrite(resources.confTree);
+    }
+
+    if (appConfTemplate != null) {
+      ConfTreeOperations appConf =
+        ConfTreeOperations.fromResource(appConfTemplate);
+      instanceConf.getAppConfOperations()
+                  .mergeWithoutOverwrite(appConf.confTree);
+    }
+  }
+
+  /**
+   * Pre-launch validation of the cluster specification. This can include
+   * checking that the security options are in the site files prior to
+   * launch, that there are no conflicting operations, etc.
+   *
+   * The check is made prior to every launch of the cluster, so it can
+   * pick up problems which manually edited cluster files have added,
+   * or which come from specification files of previous versions.
+   *
+   * The provider MUST NOT change the remote specification. This is
+   * purely a pre-launch validation of options.
+   *
+   * This base implementation only delegates to
+   * {@link #validateInstanceDefinition(AggregateConf, SliderFileSystem)}.
+   *
+   * @param sliderFileSystem filesystem
+   * @param clustername name of the cluster
+   * @param configuration cluster configuration
+   * @param instanceDefinition cluster specification
+   * @param clusterDirPath directory of the cluster
+   * @param generatedConfDirPath path to place generated artifacts
+   * @param secure flag to indicate that the cluster is secure
+   * @throws SliderException on any validation issue
+   * @throws IOException on any IO problem
+   */
+  public void preflightValidateClusterConfiguration(SliderFileSystem sliderFileSystem,
+                                                      String clustername,
+                                                      Configuration configuration,
+                                                      AggregateConf instanceDefinition,
+                                                      Path clusterDirPath,
+                                                      Path generatedConfDirPath,
+                                                      boolean secure)
+      throws SliderException, IOException {
+    validateInstanceDefinition(instanceDefinition, sliderFileSystem);
+  }
+
+  /**
+   * Return a set of application specific string tags.
+   * This base implementation returns an empty set.
+   * @param fileSystem not used by this base implementation
+   * @param appDef not used by this base implementation
+   * @return the set of tags.
+   * @throws SliderException declared for overriding implementations
+   */
+  public Set<String> getApplicationTags (SliderFileSystem fileSystem,
+                                         String appDef) throws SliderException {
+    return Collections.emptySet();
+  }
+
+  /**
+   * Process client operations for applications such as install, configure.
+   * This base implementation rejects every operation.
+   * @param fileSystem not used by this base implementation
+   * @param registryOperations not used by this base implementation
+   * @param configuration not used by this base implementation
+   * @param operation not used by this base implementation
+   * @param clientInstallPath not used by this base implementation
+   * @param clientPackage not used by this base implementation
+   * @param clientConfig not used by this base implementation
+   * @param name not used by this base implementation
+   * @throws SliderException always, in this base implementation
+   */
+  public void processClientOperation(SliderFileSystem fileSystem,
+                                     RegistryOperations registryOperations,
+                                     Configuration configuration,
+                                     String operation,
+                                     File clientInstallPath,
+                                     File clientPackage,
+                                     JSONObject clientConfig,
+                                     String name)
+      throws SliderException {
+    throw new SliderException("Provider does not support client operations.");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
new file mode 100644
index 0000000..61b2655
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
+import org.apache.hadoop.registry.client.types.AddressTypes;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.tools.ConfigHelper;
+import org.apache.slider.common.tools.SliderFileSystem;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.exceptions.BadCommandArgumentsException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.main.ExitCodeProvider;
+import org.apache.slider.server.appmaster.actions.QueueAccess;
+import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
+import org.apache.slider.server.appmaster.state.ContainerReleaseSelector;
+import org.apache.slider.server.appmaster.state.MostRecentContainerReleaseSelector;
+import org.apache.slider.server.appmaster.state.StateAccessForProviders;
+import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations;
+import org.apache.slider.server.services.workflow.ForkedProcessService;
+import org.apache.slider.server.services.workflow.ServiceParent;
+import org.apache.slider.server.services.workflow.WorkflowSequenceService;
+import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * The base class for provider services. It lets the implementations
+ * add sequences of operations, and propagates service failures
+ * upstream
+ */
+public abstract class AbstractProviderService
+    extends WorkflowSequenceService
+    implements
+    ProviderCore,
+    SliderKeys,
+    ProviderService {
+  private static final Logger log =
+    LoggerFactory.getLogger(AbstractProviderService.class);
+  protected StateAccessForProviders amState;
+  protected AgentRestOperations restOps;
+  protected URL amWebAPI;
+  protected YarnRegistryViewForProviders yarnRegistry;
+  protected QueueAccess queueAccess;
+
+  protected AbstractProviderService(String name) {
+    super(name);
+    setStopIfNoChildServicesAtStartup(false);
+  }
+
+  @Override
+  public Configuration getConf() {
+    return getConfig();
+  }
+
+  public StateAccessForProviders getAmState() {
+    return amState;
+  }
+
+  public QueueAccess getQueueAccess() {
+    return queueAccess;
+  }
+
+  public void setAmState(StateAccessForProviders amState) {
+    this.amState = amState;
+  }
+
+  @Override
+  public String getHumanName() {
+    return getName().toLowerCase(Locale.ENGLISH);
+  }
+  
+  @Override
+  public void bind(StateAccessForProviders stateAccessor,
+      QueueAccess queueAccess,
+      List<Container> liveContainers) {
+    this.amState = stateAccessor;
+    this.queueAccess = queueAccess;
+  }
+
+  @Override
+  public void bindToYarnRegistry(YarnRegistryViewForProviders yarnRegistry) {
+    this.yarnRegistry = yarnRegistry;
+  }
+
+  public YarnRegistryViewForProviders getYarnRegistry() {
+    return yarnRegistry;
+  }
+
+  @Override
+  public AgentRestOperations getAgentRestOperations() {
+    return restOps;
+  }
+
+  @Override
+  public void notifyContainerCompleted(ContainerId containerId) {
+  }
+
+  public void setAgentRestOperations(AgentRestOperations agentRestOperations) {
+    this.restOps = agentRestOperations;
+  }
+
+  /**
+   * Load a specific XML configuration file for the provider config
+   * @param confDir configuration directory
+   * @param siteXMLFilename provider-specific filename
+   * @return a configuration to be included in status
+   * @throws BadCommandArgumentsException argument problems
+   * @throws IOException IO problems
+   */
+  protected Configuration loadProviderConfigurationInformation(File confDir,
+                                                               String siteXMLFilename)
+    throws BadCommandArgumentsException, IOException {
+    Configuration siteConf;
+    File siteXML = new File(confDir, siteXMLFilename);
+    if (!siteXML.exists()) {
+      throw new BadCommandArgumentsException(
+        "Configuration directory %s doesn't contain %s - listing is %s",
+        confDir, siteXMLFilename, SliderUtils.listDir(confDir));
+    }
+
+    //now read it in
+    siteConf = ConfigHelper.loadConfFromFile(siteXML);
+    log.info("{} file is at {}", siteXMLFilename, siteXML);
+    log.info(ConfigHelper.dumpConfigToString(siteConf));
+    return siteConf;
+  }
+
+  /**
+   * No-op implementation of this method.
+   */
+  @Override
+  public void initializeApplicationConfiguration(
+      AggregateConf instanceDefinition, SliderFileSystem fileSystem)
+      throws IOException, SliderException {
+  }
+
+  /**
+   * No-op implementation of this method.
+   *
+   * {@inheritDoc}
+   */
+  @Override
+  public void validateApplicationConfiguration(AggregateConf instance,
+                                               File confDir,
+                                               boolean secure)
+      throws IOException, SliderException {
+
+  }
+
+  /**
+   * Scan through the roles and see if it is supported.
+   * @param role role to look for
+   * @return true if the role is known about -and therefore
+   * that a launcher thread can be deployed to launch it
+   */
+  @Override
+  public boolean isSupportedRole(String role) {
+    Collection<ProviderRole> roles = getRoles();
+    for (ProviderRole providedRole : roles) {
+      if (providedRole.name.equals(role)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * override point to allow a process to start executing in this container
+   * @param instanceDefinition cluster description
+   * @param confDir configuration directory
+   * @param env environment
+   * @param execInProgress the callback for the exec events
+   * @return false
+   * @throws IOException
+   * @throws SliderException
+   */
+  @Override
+  public boolean exec(AggregateConf instanceDefinition,
+      File confDir,
+      Map<String, String> env,
+      ProviderCompleted execInProgress) throws IOException, SliderException {
+    return false;
+  }
+
+  @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+  @Override // ExitCodeProvider
+  public int getExitCode() {
+    Throwable cause = getFailureCause();
+    if (cause != null) {
+      //failed for some reason
+      if (cause instanceof ExitCodeProvider) {
+        return ((ExitCodeProvider) cause).getExitCode();
+      }
+    }
+    ForkedProcessService lastProc = latestProcess();
+    if (lastProc == null || !lastProc.isProcessTerminated()) {
+      return 0;
+    } else {
+      return lastProc.getExitCode();
+    }
+  }
+
+  /**
+   * Return the latest forked process service that ran
+   * @return the forkes service
+   */
+  protected ForkedProcessService latestProcess() {
+    Service current = getActiveService();
+    Service prev = getPreviousService();
+
+    Service latest = current != null ? current : prev;
+    if (latest instanceof ForkedProcessService) {
+      return (ForkedProcessService) latest;
+    } else {
+      //its a composite object, so look inside it for a process
+      if (latest instanceof ServiceParent) {
+        return getFPSFromParentService((ServiceParent) latest);
+      } else {
+        //no match
+        return null;
+      }
+    }
+  }
+
+
+  /**
+   * Given a parent service, find the one that is a forked process
+   * @param serviceParent parent
+   * @return the forked process service or null if there is none
+   */
+  protected ForkedProcessService getFPSFromParentService(ServiceParent serviceParent) {
+    List<Service> services = serviceParent.getServices();
+    for (Service s : services) {
+      if (s instanceof ForkedProcessService) {
+        return (ForkedProcessService) s;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * if we are already running, start this service
+   */
+  protected void maybeStartCommandSequence() {
+    if (isInState(STATE.STARTED)) {
+      startNextService();
+    }
+  }
+
+  /**
+   * Create a new forked process service with the given
+   * name, environment and command list -then add it as a child
+   * for execution in the sequence.
+   *
+   * @param name command name
+   * @param env environment
+   * @param commands command line
+   * @throws IOException
+   * @throws SliderException
+   */
+  protected ForkedProcessService queueCommand(String name,
+                              Map<String, String> env,
+                              List<String> commands) throws
+                                                     IOException,
+      SliderException {
+    ForkedProcessService process = buildProcess(name, env, commands);
+    //register the service for lifecycle management; when this service
+    //is terminated, so is the master process
+    addService(process);
+    return process;
+  }
+
+  public ForkedProcessService buildProcess(String name,
+                                           Map<String, String> env,
+                                           List<String> commands) throws
+                                                                  IOException,
+      SliderException {
+    ForkedProcessService process;
+    process = new ForkedProcessService(name);
+    process.init(getConfig());
+    process.build(env, commands);
+    return process;
+  }
+
+  /*
+   * Build the provider status, can be empty
+   * @return the provider status - map of entries to add to the info section
+   */
+  @Override
+  public Map<String, String> buildProviderStatus() {
+    return new HashMap<String, String>();
+  }
+
+  /*
+  Build the monitor details. The base implementation includes all the external URL endpoints
+  in the external view
+   */
+  @Override
+  public Map<String, MonitorDetail> buildMonitorDetails(ClusterDescription clusterDesc) {
+    Map<String, MonitorDetail> details = new LinkedHashMap<String, MonitorDetail>();
+
+    // add in all the endpoints
+    buildEndpointDetails(details);
+
+    return details;
+  }
+
+  @Override
+  public void buildEndpointDetails(Map<String, MonitorDetail> details) {
+    ServiceRecord self = yarnRegistry.getSelfRegistration();
+
+    List<Endpoint> externals = self.external;
+    for (Endpoint endpoint : externals) {
+      String addressType = endpoint.addressType;
+      if (AddressTypes.ADDRESS_URI.equals(addressType)) {
+        try {
+          List<URL> urls = RegistryTypeUtils.retrieveAddressURLs(endpoint);
+          if (!urls.isEmpty()) {
+            details.put(endpoint.api, new MonitorDetail(urls.get(0).toString(), true));
+          }
+        } catch (InvalidRecordException  | MalformedURLException ignored) {
+          // Ignored
+        }
+
+      }
+
+    }
+  }
+
+  @Override
+  public void applyInitialRegistryDefinitions(URL amWebURI,
+      URL agentOpsURI,
+      URL agentStatusURI,
+      ServiceRecord serviceRecord)
+    throws IOException {
+      this.amWebAPI = amWebURI;
+  }
+
+  /**
+   * {@inheritDoc}
+   * 
+   * 
+   * @return The base implementation returns the most recent containers first.
+   */
+  @Override
+  public ContainerReleaseSelector createContainerReleaseSelector() {
+    return new MostRecentContainerReleaseSelector();
+  }
+
+  @Override
+  public void releaseAssignedContainer(ContainerId containerId) {
+    // no-op
+  }
+
+  @Override
+  public void addContainerRequest(AMRMClient.ContainerRequest req) {
+    // no-op
+  }
+
+  @Override
+  public void cancelSingleRequest(AMRMClient.ContainerRequest request) {
+    // no-op
+  }
+
+  @Override
+  public int cancelContainerRequests(Priority priority1,
+      Priority priority2,
+      int count) {
+    return 0;
+  }
+
+  @Override
+  public void execute(List<AbstractRMOperation> operations) {
+    for (AbstractRMOperation operation : operations) {
+      operation.execute(this);
+    }
+  }
+  /**
+   * No-op implementation of this method.
+   */
+  @Override
+  public void rebuildContainerDetails(List<Container> liveContainers,
+      String applicationId, Map<Integer, ProviderRole> providerRoles) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/MonitorDetail.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/MonitorDetail.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/MonitorDetail.java
new file mode 100644
index 0000000..27d3415
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/MonitorDetail.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.providers;
+
+/**
+ * Details about some exported information from a provider to the AM web UI.
+ * Instances are immutable: a string value plus a flag stating whether
+ * that value is a URL.
+ */
+public class MonitorDetail {
+
+  private final String value;
+  private final boolean isUrl;
+
+  /**
+   * Create a detail entry.
+   * @param value the value to export
+   * @param isUrl true if the value is a URL
+   */
+  public MonitorDetail(String value, boolean isUrl) {
+    this.value = value;
+    this.isUrl = isUrl;
+  }
+
+  /**
+   * @return the value passed to the constructor
+   */
+  public String getValue() {
+    return value;
+  }
+
+  /**
+   * @return true if the value was declared to be a URL
+   */
+  public boolean isUrl() {
+    return isUrl;
+  }
+
+  /**
+   * @return a string of the form {@code MonitorDetail[value isUrl=flag]}
+   */
+  @Override
+  public String toString() {
+    return "MonitorDetail[" + value + " isUrl=" + isUrl + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicy.java
new file mode 100644
index 0000000..128dd5d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicy.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+/**
+ * Placement values.
+ * This is nominally a bitmask, though not all values make sense.
+ * The flag values defined here are 1, 2, 4 and 16; no constant is
+ * defined for 8.
+ */
+public class PlacementPolicy {
+
+  /**
+   * No placement flags set (value 0). This is also the {@link #DEFAULT}:
+   * history used, anti-affinity hinted at on rebuild/flex up
+   */
+  public static final int NONE = 0;
+
+  /**
+   * Default value: history used, anti-affinity hinted at on rebuild/flex up.
+   * Same value as {@link #NONE}.
+   */
+  public static final int DEFAULT = NONE;
+
+  /**
+   * Strict placement: when asking for an instance for which there is
+   * history, mandate that it is strict
+   */
+  public static final int STRICT = 1;
+
+  /**
+   * No data locality; do not use placement history
+   */
+  public static final int ANYWHERE = 2;
+
+  /**
+   * Same value as {@link #ANYWHERE}.
+   * @deprecated use {@link #ANYWHERE}
+   */
+  @Deprecated
+  public static final int NO_DATA_LOCALITY = ANYWHERE;
+
+  /**
+   * Anti-affinity is mandatory.
+   */
+  public static final int ANTI_AFFINITY_REQUIRED = 4;
+  
+  /**
+   * Exclude from flexing; used internally to mark AMs.
+   */
+  public static final int EXCLUDE_FROM_FLEXING = 16;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicyOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicyOptions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicyOptions.java
new file mode 100644
index 0000000..e61f944
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicyOptions.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+public enum PlacementPolicyOptions {
+
+  EXCLUDE_FROM_FLEXING,
+  NO_DATA_LOCALITY,
+  ANTI_AFFINITY_REQUIRED,
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java
new file mode 100644
index 0000000..f6ff4fd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+/**
+ * Callback interface: the {@link ProviderCompletedCallable} invokes this
+ * when it generates a notification.
+ */
+public interface ProviderCompleted {
+
+  /**
+   * Receive a notification.
+   * @param parameter the value supplied by whoever raised the notification
+   */
+  void eventCallbackEvent(Object parameter);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java
new file mode 100644
index 0000000..47939c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A callable which, when invoked, passes a fixed parameter to a
+ * {@link ProviderCompleted} callback and then returns that parameter.
+ */
+public class ProviderCompletedCallable implements Callable<Object> {
+
+  private final ProviderCompleted callback;
+  private final Object parameter;
+
+  /**
+   * @param callback callback to notify from {@link #call()}
+   * @param parameter value handed to the callback and returned by
+   * {@link #call()}
+   */
+  public ProviderCompletedCallable(ProviderCompleted callback, Object parameter) {
+    this.parameter = parameter;
+    this.callback = callback;
+  }
+
+  /**
+   * Notify the callback.
+   * @return the parameter supplied at construction time
+   * @throws Exception anything raised by the callback
+   */
+  @Override
+  public Object call() throws Exception {
+    final Object result = parameter;
+    callback.eventCallbackEvent(result);
+    return result;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org