You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/12/13 22:53:13 UTC
[21/74] [abbrv] hadoop git commit: YARN-5461. Initial code ported
from slider-core module. (jianhe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UgiJerseyBinding.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UgiJerseyBinding.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UgiJerseyBinding.java
new file mode 100644
index 0000000..bf71861
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UgiJerseyBinding.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.restclient;
+
+import com.google.common.base.Preconditions;
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.slider.core.exceptions.ExceptionConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+/**
+ * Class to bond to a Jersey client, for UGI integration and SPNEGO.
+ * <p>
+ * Usage: create an instance, then when creating a Jersey <code>Client</code>
+ * pass in to the constructor the handler provided by {@link #getHandler()}
+ *
+ * see <a href="https://jersey.java.net/apidocs/1.17/jersey/com/sun/jersey/client/urlconnection/HttpURLConnectionFactory.html">Jersey docs</a>
+ */
+public class UgiJerseyBinding implements
+ HttpURLConnectionFactory {
+ private static final Logger log =
+ LoggerFactory.getLogger(UgiJerseyBinding.class);
+
+ private final UrlConnectionOperations operations;
+ private final URLConnectionClientHandler handler;
+
+ /**
+ * Construct an instance
+ * @param operations operations instance
+ */
+ @SuppressWarnings("ThisEscapedInObjectConstruction")
+ public UgiJerseyBinding(UrlConnectionOperations operations) {
+ Preconditions.checkArgument(operations != null, "Null operations");
+ this.operations = operations;
+ handler = new URLConnectionClientHandler(this);
+ }
+
+ /**
+ * Create an instance off the configuration. The SPNEGO policy
+ * is derived from the current UGI settings.
+ * @param conf config
+ */
+ public UgiJerseyBinding(Configuration conf) {
+ this(new UrlConnectionOperations(conf));
+ }
+
+ /**
+ * Get a URL connection.
+ * @param url URL to connect to
+ * @return the connection
+ * @throws IOException any problem. {@link AuthenticationException}
+ * errors are wrapped
+ */
+ @Override
+ public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
+ try {
+ // open a connection handling status codes and so redirections
+ // but as it opens a connection, it's less useful than you think.
+
+ return operations.openConnection(url);
+ } catch (AuthenticationException e) {
+ throw new IOException(e);
+ }
+ }
+
+ public UrlConnectionOperations getOperations() {
+ return operations;
+ }
+
+ public URLConnectionClientHandler getHandler() {
+ return handler;
+ }
+
+ /**
+ * Get the SPNEGO flag (as found in the operations instance
+ * @return the spnego policy
+ */
+ public boolean isUseSpnego() {
+ return operations.isUseSpnego();
+ }
+
+
+ /**
+ * Uprate error codes 400 and up into faults;
+ * <p>
+ * see {@link ExceptionConverter#convertJerseyException(String, String, UniformInterfaceException)}
+ */
+ public static IOException uprateFaults(HttpVerb verb, String url,
+ UniformInterfaceException ex)
+ throws IOException {
+ return ExceptionConverter.convertJerseyException(verb.getVerb(),
+ url, ex);
+ }
+
+ /**
+ * Create the standard Jersey client Config
+ * @return the recommended Jersey Client config
+ */
+ public ClientConfig createJerseyClientConfig() {
+ ClientConfig clientConfig = new DefaultClientConfig();
+ clientConfig.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, true);
+ return clientConfig;
+ }
+
+ /**
+ * Create a jersey client bonded to this handler, using the
+ * supplied client config
+ * @param clientConfig client configuratin
+ * @return a new client instance to use
+ */
+ public Client createJerseyClient(ClientConfig clientConfig) {
+ return new Client(getHandler(), clientConfig);
+ }
+
+ /**
+ * Create a jersey client bonded to this handler, using the
+ * client config created with {@link #createJerseyClientConfig()}
+ * @return a new client instance to use
+ */
+ public Client createJerseyClient() {
+ return createJerseyClient(createJerseyClientConfig());
+ }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UrlConnectionOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UrlConnectionOperations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UrlConnectionOperations.java
new file mode 100644
index 0000000..20ef198
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/restclient/UrlConnectionOperations.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.restclient;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.yarn.webapp.ForbiddenException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.net.ssl.SSLException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+/**
+ * Operations on the JDK UrlConnection class.
+ * <p>
+ * Connections are opened through a {@link SliderURLConnectionFactory}.
+ * SPNEGO is switched on at construction time when Hadoop security is enabled,
+ * and can be changed afterwards with {@link #setUseSpnego(boolean)}.
+ */
+public class UrlConnectionOperations extends Configured {
+  private static final Logger log =
+      LoggerFactory.getLogger(UrlConnectionOperations.class);
+
+  /** Factory used to open every connection; set once in the constructor. */
+  private final SliderURLConnectionFactory connectionFactory;
+
+  /** Whether connections are opened with SPNEGO. */
+  private boolean useSpnego = false;
+
+  /**
+   * Create an instance off the configuration. The SPNEGO policy
+   * is derived from the current UGI settings.
+   * @param conf config
+   */
+  public UrlConnectionOperations(Configuration conf) {
+    super(conf);
+    connectionFactory = SliderURLConnectionFactory.newInstance(conf);
+    if (UserGroupInformation.isSecurityEnabled()) {
+      log.debug("SPNEGO is enabled");
+      setUseSpnego(true);
+    }
+  }
+
+
+  /**
+   * @return true if connections are opened with SPNEGO
+   */
+  public boolean isUseSpnego() {
+    return useSpnego;
+  }
+
+  /**
+   * @param useSpnego whether subsequent connections should use SPNEGO
+   */
+  public void setUseSpnego(boolean useSpnego) {
+    this.useSpnego = useSpnego;
+  }
+
+  /**
+   * Opens a url through the connection factory, using the current
+   * SPNEGO setting.
+   *
+   * @param url to open
+   * @return URLConnection
+   * @throws IllegalArgumentException if the port of the URL is 0
+   * @throws IOException any IO problem raised by the connection factory
+   * @throws AuthenticationException authentication failure
+   */
+  public HttpURLConnection openConnection(URL url) throws
+      IOException,
+      AuthenticationException {
+    Preconditions.checkArgument(url.getPort() != 0, "no port");
+    return (HttpURLConnection) connectionFactory.openConnection(url, useSpnego);
+  }
+
+  /**
+   * Execute a GET with no payload.
+   * @param url URL to GET
+   * @return the outcome of the operation
+   * @throws IOException IO problems, or a response code of 400 or above
+   * @throws AuthenticationException authentication failure
+   */
+  public HttpOperationResponse execGet(URL url) throws
+      IOException,
+      AuthenticationException {
+    return execHttpOperation(HttpVerb.GET, url, null, "");
+  }
+
+  /**
+   * Execute an HTTP operation and collect the whole response.
+   * <p>
+   * The response body is read from the error stream if the connection has
+   * one, otherwise from the input stream. Response codes of 400 and above
+   * are converted to exceptions by
+   * {@link #uprateFaults(HttpVerb, String, int, String, byte[])}.
+   *
+   * @param verb HTTP verb to use
+   * @param url URL to operate on
+   * @param payload request body; must be non-null when the verb uploads one
+   * @param contentType content type of the payload; only sent with a payload
+   * @return the response code, headers, content type, last-modified value
+   * and body of the response
+   * @throws IllegalArgumentException if the verb needs a payload and none
+   * was supplied
+   * @throws IOException IO problems (wrapped via
+   * <code>NetUtils.wrapException</code>, except for SSL exceptions which are
+   * rethrown as is), or a response code of 400 or above
+   * @throws AuthenticationException authentication failure, rethrown with the
+   * URL in the message
+   */
+  public HttpOperationResponse execHttpOperation(HttpVerb verb,
+      URL url,
+      byte[] payload,
+      String contentType)
+      throws IOException, AuthenticationException {
+    HttpURLConnection conn = null;
+    HttpOperationResponse outcome = new HttpOperationResponse();
+    int resultCode;
+    byte[] body = null;
+    log.debug("{} {} spnego={}", verb, url, useSpnego);
+
+    boolean doOutput = verb.hasUploadBody();
+    if (doOutput) {
+      Preconditions.checkArgument(payload != null,
+          "Null payload on a verb which expects one");
+    }
+    try {
+      conn = openConnection(url);
+      conn.setRequestMethod(verb.getVerb());
+      conn.setDoOutput(doOutput);
+      if (doOutput) {
+        conn.setRequestProperty("Content-Type", contentType);
+      }
+
+      // now do the connection
+      conn.connect();
+
+      if (doOutput) {
+        OutputStream output = conn.getOutputStream();
+        try {
+          IOUtils.write(payload, output);
+          // close explicitly so that a failure to flush the payload surfaces
+          output.close();
+        } finally {
+          // release the stream if the write itself failed
+          IOUtils.closeQuietly(output);
+        }
+      }
+
+      resultCode = conn.getResponseCode();
+      outcome.lastModified = conn.getLastModified();
+      outcome.contentType = conn.getContentType();
+      outcome.headers = conn.getHeaderFields();
+      InputStream stream = conn.getErrorStream();
+      if (stream == null) {
+        stream = conn.getInputStream();
+      }
+      if (stream != null) {
+        try {
+          // read into a buffer.
+          body = IOUtils.toByteArray(stream);
+        } finally {
+          IOUtils.closeQuietly(stream);
+        }
+      } else {
+        // no body:
+        log.debug("No body in response");
+      }
+    } catch (SSLException e) {
+      // SSL failures are passed up unwrapped
+      throw e;
+    } catch (IOException e) {
+      throw NetUtils.wrapException(url.toString(),
+          url.getPort(), "localhost", 0, e);
+
+    } catch (AuthenticationException e) {
+      throw new AuthenticationException("From " + url + ": " + e, e);
+
+    } finally {
+      if (conn != null) {
+        conn.disconnect();
+      }
+    }
+    // report failures against the verb actually used, and pass a null string
+    // so that the body bytes are decoded into the error message
+    uprateFaults(verb, url.toString(), resultCode, null, body);
+    outcome.responseCode = resultCode;
+    outcome.data = body;
+    return outcome;
+  }
+
+  /**
+   * Uprate error codes 400 and up into faults;
+   * 404 is converted to a {@link NotFoundException},
+   * 401 to {@link ForbiddenException}. Every other code of 400 or above
+   * is logged and raised as an {@link IOException} whose message includes
+   * the response body.
+   *
+   * @param verb HTTP Verb used
+   * @param url URL as string
+   * @param resultCode response from the request
+   * @param bodyAsString response body as a string; if null it is decoded
+   * (as UTF-8) from <code>body</code>
+   * @param body optional body of the response; only used when
+   * <code>bodyAsString</code> is null
+   * @throws IOException if the result was considered a failure
+   */
+  public static void uprateFaults(HttpVerb verb, String url,
+      int resultCode, String bodyAsString, byte[] body)
+      throws IOException {
+
+    if (resultCode < 400) {
+      //success
+      return;
+    }
+    String msg = verb.toString() + " " + url;
+    if (resultCode == 404) {
+      throw new NotFoundException(msg);
+    }
+    if (resultCode == 401) {
+      throw new ForbiddenException(msg);
+    }
+    // all other error codes
+
+    // get a string response
+    if (bodyAsString == null) {
+      if (body != null && body.length > 0) {
+        // explicit charset: the platform default varies between hosts
+        bodyAsString = new String(body, "UTF-8");
+      } else {
+        bodyAsString = "";
+      }
+    }
+    String message = msg +
+        " failed with exit code " + resultCode
+        + ", body length " + bodyAsString.length()
+        + ":\n" + bodyAsString;
+    log.error(message);
+    throw new IOException(message);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
new file mode 100644
index 0000000..ca49888
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class BlockingZKWatcher implements Watcher {
+
+ protected static final Logger log =
+ LoggerFactory.getLogger(BlockingZKWatcher.class);
+ private final AtomicBoolean connectedFlag = new AtomicBoolean(false);
+
+ @Override
+ public void process(WatchedEvent event) {
+ log.info("ZK binding callback received");
+ connectedFlag.set(true);
+ synchronized (connectedFlag) {
+ try {
+ connectedFlag.notify();
+ } catch (Exception e) {
+ log.warn("failed while waiting for notification", e);
+ }
+ }
+ }
+
+ /**
+ * Wait for a flag to go true
+ * @param timeout timeout in millis
+ */
+
+ public void waitForZKConnection(int timeout)
+ throws InterruptedException, ConnectException {
+ synchronized (connectedFlag) {
+ if (!connectedFlag.get()) {
+ log.info("waiting for ZK event");
+ //wait a bit
+ connectedFlag.wait(timeout);
+ }
+ }
+ if (!connectedFlag.get()) {
+ throw new ConnectException("Unable to connect to ZK quorum");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java
new file mode 100644
index 0000000..c8b3adb
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.Reader;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+
+/**
+ * This is a version of the HBase ZK cluster cut out to be standalone.
+ *
+ * <i>Important: keep this Java6 language level for now</i>
+ */
+public class MiniZooKeeperCluster extends AbstractService {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ MiniZooKeeperCluster.class);
+
+ private static final int TICK_TIME = 2000;
+ private static final int CONNECTION_TIMEOUT = 30000;
+ public static final int MAX_CLIENT_CONNECTIONS = 1000;
+
+ private boolean started;
+
+ /** The default port. If zero, we use a random port. */
+ private int defaultClientPort = 0;
+
+ private int clientPort;
+
+ private final List<NIOServerCnxnFactory> standaloneServerFactoryList;
+ private final List<ZooKeeperServer> zooKeeperServers;
+ private final List<Integer> clientPortList;
+
+ private int activeZKServerIndex;
+ private int tickTime = 0;
+ private File baseDir;
+ private final int numZooKeeperServers;
+ private String zkQuorum = "";
+
+ public MiniZooKeeperCluster(int numZooKeeperServers) {
+ super("MiniZooKeeperCluster");
+ this.numZooKeeperServers = numZooKeeperServers;
+ this.started = false;
+ activeZKServerIndex = -1;
+ zooKeeperServers = new ArrayList<ZooKeeperServer>();
+ clientPortList = new ArrayList<Integer>();
+ standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
+ }
+
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ }
+
+ public void setDefaultClientPort(int clientPort) {
+ if (clientPort <= 0) {
+ throw new IllegalArgumentException("Invalid default ZK client port: "
+ + clientPort);
+ }
+ this.defaultClientPort = clientPort;
+ }
+
+ /**
+ * Selects a ZK client port. Returns the default port if specified.
+ * Otherwise, returns a random port. The random port is selected from the
+ * range between 49152 to 65535. These ports cannot be registered with IANA
+ * and are intended for dynamic allocation (see http://bit.ly/dynports).
+ */
+ private int selectClientPort(Random r) {
+ if (defaultClientPort > 0) {
+ return defaultClientPort;
+ }
+ return 0xc000 + r.nextInt(0x3f00);
+ }
+
+ public void setTickTime(int tickTime) {
+ this.tickTime = tickTime;
+ }
+
+ public int getBackupZooKeeperServerNum() {
+ return zooKeeperServers.size() - 1;
+ }
+
+ public int getZooKeeperServerNum() {
+ return zooKeeperServers.size();
+ }
+
+ // / XXX: From o.a.zk.t.ClientBase
+ private static void setupTestEnv() {
+ // during the tests we run with 100K prealloc in the logs.
+ // on windows systems prealloc of 64M was seen to take ~15seconds
+ // resulting in test failure (client timeout on first session).
+ // set env and directly in order to handle static init/gc issues
+ System.setProperty("zookeeper.preAllocSize", "100");
+ FileTxnLog.setPreallocSize(100 * 1024);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ startup();
+ }
+
+ /**
+ * @param baseDir
+ * @param numZooKeeperServers
+ * @return ClientPort server bound to, -1 if there was a
+ * binding problem and we couldn't pick another port.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private int startup() throws IOException,
+ InterruptedException {
+ if (numZooKeeperServers <= 0)
+ return -1;
+
+ setupTestEnv();
+ started = true;
+ baseDir = File.createTempFile("zookeeper", ".dir");
+ recreateDir(baseDir);
+
+ StringBuilder quorumList = new StringBuilder();
+ Random rnd = new Random();
+ int tentativePort = selectClientPort(rnd);
+
+ // running all the ZK servers
+ for (int i = 0; i < numZooKeeperServers; i++) {
+ File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
+ recreateDir(dir);
+ int tickTimeToUse;
+ if (this.tickTime > 0) {
+ tickTimeToUse = this.tickTime;
+ } else {
+ tickTimeToUse = TICK_TIME;
+ }
+ ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
+ NIOServerCnxnFactory standaloneServerFactory;
+ while (true) {
+ try {
+ standaloneServerFactory = new NIOServerCnxnFactory();
+ standaloneServerFactory.configure(
+ new InetSocketAddress(tentativePort),
+ MAX_CLIENT_CONNECTIONS
+ );
+ } catch (BindException e) {
+ LOG.debug("Failed binding ZK Server to client port: " +
+ tentativePort, e);
+ // We're told to use some port but it's occupied, fail
+ if (defaultClientPort > 0) return -1;
+ // This port is already in use, try to use another.
+ tentativePort = selectClientPort(rnd);
+ continue;
+ }
+ break;
+ }
+
+ // Start up this ZK server
+ standaloneServerFactory.startup(server);
+ if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) {
+ throw new IOException("Waiting for startup of standalone server");
+ }
+
+ // We have selected this port as a client port.
+ clientPortList.add(tentativePort);
+ standaloneServerFactoryList.add(standaloneServerFactory);
+ zooKeeperServers.add(server);
+ if (quorumList.length() > 0) {
+ quorumList.append(",");
+ }
+ quorumList.append("localhost:").append(tentativePort);
+ tentativePort++; //for the next server
+ }
+
+ // set the first one to be active ZK; Others are backups
+ activeZKServerIndex = 0;
+
+ clientPort = clientPortList.get(activeZKServerIndex);
+ zkQuorum = quorumList.toString();
+ LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
+ "on client port: " + clientPort);
+ return clientPort;
+ }
+
+ private void recreateDir(File dir) throws IOException {
+ if (dir.exists()) {
+ if (!FileUtil.fullyDelete(dir)) {
+ throw new IOException("Could not delete zk base directory: " + dir);
+ }
+ }
+ try {
+ dir.mkdirs();
+ } catch (SecurityException e) {
+ throw new IOException("creating dir: " + dir, e);
+ }
+ }
+
+ /**
+ * Delete the basedir
+ */
+ private void deleteBaseDir() {
+ if (baseDir != null) {
+ baseDir.delete();
+ baseDir = null;
+ }
+
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+
+ if (!started) {
+ return;
+ }
+ started = false;
+
+ try {
+ // shut down all the zk servers
+ for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
+ NIOServerCnxnFactory standaloneServerFactory =
+ standaloneServerFactoryList.get(i);
+ int clientPort = clientPortList.get(i);
+
+ standaloneServerFactory.shutdown();
+ if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+ throw new IOException("Waiting for shutdown of standalone server");
+ }
+ }
+ for (ZooKeeperServer zkServer : zooKeeperServers) {
+ //explicitly close ZKDatabase since ZookeeperServer does not close them
+ zkServer.getZKDatabase().close();
+ }
+ } finally {
+ // clear everything
+ activeZKServerIndex = 0;
+ standaloneServerFactoryList.clear();
+ clientPortList.clear();
+ zooKeeperServers.clear();
+ }
+
+ LOG.info("Shutdown MiniZK cluster with all ZK servers");
+ }
+
+ /**@return clientPort return clientPort if there is another ZK backup can run
+ * when killing the current active; return -1, if there is no backups.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public int killCurrentActiveZooKeeperServer() throws IOException,
+ InterruptedException {
+ if (!started || activeZKServerIndex < 0) {
+ return -1;
+ }
+
+ // Shutdown the current active one
+ NIOServerCnxnFactory standaloneServerFactory =
+ standaloneServerFactoryList.get(activeZKServerIndex);
+ int clientPort = clientPortList.get(activeZKServerIndex);
+
+ standaloneServerFactory.shutdown();
+ if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+ throw new IOException("Waiting for shutdown of standalone server");
+ }
+
+ zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
+
+ // remove the current active zk server
+ standaloneServerFactoryList.remove(activeZKServerIndex);
+ clientPortList.remove(activeZKServerIndex);
+ zooKeeperServers.remove(activeZKServerIndex);
+ LOG.info("Kill the current active ZK servers in the cluster " +
+ "on client port: " + clientPort);
+
+ if (standaloneServerFactoryList.size() == 0) {
+ // there is no backup servers;
+ return -1;
+ }
+ clientPort = clientPortList.get(activeZKServerIndex);
+ LOG.info("Activate a backup zk server in the cluster " +
+ "on client port: " + clientPort);
+ // return the next back zk server's port
+ return clientPort;
+ }
+
+ /**
+ * Kill one back up ZK servers
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public void killOneBackupZooKeeperServer() throws IOException,
+ InterruptedException {
+ if (!started || activeZKServerIndex < 0 ||
+ standaloneServerFactoryList.size() <= 1) {
+ return;
+ }
+
+ int backupZKServerIndex = activeZKServerIndex + 1;
+ // Shutdown the current active one
+ NIOServerCnxnFactory standaloneServerFactory =
+ standaloneServerFactoryList.get(backupZKServerIndex);
+ int clientPort = clientPortList.get(backupZKServerIndex);
+
+ standaloneServerFactory.shutdown();
+ if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
+ throw new IOException("Waiting for shutdown of standalone server");
+ }
+
+ zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
+
+ // remove this backup zk server
+ standaloneServerFactoryList.remove(backupZKServerIndex);
+ clientPortList.remove(backupZKServerIndex);
+ zooKeeperServers.remove(backupZKServerIndex);
+ LOG.info("Kill one backup ZK servers in the cluster " +
+ "on client port: " + clientPort);
+ }
+
+ // XXX: From o.a.zk.t.ClientBase
+ private static boolean waitForServerDown(int port, long timeout) throws
+ InterruptedException {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ Socket sock = null;
+ try {
+ sock = new Socket("localhost", port);
+ OutputStream outstream = sock.getOutputStream();
+ outstream.write("stat".getBytes());
+ outstream.flush();
+ } finally {
+ IOUtils.closeSocket(sock);
+ }
+ } catch (IOException e) {
+ return true;
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ Thread.sleep(250);
+ }
+ return false;
+ }
+
+ // XXX: From o.a.zk.t.ClientBase
+ private static boolean waitForServerUp(int port, long timeout) throws
+ InterruptedException {
+ long start = System.currentTimeMillis();
+ while (true) {
+ try {
+ Socket sock = null;
+ sock = new Socket("localhost", port);
+ BufferedReader reader = null;
+ try {
+ OutputStream outstream = sock.getOutputStream();
+ outstream.write("stat".getBytes());
+ outstream.flush();
+
+ Reader isr = new InputStreamReader(sock.getInputStream());
+ reader = new BufferedReader(isr);
+ String line = reader.readLine();
+ if (line != null && line.startsWith("Zookeeper version:")) {
+ return true;
+ }
+ } finally {
+ IOUtils.closeSocket(sock);
+ IOUtils.closeStream(reader);
+ }
+ } catch (IOException e) {
+ // ignore as this is expected
+ LOG.debug("server localhost:" + port + " not up " + e);
+ }
+
+ if (System.currentTimeMillis() > start + timeout) {
+ break;
+ }
+ Thread.sleep(250);
+ }
+ return false;
+ }
+
+ public int getClientPort() {
+ return clientPort;
+ }
+
+ public String getZkQuorum() {
+ return zkQuorum;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java
new file mode 100644
index 0000000..045b72c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import org.apache.zookeeper.Watcher;
+
+/**
+ * Abstract base class for ZooKeeper {@link Watcher} callbacks.
+ * It declares no members of its own: subclasses supply the
+ * event handling required by the {@link Watcher} interface.
+ */
+public abstract class ZKCallback implements Watcher {
+
+  /**
+   * No-argument constructor for subclasses; performs no initialization.
+   */
+  public ZKCallback() {
+    super();
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
new file mode 100644
index 0000000..ca41e4b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+public class ZKIntegration implements Watcher, Closeable {
+
+/**
+ * Base path for services
+ */
+ public static String ZK_SERVICES = "services";
+ /**
+ * Base path for all Slider references
+ */
+ public static String ZK_SLIDER = "slider";
+ public static String ZK_USERS = "users";
+ public static String SVC_SLIDER = "/" + ZK_SERVICES + "/" + ZK_SLIDER;
+ public static String SVC_SLIDER_USERS = SVC_SLIDER + "/" + ZK_USERS;
+
+ public static final List<String> ZK_USERS_PATH_LIST = new ArrayList<String>();
+ static {
+ ZK_USERS_PATH_LIST.add(ZK_SERVICES);
+ ZK_USERS_PATH_LIST.add(ZK_SLIDER);
+ ZK_USERS_PATH_LIST.add(ZK_USERS);
+ }
+
+ public static int SESSION_TIMEOUT = 30000;
+ protected static final Logger log =
+ LoggerFactory.getLogger(ZKIntegration.class);
+ private ZooKeeper zookeeper;
+ private final String username;
+ private final String clustername;
+ private final String userPath;
+ private int sessionTimeout = SESSION_TIMEOUT;
+/**
+ * Flag set when the first watcher event arrives, so that the one-off
+ * creation of the user path (performed only if createClusterPath is true)
+ * is attempted at most once.
+ */
+ private final AtomicBoolean toInit = new AtomicBoolean(false);
+ private final boolean createClusterPath;
+ private final Watcher watchEventHandler;
+ private final String zkConnection;
+ private final boolean canBeReadOnly;
+
+ protected ZKIntegration(String zkConnection,
+ String username,
+ String clustername,
+ boolean canBeReadOnly,
+ boolean createClusterPath,
+ Watcher watchEventHandler,
+ int sessionTimeout
+ ) throws IOException {
+ this.username = username;
+ this.clustername = clustername;
+ this.watchEventHandler = watchEventHandler;
+ this.zkConnection = zkConnection;
+ this.canBeReadOnly = canBeReadOnly;
+ this.createClusterPath = createClusterPath;
+ this.sessionTimeout = sessionTimeout;
+ this.userPath = mkSliderUserPath(username);
+ }
+
+ public void init() throws IOException {
+ assert zookeeper == null;
+ log.debug("Binding ZK client to {}", zkConnection);
+ zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this, canBeReadOnly);
+ }
+
+ /**
+ * Create an instance bound to the given watcher. Only the settings are
+ * recorded here; the ZooKeeper client itself is created by {@link #init()}.
+ * @param zkConnection ZooKeeper connection string
+ * @param username user whose Slider path is used
+ * @param clustername name of the cluster
+ * @param createClusterPath true if the user path is to be created when the
+ * first watcher event arrives
+ * @param canBeReadOnly handed to the ZooKeeper client when it is created
+ * @param watchEventHandler watcher that events are relayed to; may be null
+ * @param sessionTimeout session timeout handed to the ZooKeeper client
+ * (presumably milliseconds, as for {@link #SESSION_TIMEOUT})
+ * @return the new instance
+ * @throws IOException as declared by the constructor
+ */
+ public static ZKIntegration newInstance(String zkConnection,
+ String username,
+ String clustername,
+ boolean createClusterPath,
+ boolean canBeReadOnly,
+ Watcher watchEventHandler,
+ int sessionTimeout) throws IOException {
+
+ return new ZKIntegration(zkConnection,
+ username,
+ clustername,
+ canBeReadOnly,
+ createClusterPath,
+ watchEventHandler,
+ sessionTimeout);
+ }
+
+
+ /**
+ * Close the ZooKeeper client if there is one and forget the reference,
+ * so that repeated calls are harmless.
+ * An interruption during the close does not fail the operation; the
+ * interrupt status of the thread is restored instead.
+ * @throws IOException declared by {@link Closeable}
+ */
+ @Override
+ public synchronized void close() throws IOException {
+ if (zookeeper != null) {
+ try {
+ zookeeper.close();
+ } catch (InterruptedException ignored) {
+ // best-effort close: keep going, but leave the interrupt visible to the caller
+ Thread.currentThread().interrupt();
+ }
+ zookeeper = null;
+ }
+ }
+
+ public String getConnectionString() {
+ return zkConnection;
+ }
+
+ public String getClusterPath() {
+ return mkClusterPath(username, clustername);
+ }
+
+ public boolean getConnected() {
+ return zookeeper.getState().isConnected();
+ }
+
+ public boolean getAlive() {
+ return zookeeper.getState().isAlive();
+ }
+
+ public ZooKeeper.States getState() {
+ return zookeeper.getState();
+ }
+
+ public Stat getClusterStat() throws KeeperException, InterruptedException {
+ return stat(getClusterPath());
+ }
+
+ public boolean exists(String path) throws
+ KeeperException,
+ InterruptedException {
+ return stat(path) != null;
+ }
+
+ public Stat stat(String path) throws KeeperException, InterruptedException {
+ return zookeeper.exists(path, false);
+ }
+
+ @Override
+ public String toString() {
+ return "ZK integration bound @ " + zkConnection + ": " + zookeeper;
+ }
+
+/**
+ * Event handler to notify of state events.
+ * Every event first triggers {@code maybeInit()}; an exception raised
+ * there is logged and not propagated. The event is then passed on to the
+ * watcher supplied at construction time, if there is one.
+ * @param event the watched event
+ */
+ @Override
+ public void process(WatchedEvent event) {
+ log.debug("{}", event);
+ try {
+ maybeInit();
+ } catch (Exception e) {
+ log.error("Failed to init", e);
+ }
+ if (watchEventHandler != null) {
+ watchEventHandler.process(event);
+ }
+ }
+
+ private void maybeInit() throws KeeperException, InterruptedException {
+ if (!toInit.getAndSet(true) && createClusterPath) {
+ log.debug("initing");
+ //create the user path
+ mkPath(ZK_USERS_PATH_LIST, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ //create the specific user
+ createPath(userPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ }
+
+ /**
+ * Create a path under a parent, don't care if it already exists
+ * As the path isn't returned, this isn't the way to create sequentially
+ * numbered nodes.
+ * @param parent parent dir. Must have a trailing / if entry!=null||empty
+ * @param entry entry -can be null or "", in which case it is not appended
+ * @param acl
+ * @param createMode
+ * @return the path if created; null if not
+ */
+ public String createPath(String parent,
+ String entry,
+ List<ACL> acl,
+ CreateMode createMode) throws KeeperException, InterruptedException {
+ //initial create of full path
+ assert acl != null;
+ assert !acl.isEmpty();
+ assert parent != null;
+ String path = parent;
+ if (entry != null) {
+ path = path + entry;
+ }
+ try {
+ log.debug("Creating ZK path {}", path);
+ return zookeeper.create(path, null, acl, createMode);
+ } catch (KeeperException.NodeExistsException ignored) {
+ //node already there
+ log.debug("node already present:{}",path);
+ return null;
+ }
+ }
+
+ /**
+ * Recursive path create
+ * @param paths path list
+ * @param acl acl list
+ * @param createMode create modes
+ */
+ public void mkPath(List<String> paths,
+ List<ACL> acl,
+ CreateMode createMode) throws KeeperException, InterruptedException {
+ String history = "/";
+ for (String entry : paths) {
+ createPath(history, entry, acl, createMode);
+ history = history + entry + "/";
+ }
+ }
+
+/**
+ * Blocking enumeration of the children of the user path.
+ * @return an unordered list of clusters under a user
+ */
+ public List<String> getClusters() throws KeeperException, InterruptedException {
+ return zookeeper.getChildren(userPath, null);
+ }
+
+ /**
+ * Delete a node; does not throw an exception if the path is not found.
+ * @param path path to delete
+ * @return true if the path could be deleted, false if there was no node to delete
+ * @throws InterruptedException if the ZooKeeper call is interrupted
+ * @throws KeeperException on any ZooKeeper failure other than a missing node
+ */
+ public boolean delete(String path) throws
+ InterruptedException,
+ KeeperException {
+ try {
+ // log the attempt up front so that a failing delete is traceable too
+ log.debug("Deleting {}", path);
+ // version -1 matches any version of the node
+ zookeeper.delete(path, -1);
+ return true;
+ } catch (KeeperException.NoNodeException ignored) {
+ return false;
+ }
+ }
+
+ /**
+ * Recursively delete a node, does not throw exception if any node does not exist.
+ * @param path
+ * @return true if delete was successful
+ */
+ public boolean deleteRecursive(String path) throws KeeperException, InterruptedException {
+
+ try {
+ List<String> children = zookeeper.getChildren(path, false);
+ for (String child : children) {
+ deleteRecursive(path + "/" + child);
+ }
+ delete(path);
+ } catch (KeeperException.NoNodeException ignored) {
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Build the path to a cluster; exists once the cluster has come up.
+ * Even before that, a ZK watcher could wait for it.
+ * @param username user
+ * @param clustername name of the cluster
+ * @return the path as a string
+ */
+ public static String mkClusterPath(String username, String clustername) {
+ return mkSliderUserPath(username) + "/" + clustername;
+ }
+/**
+ * Build the path to a user's entry; the cluster paths of that user are
+ * placed beneath it. A ZK watcher could wait for it before it exists.
+ * @param username user
+ * @return the path as a string
+ */
+ public static String mkSliderUserPath(String username) {
+ return SVC_SLIDER_USERS + "/" + username;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKPathBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKPathBuilder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKPathBuilder.java
new file mode 100644
index 0000000..b088568
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKPathBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import java.util.Locale;
+
+/**
+ * Holds the quorum strings of an application together with its application
+ * and registry paths. Both paths start out derived from the application
+ * name, user name and cluster name, and can be replaced afterwards.
+ */
+public final class ZKPathBuilder {
+
+ private final String username;
+ private final String appname;
+ private final String clustername;
+ private final String quorum;
+ private final String appQuorum;
+
+ private String appPath;
+ private String registryPath;
+
+ /**
+ * @param username user name used in the generated paths
+ * @param appname application name used in the generated paths
+ * @param clustername cluster name used in the generated paths
+ * @param quorum value returned by {@link #getQuorum()}
+ * @param appQuorum value returned by {@link #getAppQuorum()}
+ */
+ public ZKPathBuilder(String username,
+ String appname,
+ String clustername,
+ String quorum,
+ String appQuorum) {
+ this.username = username;
+ this.appname = appname;
+ this.clustername = clustername;
+ this.quorum = quorum;
+ this.appQuorum = appQuorum;
+ this.appPath = buildAppPath();
+ this.registryPath = buildRegistryPath();
+ }
+
+ /**
+ * Fill a three-slot template with the application, user and cluster names.
+ */
+ private String fill(String template) {
+ return String.format(Locale.ENGLISH, template, appname, username, clustername);
+ }
+
+ /**
+ * @return the default application path for the names of this instance
+ */
+ public String buildAppPath() {
+ return fill("/yarnapps_%s_%s_%s");
+ }
+
+ /**
+ * @return the default registry path for the names of this instance
+ */
+ public String buildRegistryPath() {
+ return fill("/services_%s_%s_%s");
+ }
+
+ public String getQuorum() {
+ return this.quorum;
+ }
+
+ public String getAppQuorum() {
+ return this.appQuorum;
+ }
+
+ public String getAppPath() {
+ return this.appPath;
+ }
+
+ public void setAppPath(String appPath) {
+ this.appPath = appPath;
+ }
+
+ public String getRegistryPath() {
+ return this.registryPath;
+ }
+
+ public void setRegistryPath(String registryPath) {
+ this.registryPath = registryPath;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZookeeperUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZookeeperUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZookeeperUtils.java
new file mode 100644
index 0000000..cc1b2c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZookeeperUtils.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.core.zk;
+
+import com.google.common.net.HostAndPort;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.exceptions.BadConfigException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ZookeeperUtils {
+ public static final int DEFAULT_PORT = 2181;
+
+ /**
+ * Build a connection string by appending the same port to every host of
+ * a comma-separated host list.
+ * @param zkHosts comma-separated list of hosts
+ * @param port port to append to each host
+ * @return a list of the form h1:port,h2:port; empty if there are no hosts
+ */
+ public static String buildConnectionString(String zkHosts, int port) {
+ StringBuilder quorum = new StringBuilder();
+ for (String host : zkHosts.split(",", 0)) {
+ String trimmed = host.trim();
+ if (trimmed.isEmpty()) {
+ // skip blanks such as those left by ",," or an empty input
+ continue;
+ }
+ if (quorum.length() > 0) {
+ quorum.append(',');
+ }
+ // every host gets the port, the last one included
+ quorum.append(trimmed).append(':').append(port);
+ }
+ return quorum.toString();
+ }
+
+ /**
+ * Split a quorum list at its commas and trim every resulting value.
+ * @param hostPortQuorumList list of form h1:port, h2:port2,...
+ * @return a possibly empty list of the values between commas; nothing
+ * checks that they are valid hostname:port pairs
+ */
+ public static List<String> splitToPairs(String hostPortQuorumList) {
+ String[] entries = StringUtils.getStrings(hostPortQuorumList);
+ if (entries == null) {
+ // nothing to split
+ return new ArrayList<String>(0);
+ }
+ List<String> pairs = new ArrayList<String>(entries.length);
+ for (int i = 0; i < entries.length; i++) {
+ pairs.add(entries[i].trim());
+ }
+ return pairs;
+ }
+
+ /**
+ * Split a quorum list into a list of hostnames and ports
+ * @param hostPortQuorumList split to a list of hosts and ports
+ * @return a list of values
+ */
+ public static List<HostAndPort> splitToHostsAndPorts(String hostPortQuorumList) {
+ // split an address hot
+ String[] strings = StringUtils.getStrings(hostPortQuorumList);
+ int len = 0;
+ if (strings != null) {
+ len = strings.length;
+ }
+ List<HostAndPort> list = new ArrayList<HostAndPort>(len);
+ if (strings != null) {
+ for (String s : strings) {
+ list.add(HostAndPort.fromString(s.trim()).withDefaultPort(DEFAULT_PORT));
+ }
+ }
+ return list;
+ }
+
+ /**
+ * Join the host parts of the entries with commas, leaving out the ports.
+ * @param hostAndPorts entries to take the hosts from
+ * @return a comma-separated list of the hosts only
+ */
+ public static String buildHostsOnlyList(List<HostAndPort> hostAndPorts) {
+ StringBuilder hosts = new StringBuilder();
+ String separator = "";
+ for (HostAndPort entry : hostAndPorts) {
+ hosts.append(separator).append(entry.getHostText());
+ // every entry after the first is preceded by a comma
+ separator = ",";
+ }
+ return hosts.toString();
+ }
+
+ public static String buildQuorumEntry(HostAndPort hostAndPort,
+ int defaultPort) {
+ String s = hostAndPort.toString();
+ if (hostAndPort.hasPort()) {
+ return s;
+ } else {
+ return s + ":" + defaultPort;
+ }
+ }
+
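+ /**
+ * Build a quorum list, injecting a ":defaultPort" ref if needed on
+ * any entry without one
+ * @param hostAndPorts entries of the quorum, each with or without a port
+ * @param defaultPort port to use for the entries that have none
+ * @return the comma-separated quorum list
+ */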
+ public static String buildQuorum(List<HostAndPort> hostAndPorts, int defaultPort) {
+ List<String> entries = new ArrayList<String>(hostAndPorts.size());
+ for (HostAndPort hostAndPort : hostAndPorts) {
+ entries.add(buildQuorumEntry(hostAndPort, defaultPort));
+ }
+ return SliderUtils.join(entries, ",", false);
+ }
+
+ public static String convertToHostsOnlyList(String quorum) throws
+ BadConfigException {
+ List<HostAndPort> hostAndPorts = splitToHostsAndPortsStrictly(quorum);
+ return ZookeeperUtils.buildHostsOnlyList(hostAndPorts);
+ }
+
+ public static List<HostAndPort> splitToHostsAndPortsStrictly(String quorum) throws
+ BadConfigException {
+ List<HostAndPort> hostAndPorts =
+ ZookeeperUtils.splitToHostsAndPorts(quorum);
+ if (hostAndPorts.isEmpty()) {
+ throw new BadConfigException("empty zookeeper quorum");
+ }
+ return hostAndPorts;
+ }
+
+ public static int getFirstPort(String quorum, int defVal) throws
+ BadConfigException {
+ List<HostAndPort> hostAndPorts = splitToHostsAndPortsStrictly(quorum);
+ int port = hostAndPorts.get(0).getPortOrDefault(defVal);
+ return port;
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
new file mode 100644
index 0000000..510de5d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.slider.common.tools.SliderFileSystem;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.conf.ConfTreeOperations;
+import org.apache.slider.core.conf.MapOperations;
+import org.apache.slider.core.exceptions.BadClusterStateException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.launch.AbstractLauncher;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.slider.api.ResourceKeys.COMPONENT_INSTANCES;
+import static org.apache.slider.api.ResourceKeys.DEF_YARN_CORES;
+import static org.apache.slider.api.ResourceKeys.DEF_YARN_MEMORY;
+import static org.apache.slider.api.ResourceKeys.YARN_CORES;
+import static org.apache.slider.api.ResourceKeys.YARN_MEMORY;
+
+public abstract class AbstractClientProvider extends Configured {
+ private static final Logger log =
+ LoggerFactory.getLogger(AbstractClientProvider.class);
+ protected static final ProviderUtils providerUtils =
+ new ProviderUtils(log);
+
+ public static final String PROVIDER_RESOURCE_BASE =
+ "org/apache/slider/providers/";
+ public static final String PROVIDER_RESOURCE_BASE_ROOT =
+ "/" + PROVIDER_RESOURCE_BASE;
+
+ public AbstractClientProvider(Configuration conf) {
+ super(conf);
+ }
+
+ public abstract String getName();
+
+ public abstract List<ProviderRole> getRoles();
+
+ /**
+ * Verify that an instance definition is considered valid by the provider
+ * @param instanceDefinition instance definition
+ * @param fs filesystem; not used by this implementation
+ * @throws SliderException if the configuration is not valid
+ */
+ public void validateInstanceDefinition(AggregateConf instanceDefinition, SliderFileSystem fs) throws
+ SliderException {
+
+ List<ProviderRole> roles = getRoles();
+ ConfTreeOperations resources =
+ instanceDefinition.getResourceOperations();
+ for (ProviderRole role : roles) {
+ String name = role.name;
+ MapOperations component = resources.getComponent(role.group);
+ if (component != null) {
+ String instances = component.get(COMPONENT_INSTANCES);
+ if (instances == null) {
+ String message = "No instance count provided for " + name;
+ log.error("{} with \n{}", message, resources.toString());
+ throw new BadClusterStateException(message);
+ }
+ String ram = component.get(YARN_MEMORY);
+ String cores = component.get(YARN_CORES);
+
+
+ providerUtils.getRoleResourceRequirement(ram,
+ DEF_YARN_MEMORY,
+ Integer.MAX_VALUE);
+ providerUtils.getRoleResourceRequirement(cores,
+ DEF_YARN_CORES,
+ Integer.MAX_VALUE);
+ }
+ }
+ }
+
+
+ /**
+ * Any provider-side alteration of a configuration can take place here.
+ * @param aggregateConf config to patch
+ * @throws IOException IO problems
+ * @throws SliderException Slider-specific issues
+ */
+ public void prepareInstanceConfiguration(AggregateConf aggregateConf) throws
+ SliderException,
+ IOException {
+ //default: do nothing
+ }
+
+
+ /**
+ * Prepare the AM settings for launch
+ * @param fileSystem filesystem
+ * @param serviceConf configuration of the client
+ * @param launcher launcher to set up
+ * @param instanceDescription instance description being launched
+ * @param snapshotConfDirPath
+ * @param generatedConfDirPath
+ * @param clientConfExtras
+ * @param libdir
+ * @param tempPath
+ * @param miniClusterTestRun flag set to true on a mini cluster run
+ * @throws IOException
+ * @throws SliderException
+ */
+ public void prepareAMAndConfigForLaunch(SliderFileSystem fileSystem,
+ Configuration serviceConf,
+ AbstractLauncher launcher,
+ AggregateConf instanceDescription,
+ Path snapshotConfDirPath,
+ Path generatedConfDirPath,
+ Configuration clientConfExtras,
+ String libdir,
+ Path tempPath,
+ boolean miniClusterTestRun)
+ throws IOException, SliderException {
+
+ }
+
+ /**
+ * Load in and merge in templates. Null arguments means "no such template"
+ * @param instanceConf instance to patch
+ * @param internalTemplate path to internal.json
+ * @param resourceTemplate path to resources.json
+ * @param appConfTemplate path to app_conf.json
+ * @throws IOException any IO problems
+ */
+ protected void mergeTemplates(AggregateConf instanceConf,
+ String internalTemplate,
+ String resourceTemplate,
+ String appConfTemplate) throws IOException {
+ if (internalTemplate != null) {
+ ConfTreeOperations template =
+ ConfTreeOperations.fromResource(internalTemplate);
+ instanceConf.getInternalOperations()
+ .mergeWithoutOverwrite(template.confTree);
+ }
+
+ if (resourceTemplate != null) {
+ ConfTreeOperations resTemplate =
+ ConfTreeOperations.fromResource(resourceTemplate);
+ instanceConf.getResourceOperations()
+ .mergeWithoutOverwrite(resTemplate.confTree);
+ }
+
+ if (appConfTemplate != null) {
+ ConfTreeOperations template =
+ ConfTreeOperations.fromResource(appConfTemplate);
+ instanceConf.getAppConfOperations()
+ .mergeWithoutOverwrite(template.confTree);
+ }
+
+ }
+
+ /**
+ * This is called pre-launch to validate that the cluster specification
+ * is valid. This can include checking that the security options
+ * are in the site files prior to launch, that there are no conflicting operations
+ * etc.
+ *
+ * This check is made prior to every launch of the cluster -so can
+ * pick up problems which manually edited cluster files have added,
+ * or from specification files from previous versions.
+ *
+ * The provider MUST NOT change the remote specification. This is
+ * purely a pre-launch validation of options.
+ *
+ *
+ * @param sliderFileSystem filesystem
+ * @param clustername name of the cluster
+ * @param configuration cluster configuration
+ * @param instanceDefinition cluster specification
+ * @param clusterDirPath directory of the cluster
+ * @param generatedConfDirPath path to place generated artifacts
+ * @param secure flag to indicate that the cluster is secure
+ * @throws SliderException on any validation issue
+ * @throws IOException on any IO problem
+ */
+ public void preflightValidateClusterConfiguration(SliderFileSystem sliderFileSystem,
+ String clustername,
+ Configuration configuration,
+ AggregateConf instanceDefinition,
+ Path clusterDirPath,
+ Path generatedConfDirPath,
+ boolean secure)
+ throws SliderException, IOException {
+ validateInstanceDefinition(instanceDefinition, sliderFileSystem);
+ }
+
+ /**
+ * Return a set of application specific string tags.
+ * @return the set of tags.
+ */
+ public Set<String> getApplicationTags (SliderFileSystem fileSystem,
+ String appDef) throws SliderException {
+ return Collections.emptySet();
+ }
+
+ /**
+ * Process client operations for applications such as install, configure
+ * @param fileSystem
+ * @param registryOperations
+ * @param configuration
+ * @param operation
+ * @param clientInstallPath
+ * @param clientPackage
+ * @param clientConfig
+ * @param name
+ * @throws SliderException
+ */
+ public void processClientOperation(SliderFileSystem fileSystem,
+ RegistryOperations registryOperations,
+ Configuration configuration,
+ String operation,
+ File clientInstallPath,
+ File clientPackage,
+ JSONObject clientConfig,
+ String name)
+ throws SliderException {
+ throw new SliderException("Provider does not support client operations.");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
new file mode 100644
index 0000000..61b2655
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
@@ -0,0 +1,424 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
+import org.apache.hadoop.registry.client.types.AddressTypes;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.slider.api.ClusterDescription;
+import org.apache.slider.common.SliderKeys;
+import org.apache.slider.common.tools.ConfigHelper;
+import org.apache.slider.common.tools.SliderFileSystem;
+import org.apache.slider.common.tools.SliderUtils;
+import org.apache.slider.core.conf.AggregateConf;
+import org.apache.slider.core.exceptions.BadCommandArgumentsException;
+import org.apache.slider.core.exceptions.SliderException;
+import org.apache.slider.core.main.ExitCodeProvider;
+import org.apache.slider.server.appmaster.actions.QueueAccess;
+import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
+import org.apache.slider.server.appmaster.state.ContainerReleaseSelector;
+import org.apache.slider.server.appmaster.state.MostRecentContainerReleaseSelector;
+import org.apache.slider.server.appmaster.state.StateAccessForProviders;
+import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations;
+import org.apache.slider.server.services.workflow.ForkedProcessService;
+import org.apache.slider.server.services.workflow.ServiceParent;
+import org.apache.slider.server.services.workflow.WorkflowSequenceService;
+import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * The base class for provider services. It lets the implementations
+ * add sequences of operations, and propagates service failures
+ * upstream
+ */
+public abstract class AbstractProviderService
+ extends WorkflowSequenceService
+ implements
+ ProviderCore,
+ SliderKeys,
+ ProviderService {
+ private static final Logger log =
+ LoggerFactory.getLogger(AbstractProviderService.class);
+ protected StateAccessForProviders amState;
+ protected AgentRestOperations restOps;
+ protected URL amWebAPI;
+ protected YarnRegistryViewForProviders yarnRegistry;
+ protected QueueAccess queueAccess;
+
+ protected AbstractProviderService(String name) {
+ super(name);
+ setStopIfNoChildServicesAtStartup(false);
+ }
+
+ @Override
+ public Configuration getConf() {
+ return getConfig();
+ }
+
+ public StateAccessForProviders getAmState() {
+ return amState;
+ }
+
+ public QueueAccess getQueueAccess() {
+ return queueAccess;
+ }
+
+ public void setAmState(StateAccessForProviders amState) {
+ this.amState = amState;
+ }
+
+ @Override
+ public String getHumanName() {
+ return getName().toLowerCase(Locale.ENGLISH);
+ }
+
+ @Override
+ public void bind(StateAccessForProviders stateAccessor,
+ QueueAccess queueAccess,
+ List<Container> liveContainers) {
+ this.amState = stateAccessor;
+ this.queueAccess = queueAccess;
+ }
+
+ @Override
+ public void bindToYarnRegistry(YarnRegistryViewForProviders yarnRegistry) {
+ this.yarnRegistry = yarnRegistry;
+ }
+
+ public YarnRegistryViewForProviders getYarnRegistry() {
+ return yarnRegistry;
+ }
+
+ @Override
+ public AgentRestOperations getAgentRestOperations() {
+ return restOps;
+ }
+
+ @Override
+ public void notifyContainerCompleted(ContainerId containerId) {
+ }
+
+ public void setAgentRestOperations(AgentRestOperations agentRestOperations) {
+ this.restOps = agentRestOperations;
+ }
+
+ /**
+ * Load a specific XML configuration file for the provider config
+ * @param confDir configuration directory
+ * @param siteXMLFilename provider-specific filename
+ * @return a configuration to be included in status
+ * @throws BadCommandArgumentsException argument problems
+ * @throws IOException IO problems
+ */
+ protected Configuration loadProviderConfigurationInformation(File confDir,
+ String siteXMLFilename)
+ throws BadCommandArgumentsException, IOException {
+ Configuration siteConf;
+ File siteXML = new File(confDir, siteXMLFilename);
+ if (!siteXML.exists()) {
+ throw new BadCommandArgumentsException(
+ "Configuration directory %s doesn't contain %s - listing is %s",
+ confDir, siteXMLFilename, SliderUtils.listDir(confDir));
+ }
+
+ //now read it in
+ siteConf = ConfigHelper.loadConfFromFile(siteXML);
+ log.info("{} file is at {}", siteXMLFilename, siteXML);
+ log.info(ConfigHelper.dumpConfigToString(siteConf));
+ return siteConf;
+ }
+
+ /**
+ * No-op implementation of this method.
+ */
+ @Override
+ public void initializeApplicationConfiguration(
+ AggregateConf instanceDefinition, SliderFileSystem fileSystem)
+ throws IOException, SliderException {
+ }
+
+ /**
+ * No-op implementation of this method.
+ *
+ * {@inheritDoc}
+ */
+ @Override
+ public void validateApplicationConfiguration(AggregateConf instance,
+ File confDir,
+ boolean secure)
+ throws IOException, SliderException {
+
+ }
+
+ /**
+ * Scan through the roles and see if it is supported.
+ * @param role role to look for
+ * @return true if the role is known about -and therefore
+ * that a launcher thread can be deployed to launch it
+ */
+ @Override
+ public boolean isSupportedRole(String role) {
+ Collection<ProviderRole> roles = getRoles();
+ for (ProviderRole providedRole : roles) {
+ if (providedRole.name.equals(role)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * override point to allow a process to start executing in this container
+ * @param instanceDefinition cluster description
+ * @param confDir configuration directory
+ * @param env environment
+ * @param execInProgress the callback for the exec events
+ * @return false
+ * @throws IOException
+ * @throws SliderException
+ */
+ @Override
+ public boolean exec(AggregateConf instanceDefinition,
+ File confDir,
+ Map<String, String> env,
+ ProviderCompleted execInProgress) throws IOException, SliderException {
+ return false;
+ }
+
+ @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+ @Override // ExitCodeProvider
+ public int getExitCode() {
+ Throwable cause = getFailureCause();
+ if (cause != null) {
+ //failed for some reason
+ if (cause instanceof ExitCodeProvider) {
+ return ((ExitCodeProvider) cause).getExitCode();
+ }
+ }
+ ForkedProcessService lastProc = latestProcess();
+ if (lastProc == null || !lastProc.isProcessTerminated()) {
+ return 0;
+ } else {
+ return lastProc.getExitCode();
+ }
+ }
+
+ /**
+ * Return the latest forked process service that ran
+ * @return the forkes service
+ */
+ protected ForkedProcessService latestProcess() {
+ Service current = getActiveService();
+ Service prev = getPreviousService();
+
+ Service latest = current != null ? current : prev;
+ if (latest instanceof ForkedProcessService) {
+ return (ForkedProcessService) latest;
+ } else {
+ //its a composite object, so look inside it for a process
+ if (latest instanceof ServiceParent) {
+ return getFPSFromParentService((ServiceParent) latest);
+ } else {
+ //no match
+ return null;
+ }
+ }
+ }
+
+
+ /**
+ * Given a parent service, find the one that is a forked process
+ * @param serviceParent parent
+ * @return the forked process service or null if there is none
+ */
+ protected ForkedProcessService getFPSFromParentService(ServiceParent serviceParent) {
+ List<Service> services = serviceParent.getServices();
+ for (Service s : services) {
+ if (s instanceof ForkedProcessService) {
+ return (ForkedProcessService) s;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * if we are already running, start this service
+ */
+ protected void maybeStartCommandSequence() {
+ if (isInState(STATE.STARTED)) {
+ startNextService();
+ }
+ }
+
+ /**
+ * Create a new forked process service with the given
+ * name, environment and command list -then add it as a child
+ * for execution in the sequence.
+ *
+ * @param name command name
+ * @param env environment
+ * @param commands command line
+ * @throws IOException
+ * @throws SliderException
+ */
+ protected ForkedProcessService queueCommand(String name,
+ Map<String, String> env,
+ List<String> commands) throws
+ IOException,
+ SliderException {
+ ForkedProcessService process = buildProcess(name, env, commands);
+ //register the service for lifecycle management; when this service
+ //is terminated, so is the master process
+ addService(process);
+ return process;
+ }
+
+ public ForkedProcessService buildProcess(String name,
+ Map<String, String> env,
+ List<String> commands) throws
+ IOException,
+ SliderException {
+ ForkedProcessService process;
+ process = new ForkedProcessService(name);
+ process.init(getConfig());
+ process.build(env, commands);
+ return process;
+ }
+
+ /*
+ * Build the provider status, can be empty
+ * @return the provider status - map of entries to add to the info section
+ */
+ @Override
+ public Map<String, String> buildProviderStatus() {
+ return new HashMap<String, String>();
+ }
+
+ /*
+ Build the monitor details. The base implementation includes all the external URL endpoints
+ in the external view
+ */
+ @Override
+ public Map<String, MonitorDetail> buildMonitorDetails(ClusterDescription clusterDesc) {
+ Map<String, MonitorDetail> details = new LinkedHashMap<String, MonitorDetail>();
+
+ // add in all the endpoints
+ buildEndpointDetails(details);
+
+ return details;
+ }
+
+ @Override
+ public void buildEndpointDetails(Map<String, MonitorDetail> details) {
+ ServiceRecord self = yarnRegistry.getSelfRegistration();
+
+ List<Endpoint> externals = self.external;
+ for (Endpoint endpoint : externals) {
+ String addressType = endpoint.addressType;
+ if (AddressTypes.ADDRESS_URI.equals(addressType)) {
+ try {
+ List<URL> urls = RegistryTypeUtils.retrieveAddressURLs(endpoint);
+ if (!urls.isEmpty()) {
+ details.put(endpoint.api, new MonitorDetail(urls.get(0).toString(), true));
+ }
+ } catch (InvalidRecordException | MalformedURLException ignored) {
+ // Ignored
+ }
+
+ }
+
+ }
+ }
+
+ @Override
+ public void applyInitialRegistryDefinitions(URL amWebURI,
+ URL agentOpsURI,
+ URL agentStatusURI,
+ ServiceRecord serviceRecord)
+ throws IOException {
+ this.amWebAPI = amWebURI;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ *
+ * @return The base implementation returns the most recent containers first.
+ */
+ @Override
+ public ContainerReleaseSelector createContainerReleaseSelector() {
+ return new MostRecentContainerReleaseSelector();
+ }
+
+ @Override
+ public void releaseAssignedContainer(ContainerId containerId) {
+ // no-op
+ }
+
+ @Override
+ public void addContainerRequest(AMRMClient.ContainerRequest req) {
+ // no-op
+ }
+
+ @Override
+ public void cancelSingleRequest(AMRMClient.ContainerRequest request) {
+ // no-op
+ }
+
+ @Override
+ public int cancelContainerRequests(Priority priority1,
+ Priority priority2,
+ int count) {
+ return 0;
+ }
+
+ @Override
+ public void execute(List<AbstractRMOperation> operations) {
+ for (AbstractRMOperation operation : operations) {
+ operation.execute(this);
+ }
+ }
+ /**
+ * No-op implementation of this method.
+ */
+ @Override
+ public void rebuildContainerDetails(List<Container> liveContainers,
+ String applicationId, Map<Integer, ProviderRole> providerRoles) {
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/MonitorDetail.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/MonitorDetail.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/MonitorDetail.java
new file mode 100644
index 0000000..27d3415
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/MonitorDetail.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.slider.providers;
+
+/**
+ * Details about some exported information from a provider to the AM web UI.
+ */
+public class MonitorDetail {
+
+ private final String value;
+ private final boolean isUrl;
+
+ public MonitorDetail(String value, boolean isUrl) {
+ this.value = value;
+ this.isUrl = isUrl;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public boolean isUrl() {
+ return isUrl;
+ }
+
+ public String toString() {
+ return "MonitorDetail[" + value + " isUrl=" + isUrl + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicy.java
new file mode 100644
index 0000000..128dd5d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicy.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+/**
+ * Placement values.
+ * This is nominally a bitmask, though not all values make sense
+ */
+public class PlacementPolicy {
+
+ /**
+ * Default value: history used, anti-affinity hinted at on rebuild/flex up
+ */
+ public static final int NONE = 0;
+
+ /**
+ * Default value: history used, anti-affinity hinted at on rebuild/flex up
+ */
+ public static final int DEFAULT = NONE;
+
+ /**
+ * Strict placement: when asking for an instance for which there is
+ * history, mandate that it is strict
+ */
+ public static final int STRICT = 1;
+
+ /**
+ * No data locality; do not use placement history
+ */
+ public static final int ANYWHERE = 2;
+
+ /**
+ * @Deprecated: use {@link #ANYWHERE}
+ */
+ @Deprecated
+ public static final int NO_DATA_LOCALITY = ANYWHERE;
+
+ /**
+ * Anti-affinity is mandatory.
+ */
+ public static final int ANTI_AFFINITY_REQUIRED = 4;
+
+ /**
+ * Exclude from flexing; used internally to mark AMs.
+ */
+ public static final int EXCLUDE_FROM_FLEXING = 16;
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicyOptions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicyOptions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicyOptions.java
new file mode 100644
index 0000000..e61f944
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/PlacementPolicyOptions.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+public enum PlacementPolicyOptions {
+
+ EXCLUDE_FROM_FLEXING,
+ NO_DATA_LOCALITY,
+ ANTI_AFFINITY_REQUIRED,
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java
new file mode 100644
index 0000000..f6ff4fd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+/**
+ * This is the callback triggered by the {@link ProviderCompletedCallable}
+ * when it generates a notification
+ */
+public interface ProviderCompleted {
+
+ public void eventCallbackEvent(Object parameter);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/31c4a419/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java
new file mode 100644
index 0000000..47939c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+import java.util.concurrent.Callable;
+
+ /**
+  * A {@link Callable} which, when called, passes a fixed parameter to a
+  * {@link ProviderCompleted} callback and then returns that parameter.
+  */
+ public class ProviderCompletedCallable implements Callable<Object> {
+
+   /** The callback to notify from {@link #call()}. */
+   private final ProviderCompleted listener;
+   /** The object passed to the callback and returned by {@link #call()}. */
+   private final Object payload;
+
+   /**
+    * Create an instance.
+    * @param callback the callback to invoke from {@link #call()}
+    * @param parameter the object to pass to the callback
+    */
+   public ProviderCompletedCallable(ProviderCompleted callback, Object parameter) {
+     this.listener = callback;
+     this.payload = parameter;
+   }
+
+   /**
+    * Invoke the callback with the parameter.
+    * @return the parameter given to the constructor
+    * @throws Exception anything raised by the callback
+    */
+   @Override
+   public Object call() throws Exception {
+     listener.eventCallbackEvent(payload);
+     return payload;
+   }
+ }
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org