You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by go...@apache.org on 2016/08/25 20:19:58 UTC
[29/46] incubator-slider git commit: SLIDER-1165 Create
yarn-native-services branch on Slider corresponding to the
yarn-native-services branch on Hadoop
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/core/restclient/HttpVerb.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/restclient/HttpVerb.java b/slider-core/src/main/java/org/apache/slider/core/restclient/HttpVerb.java
deleted file mode 100644
index c040345..0000000
--- a/slider-core/src/main/java/org/apache/slider/core/restclient/HttpVerb.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.core.restclient;
-
-/**
- * Http verbs with details on what they support in terms of submit and
- * response bodies.
- * <p>
- * Those verbs which do support bodies in the response MAY NOT return it;
- * if the response code is 204 then the answer is "no body", but the operation
- * is considered a success.
- */
-public enum HttpVerb {
- GET("GET", false, true),
- POST("POST", true, true),
- PUT("PUT", true, true),
- DELETE("DELETE", false, true),
- HEAD("HEAD", false, false);
-
- private final String verb;
- private final boolean hasUploadBody;
- private final boolean hasResponseBody;
-
- HttpVerb(String verb, boolean hasUploadBody, boolean hasResponseBody) {
- this.verb = verb;
- this.hasUploadBody = hasUploadBody;
- this.hasResponseBody = hasResponseBody;
- }
-
- public String getVerb() {
- return verb;
- }
-
- public boolean hasUploadBody() {
- return hasUploadBody;
- }
-
- public boolean hasResponseBody() {
- return hasResponseBody;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/core/restclient/SliderURLConnectionFactory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/restclient/SliderURLConnectionFactory.java b/slider-core/src/main/java/org/apache/slider/core/restclient/SliderURLConnectionFactory.java
deleted file mode 100644
index e453f52..0000000
--- a/slider-core/src/main/java/org/apache/slider/core/restclient/SliderURLConnectionFactory.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.core.restclient;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.web.KerberosUgiAuthenticator;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLSocketFactory;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.net.URLConnection;
-import java.security.GeneralSecurityException;
-
-/**
- * Factory for URL connections; used behind the scenes in the Jersey integration.
- * <p>
- * Derived from the WebHDFS implementation.
- */
-public class SliderURLConnectionFactory {
- private static final Logger log =
- LoggerFactory.getLogger(SliderURLConnectionFactory.class);
-
- /**
- * Timeout for socket connects and reads
- */
- public final static int DEFAULT_SOCKET_TIMEOUT = 60 * 1000; // 1 minute
- private final ConnectionConfigurator connConfigurator;
-
- private static final ConnectionConfigurator DEFAULT_CONFIGURATOR = new BasicConfigurator();
-
- /**
- * Construct a new URLConnectionFactory based on the configuration. It will
- * try to load SSL certificates when it is specified.
- */
- public static SliderURLConnectionFactory newInstance(Configuration conf) {
- ConnectionConfigurator conn;
- try {
- conn = newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
- } catch (Exception e) {
- log.debug("Cannot load customized SSL configuration.", e);
- conn = DEFAULT_CONFIGURATOR;
- }
- return new SliderURLConnectionFactory(conn);
- }
-
- private SliderURLConnectionFactory(ConnectionConfigurator connConfigurator) {
- this.connConfigurator = connConfigurator;
- }
-
- /**
- * Create a new ConnectionConfigurator for SSL connections
- */
- private static ConnectionConfigurator newSslConnConfigurator(final int timeout,
- Configuration conf) throws IOException, GeneralSecurityException {
- final SSLFactory factory;
- final SSLSocketFactory sf;
- final HostnameVerifier hv;
-
- factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
- factory.init();
- sf = factory.createSSLSocketFactory();
- hv = factory.getHostnameVerifier();
-
- return new ConnectionConfigurator() {
- @Override
- public HttpURLConnection configure(HttpURLConnection conn)
- throws IOException {
- if (conn instanceof HttpsURLConnection) {
- HttpsURLConnection c = (HttpsURLConnection) conn;
- c.setSSLSocketFactory(sf);
- c.setHostnameVerifier(hv);
- }
- SliderURLConnectionFactory.setupConnection(conn, timeout);
- return conn;
- }
- };
- }
-
- /**
- * Opens a url with read and connect timeouts
- *
- * @param url
- * to open
- * @return URLConnection
- * @throws IOException
- */
- public URLConnection openConnection(URL url) throws IOException {
- try {
- return openConnection(url, false);
- } catch (AuthenticationException e) {
- // Unreachable
- return null;
- }
- }
-
- /**
- * Opens a url with read and connect timeouts
- *
- * @param url
- * URL to open
- * @param isSpnego
- * whether the url should be authenticated via SPNEGO
- * @return URLConnection
- * @throws IOException
- * @throws AuthenticationException
- */
- public URLConnection openConnection(URL url, boolean isSpnego)
- throws IOException, AuthenticationException {
- if (isSpnego) {
- log.debug("open AuthenticatedURL connection {}", url);
- UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
- final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
- return new AuthenticatedURL(new KerberosUgiAuthenticator(),
- connConfigurator).openConnection(url, authToken);
- } else {
- log.debug("open URL connection {}", url);
- URLConnection connection = url.openConnection();
- if (connection instanceof HttpURLConnection) {
- connConfigurator.configure((HttpURLConnection) connection);
- }
- return connection;
- }
- }
-
- /**
- * Sets connection parameters on the given URLConnection
- *
- * @param connection
- * URLConnection to set
- * @param socketTimeout
- * the connection and read timeout of the connection.
- */
- private static void setupConnection(URLConnection connection, int socketTimeout) {
- connection.setConnectTimeout(socketTimeout);
- connection.setReadTimeout(socketTimeout);
- connection.setUseCaches(false);
- if (connection instanceof HttpURLConnection) {
- ((HttpURLConnection) connection).setInstanceFollowRedirects(true);
- }
- }
-
- private static class BasicConfigurator implements ConnectionConfigurator {
- @Override
- public HttpURLConnection configure(HttpURLConnection conn)
- throws IOException {
- SliderURLConnectionFactory.setupConnection(conn, DEFAULT_SOCKET_TIMEOUT);
- return conn;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/core/restclient/UgiJerseyBinding.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/restclient/UgiJerseyBinding.java b/slider-core/src/main/java/org/apache/slider/core/restclient/UgiJerseyBinding.java
deleted file mode 100644
index bf71861..0000000
--- a/slider-core/src/main/java/org/apache/slider/core/restclient/UgiJerseyBinding.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.core.restclient;
-
-import com.google.common.base.Preconditions;
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.UniformInterfaceException;
-import com.sun.jersey.api.client.config.ClientConfig;
-import com.sun.jersey.api.client.config.DefaultClientConfig;
-import com.sun.jersey.api.json.JSONConfiguration;
-import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
-import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.slider.core.exceptions.ExceptionConverter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-
-/**
- * Class to bond to a Jersey client, for UGI integration and SPNEGO.
- * <p>
- * Usage: create an instance, then when creating a Jersey <code>Client</code>
- * pass in to the constructor the handler provided by {@link #getHandler()}
- *
- * see <a href="https://jersey.java.net/apidocs/1.17/jersey/com/sun/jersey/client/urlconnection/HttpURLConnectionFactory.html">Jersey docs</a>
- */
-public class UgiJerseyBinding implements
- HttpURLConnectionFactory {
- private static final Logger log =
- LoggerFactory.getLogger(UgiJerseyBinding.class);
-
- private final UrlConnectionOperations operations;
- private final URLConnectionClientHandler handler;
-
- /**
- * Construct an instance
- * @param operations operations instance
- */
- @SuppressWarnings("ThisEscapedInObjectConstruction")
- public UgiJerseyBinding(UrlConnectionOperations operations) {
- Preconditions.checkArgument(operations != null, "Null operations");
- this.operations = operations;
- handler = new URLConnectionClientHandler(this);
- }
-
- /**
- * Create an instance off the configuration. The SPNEGO policy
- * is derived from the current UGI settings.
- * @param conf config
- */
- public UgiJerseyBinding(Configuration conf) {
- this(new UrlConnectionOperations(conf));
- }
-
- /**
- * Get a URL connection.
- * @param url URL to connect to
- * @return the connection
- * @throws IOException any problem. {@link AuthenticationException}
- * errors are wrapped
- */
- @Override
- public HttpURLConnection getHttpURLConnection(URL url) throws IOException {
- try {
- // open a connection handling status codes and so redirections
- // but as it opens a connection, it's less useful than you think.
-
- return operations.openConnection(url);
- } catch (AuthenticationException e) {
- throw new IOException(e);
- }
- }
-
- public UrlConnectionOperations getOperations() {
- return operations;
- }
-
- public URLConnectionClientHandler getHandler() {
- return handler;
- }
-
- /**
- * Get the SPNEGO flag (as found in the operations instance
- * @return the spnego policy
- */
- public boolean isUseSpnego() {
- return operations.isUseSpnego();
- }
-
-
- /**
- * Uprate error codes 400 and up into faults;
- * <p>
- * see {@link ExceptionConverter#convertJerseyException(String, String, UniformInterfaceException)}
- */
- public static IOException uprateFaults(HttpVerb verb, String url,
- UniformInterfaceException ex)
- throws IOException {
- return ExceptionConverter.convertJerseyException(verb.getVerb(),
- url, ex);
- }
-
- /**
- * Create the standard Jersey client Config
- * @return the recommended Jersey Client config
- */
- public ClientConfig createJerseyClientConfig() {
- ClientConfig clientConfig = new DefaultClientConfig();
- clientConfig.getFeatures().put(JSONConfiguration.FEATURE_POJO_MAPPING, true);
- return clientConfig;
- }
-
- /**
- * Create a jersey client bonded to this handler, using the
- * supplied client config
- * @param clientConfig client configuratin
- * @return a new client instance to use
- */
- public Client createJerseyClient(ClientConfig clientConfig) {
- return new Client(getHandler(), clientConfig);
- }
-
- /**
- * Create a jersey client bonded to this handler, using the
- * client config created with {@link #createJerseyClientConfig()}
- * @return a new client instance to use
- */
- public Client createJerseyClient() {
- return createJerseyClient(createJerseyClientConfig());
- }
-
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/core/restclient/UrlConnectionOperations.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/restclient/UrlConnectionOperations.java b/slider-core/src/main/java/org/apache/slider/core/restclient/UrlConnectionOperations.java
deleted file mode 100644
index 20ef198..0000000
--- a/slider-core/src/main/java/org/apache/slider/core/restclient/UrlConnectionOperations.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.core.restclient;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.yarn.webapp.ForbiddenException;
-import org.apache.hadoop.yarn.webapp.NotFoundException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.net.ssl.SSLException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-
-/**
- * Operations on the JDK UrlConnection class.
- *
- */
-public class UrlConnectionOperations extends Configured {
- private static final Logger log =
- LoggerFactory.getLogger(UrlConnectionOperations.class);
-
- private SliderURLConnectionFactory connectionFactory;
-
- private boolean useSpnego = false;
-
- /**
- * Create an instance off the configuration. The SPNEGO policy
- * is derived from the current UGI settings.
- * @param conf config
- */
- public UrlConnectionOperations(Configuration conf) {
- super(conf);
- connectionFactory = SliderURLConnectionFactory.newInstance(conf);
- if (UserGroupInformation.isSecurityEnabled()) {
- log.debug("SPNEGO is enabled");
- setUseSpnego(true);
- }
- }
-
-
- public boolean isUseSpnego() {
- return useSpnego;
- }
-
- public void setUseSpnego(boolean useSpnego) {
- this.useSpnego = useSpnego;
- }
-
- /**
- * Opens a url with cache disabled, redirect handled in
- * (JDK) implementation.
- *
- * @param url to open
- * @return URLConnection
- * @throws IOException
- * @throws AuthenticationException authentication failure
- */
- public HttpURLConnection openConnection(URL url) throws
- IOException,
- AuthenticationException {
- Preconditions.checkArgument(url.getPort() != 0, "no port");
- return (HttpURLConnection) connectionFactory.openConnection(url, useSpnego);
- }
-
- public HttpOperationResponse execGet(URL url) throws
- IOException,
- AuthenticationException {
- return execHttpOperation(HttpVerb.GET, url, null, "");
- }
-
- public HttpOperationResponse execHttpOperation(HttpVerb verb,
- URL url,
- byte[] payload,
- String contentType)
- throws IOException, AuthenticationException {
- HttpURLConnection conn = null;
- HttpOperationResponse outcome = new HttpOperationResponse();
- int resultCode;
- byte[] body = null;
- log.debug("{} {} spnego={}", verb, url, useSpnego);
-
- boolean doOutput = verb.hasUploadBody();
- if (doOutput) {
- Preconditions.checkArgument(payload !=null,
- "Null payload on a verb which expects one");
- }
- try {
- conn = openConnection(url);
- conn.setRequestMethod(verb.getVerb());
- conn.setDoOutput(doOutput);
- if (doOutput) {
- conn.setRequestProperty("Content-Type", contentType);
- }
-
- // now do the connection
- conn.connect();
-
- if (doOutput) {
- OutputStream output = conn.getOutputStream();
- IOUtils.write(payload, output);
- output.close();
- }
-
- resultCode = conn.getResponseCode();
- outcome.lastModified = conn.getLastModified();
- outcome.contentType = conn.getContentType();
- outcome.headers = conn.getHeaderFields();
- InputStream stream = conn.getErrorStream();
- if (stream == null) {
- stream = conn.getInputStream();
- }
- if (stream != null) {
- // read into a buffer.
- body = IOUtils.toByteArray(stream);
- } else {
- // no body:
- log.debug("No body in response");
-
- }
- } catch (SSLException e) {
- throw e;
- } catch (IOException e) {
- throw NetUtils.wrapException(url.toString(),
- url.getPort(), "localhost", 0, e);
-
- } catch (AuthenticationException e) {
- throw new AuthenticationException("From " + url + ": " + e, e);
-
- } finally {
- if (conn != null) {
- conn.disconnect();
- }
- }
- uprateFaults(HttpVerb.GET, url.toString(), resultCode, "", body);
- outcome.responseCode = resultCode;
- outcome.data = body;
- return outcome;
- }
-
- /**
- * Uprate error codes 400 and up into faults;
- * 404 is converted to a {@link NotFoundException},
- * 401 to {@link ForbiddenException}
- *
- * @param verb HTTP Verb used
- * @param url URL as string
- * @param resultCode response from the request
- * @param bodyAsString
- *@param body optional body of the request @throws IOException if the result was considered a failure
- */
- public static void uprateFaults(HttpVerb verb, String url,
- int resultCode, String bodyAsString, byte[] body)
- throws IOException {
-
- if (resultCode < 400) {
- //success
- return;
- }
- String msg = verb.toString() +" "+ url;
- if (resultCode == 404) {
- throw new NotFoundException(msg);
- }
- if (resultCode == 401) {
- throw new ForbiddenException(msg);
- }
- // all other error codes
-
- // get a string respnse
- if (bodyAsString == null) {
- if (body != null && body.length > 0) {
- bodyAsString = new String(body);
- } else {
- bodyAsString = "";
- }
- }
- String message = msg +
- " failed with exit code " + resultCode
- + ", body length " + bodyAsString.length()
- + ":\n" + bodyAsString;
- log.error(message);
- throw new IOException(message);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java b/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
deleted file mode 100644
index ca49888..0000000
--- a/slider-core/src/main/java/org/apache/slider/core/zk/BlockingZKWatcher.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.core.zk;
-
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.ConnectException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class BlockingZKWatcher implements Watcher {
-
- protected static final Logger log =
- LoggerFactory.getLogger(BlockingZKWatcher.class);
- private final AtomicBoolean connectedFlag = new AtomicBoolean(false);
-
- @Override
- public void process(WatchedEvent event) {
- log.info("ZK binding callback received");
- connectedFlag.set(true);
- synchronized (connectedFlag) {
- try {
- connectedFlag.notify();
- } catch (Exception e) {
- log.warn("failed while waiting for notification", e);
- }
- }
- }
-
- /**
- * Wait for a flag to go true
- * @param timeout timeout in millis
- */
-
- public void waitForZKConnection(int timeout)
- throws InterruptedException, ConnectException {
- synchronized (connectedFlag) {
- if (!connectedFlag.get()) {
- log.info("waiting for ZK event");
- //wait a bit
- connectedFlag.wait(timeout);
- }
- }
- if (!connectedFlag.get()) {
- throw new ConnectException("Unable to connect to ZK quorum");
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java b/slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java
deleted file mode 100644
index c8b3adb..0000000
--- a/slider-core/src/main/java/org/apache/slider/core/zk/MiniZooKeeperCluster.java
+++ /dev/null
@@ -1,423 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.core.zk;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.Reader;
-import java.net.BindException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
-
-/**
- * This is a version of the HBase ZK cluster cut out to be standalone.
- *
- * <i>Important: keep this Java6 language level for now</i>
- */
-public class MiniZooKeeperCluster extends AbstractService {
- private static final Logger LOG = LoggerFactory.getLogger(
- MiniZooKeeperCluster.class);
-
- private static final int TICK_TIME = 2000;
- private static final int CONNECTION_TIMEOUT = 30000;
- public static final int MAX_CLIENT_CONNECTIONS = 1000;
-
- private boolean started;
-
- /** The default port. If zero, we use a random port. */
- private int defaultClientPort = 0;
-
- private int clientPort;
-
- private final List<NIOServerCnxnFactory> standaloneServerFactoryList;
- private final List<ZooKeeperServer> zooKeeperServers;
- private final List<Integer> clientPortList;
-
- private int activeZKServerIndex;
- private int tickTime = 0;
- private File baseDir;
- private final int numZooKeeperServers;
- private String zkQuorum = "";
-
- public MiniZooKeeperCluster(int numZooKeeperServers) {
- super("MiniZooKeeperCluster");
- this.numZooKeeperServers = numZooKeeperServers;
- this.started = false;
- activeZKServerIndex = -1;
- zooKeeperServers = new ArrayList<ZooKeeperServer>();
- clientPortList = new ArrayList<Integer>();
- standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
- }
-
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- super.serviceInit(conf);
- }
-
- public void setDefaultClientPort(int clientPort) {
- if (clientPort <= 0) {
- throw new IllegalArgumentException("Invalid default ZK client port: "
- + clientPort);
- }
- this.defaultClientPort = clientPort;
- }
-
- /**
- * Selects a ZK client port. Returns the default port if specified.
- * Otherwise, returns a random port. The random port is selected from the
- * range between 49152 to 65535. These ports cannot be registered with IANA
- * and are intended for dynamic allocation (see http://bit.ly/dynports).
- */
- private int selectClientPort(Random r) {
- if (defaultClientPort > 0) {
- return defaultClientPort;
- }
- return 0xc000 + r.nextInt(0x3f00);
- }
-
- public void setTickTime(int tickTime) {
- this.tickTime = tickTime;
- }
-
- public int getBackupZooKeeperServerNum() {
- return zooKeeperServers.size() - 1;
- }
-
- public int getZooKeeperServerNum() {
- return zooKeeperServers.size();
- }
-
- // / XXX: From o.a.zk.t.ClientBase
- private static void setupTestEnv() {
- // during the tests we run with 100K prealloc in the logs.
- // on windows systems prealloc of 64M was seen to take ~15seconds
- // resulting in test failure (client timeout on first session).
- // set env and directly in order to handle static init/gc issues
- System.setProperty("zookeeper.preAllocSize", "100");
- FileTxnLog.setPreallocSize(100 * 1024);
- }
-
- @Override
- protected void serviceStart() throws Exception {
- startup();
- }
-
- /**
- * @param baseDir
- * @param numZooKeeperServers
- * @return ClientPort server bound to, -1 if there was a
- * binding problem and we couldn't pick another port.
- * @throws IOException
- * @throws InterruptedException
- */
- private int startup() throws IOException,
- InterruptedException {
- if (numZooKeeperServers <= 0)
- return -1;
-
- setupTestEnv();
- started = true;
- baseDir = File.createTempFile("zookeeper", ".dir");
- recreateDir(baseDir);
-
- StringBuilder quorumList = new StringBuilder();
- Random rnd = new Random();
- int tentativePort = selectClientPort(rnd);
-
- // running all the ZK servers
- for (int i = 0; i < numZooKeeperServers; i++) {
- File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
- recreateDir(dir);
- int tickTimeToUse;
- if (this.tickTime > 0) {
- tickTimeToUse = this.tickTime;
- } else {
- tickTimeToUse = TICK_TIME;
- }
- ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
- NIOServerCnxnFactory standaloneServerFactory;
- while (true) {
- try {
- standaloneServerFactory = new NIOServerCnxnFactory();
- standaloneServerFactory.configure(
- new InetSocketAddress(tentativePort),
- MAX_CLIENT_CONNECTIONS
- );
- } catch (BindException e) {
- LOG.debug("Failed binding ZK Server to client port: " +
- tentativePort, e);
- // We're told to use some port but it's occupied, fail
- if (defaultClientPort > 0) return -1;
- // This port is already in use, try to use another.
- tentativePort = selectClientPort(rnd);
- continue;
- }
- break;
- }
-
- // Start up this ZK server
- standaloneServerFactory.startup(server);
- if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) {
- throw new IOException("Waiting for startup of standalone server");
- }
-
- // We have selected this port as a client port.
- clientPortList.add(tentativePort);
- standaloneServerFactoryList.add(standaloneServerFactory);
- zooKeeperServers.add(server);
- if (quorumList.length() > 0) {
- quorumList.append(",");
- }
- quorumList.append("localhost:").append(tentativePort);
- tentativePort++; //for the next server
- }
-
- // set the first one to be active ZK; Others are backups
- activeZKServerIndex = 0;
-
- clientPort = clientPortList.get(activeZKServerIndex);
- zkQuorum = quorumList.toString();
- LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
- "on client port: " + clientPort);
- return clientPort;
- }
-
- private void recreateDir(File dir) throws IOException {
- if (dir.exists()) {
- if (!FileUtil.fullyDelete(dir)) {
- throw new IOException("Could not delete zk base directory: " + dir);
- }
- }
- try {
- dir.mkdirs();
- } catch (SecurityException e) {
- throw new IOException("creating dir: " + dir, e);
- }
- }
-
- /**
- * Delete the basedir
- */
- private void deleteBaseDir() {
- if (baseDir != null) {
- baseDir.delete();
- baseDir = null;
- }
-
- }
-
- @Override
- protected void serviceStop() throws Exception {
-
- if (!started) {
- return;
- }
- started = false;
-
- try {
- // shut down all the zk servers
- for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
- NIOServerCnxnFactory standaloneServerFactory =
- standaloneServerFactoryList.get(i);
- int clientPort = clientPortList.get(i);
-
- standaloneServerFactory.shutdown();
- if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
- throw new IOException("Waiting for shutdown of standalone server");
- }
- }
- for (ZooKeeperServer zkServer : zooKeeperServers) {
- //explicitly close ZKDatabase since ZookeeperServer does not close them
- zkServer.getZKDatabase().close();
- }
- } finally {
- // clear everything
- activeZKServerIndex = 0;
- standaloneServerFactoryList.clear();
- clientPortList.clear();
- zooKeeperServers.clear();
- }
-
- LOG.info("Shutdown MiniZK cluster with all ZK servers");
- }
-
- /**@return clientPort return clientPort if there is another ZK backup can run
- * when killing the current active; return -1, if there is no backups.
- * @throws IOException
- * @throws InterruptedException
- */
- public int killCurrentActiveZooKeeperServer() throws IOException,
- InterruptedException {
- if (!started || activeZKServerIndex < 0) {
- return -1;
- }
-
- // Shutdown the current active one
- NIOServerCnxnFactory standaloneServerFactory =
- standaloneServerFactoryList.get(activeZKServerIndex);
- int clientPort = clientPortList.get(activeZKServerIndex);
-
- standaloneServerFactory.shutdown();
- if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
- throw new IOException("Waiting for shutdown of standalone server");
- }
-
- zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
-
- // remove the current active zk server
- standaloneServerFactoryList.remove(activeZKServerIndex);
- clientPortList.remove(activeZKServerIndex);
- zooKeeperServers.remove(activeZKServerIndex);
- LOG.info("Kill the current active ZK servers in the cluster " +
- "on client port: " + clientPort);
-
- if (standaloneServerFactoryList.size() == 0) {
- // there is no backup servers;
- return -1;
- }
- clientPort = clientPortList.get(activeZKServerIndex);
- LOG.info("Activate a backup zk server in the cluster " +
- "on client port: " + clientPort);
- // return the next back zk server's port
- return clientPort;
- }
-
- /**
- * Kill one back up ZK servers
- * @throws IOException
- * @throws InterruptedException
- */
- public void killOneBackupZooKeeperServer() throws IOException,
- InterruptedException {
- if (!started || activeZKServerIndex < 0 ||
- standaloneServerFactoryList.size() <= 1) {
- return;
- }
-
- int backupZKServerIndex = activeZKServerIndex + 1;
- // Shutdown the current active one
- NIOServerCnxnFactory standaloneServerFactory =
- standaloneServerFactoryList.get(backupZKServerIndex);
- int clientPort = clientPortList.get(backupZKServerIndex);
-
- standaloneServerFactory.shutdown();
- if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
- throw new IOException("Waiting for shutdown of standalone server");
- }
-
- zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
-
- // remove this backup zk server
- standaloneServerFactoryList.remove(backupZKServerIndex);
- clientPortList.remove(backupZKServerIndex);
- zooKeeperServers.remove(backupZKServerIndex);
- LOG.info("Kill one backup ZK servers in the cluster " +
- "on client port: " + clientPort);
- }
-
- // XXX: From o.a.zk.t.ClientBase
- private static boolean waitForServerDown(int port, long timeout) throws
- InterruptedException {
- long start = System.currentTimeMillis();
- while (true) {
- try {
- Socket sock = null;
- try {
- sock = new Socket("localhost", port);
- OutputStream outstream = sock.getOutputStream();
- outstream.write("stat".getBytes());
- outstream.flush();
- } finally {
- IOUtils.closeSocket(sock);
- }
- } catch (IOException e) {
- return true;
- }
-
- if (System.currentTimeMillis() > start + timeout) {
- break;
- }
- Thread.sleep(250);
- }
- return false;
- }
-
- // XXX: From o.a.zk.t.ClientBase
- private static boolean waitForServerUp(int port, long timeout) throws
- InterruptedException {
- long start = System.currentTimeMillis();
- while (true) {
- try {
- Socket sock = null;
- sock = new Socket("localhost", port);
- BufferedReader reader = null;
- try {
- OutputStream outstream = sock.getOutputStream();
- outstream.write("stat".getBytes());
- outstream.flush();
-
- Reader isr = new InputStreamReader(sock.getInputStream());
- reader = new BufferedReader(isr);
- String line = reader.readLine();
- if (line != null && line.startsWith("Zookeeper version:")) {
- return true;
- }
- } finally {
- IOUtils.closeSocket(sock);
- IOUtils.closeStream(reader);
- }
- } catch (IOException e) {
- // ignore as this is expected
- LOG.debug("server localhost:" + port + " not up " + e);
- }
-
- if (System.currentTimeMillis() > start + timeout) {
- break;
- }
- Thread.sleep(250);
- }
- return false;
- }
-
- public int getClientPort() {
- return clientPort;
- }
-
- public String getZkQuorum() {
- return zkQuorum;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java b/slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java
deleted file mode 100644
index 045b72c..0000000
--- a/slider-core/src/main/java/org/apache/slider/core/zk/ZKCallback.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.core.zk;
-
-import org.apache.zookeeper.Watcher;
-
-/**
- * Relays ZK watcher events to a closure
- */
-public abstract class ZKCallback implements Watcher {
-
- public ZKCallback() {
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java b/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
deleted file mode 100644
index ca41e4b..0000000
--- a/slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.core.zk;
-
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.data.ACL;
-import org.apache.zookeeper.data.Stat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-
-public class ZKIntegration implements Watcher, Closeable {
-
-/**
- * Base path for services
- */
- public static String ZK_SERVICES = "services";
- /**
- * Base path for all Slider references
- */
- public static String ZK_SLIDER = "slider";
- public static String ZK_USERS = "users";
- public static String SVC_SLIDER = "/" + ZK_SERVICES + "/" + ZK_SLIDER;
- public static String SVC_SLIDER_USERS = SVC_SLIDER + "/" + ZK_USERS;
-
- public static final List<String> ZK_USERS_PATH_LIST = new ArrayList<String>();
- static {
- ZK_USERS_PATH_LIST.add(ZK_SERVICES);
- ZK_USERS_PATH_LIST.add(ZK_SLIDER);
- ZK_USERS_PATH_LIST.add(ZK_USERS);
- }
-
- public static int SESSION_TIMEOUT = 30000;
- protected static final Logger log =
- LoggerFactory.getLogger(ZKIntegration.class);
- private ZooKeeper zookeeper;
- private final String username;
- private final String clustername;
- private final String userPath;
- private int sessionTimeout = SESSION_TIMEOUT;
-/**
- flag to set to indicate that the user path should be created if
- it is not already there
- */
- private final AtomicBoolean toInit = new AtomicBoolean(false);
- private final boolean createClusterPath;
- private final Watcher watchEventHandler;
- private final String zkConnection;
- private final boolean canBeReadOnly;
-
- protected ZKIntegration(String zkConnection,
- String username,
- String clustername,
- boolean canBeReadOnly,
- boolean createClusterPath,
- Watcher watchEventHandler,
- int sessionTimeout
- ) throws IOException {
- this.username = username;
- this.clustername = clustername;
- this.watchEventHandler = watchEventHandler;
- this.zkConnection = zkConnection;
- this.canBeReadOnly = canBeReadOnly;
- this.createClusterPath = createClusterPath;
- this.sessionTimeout = sessionTimeout;
- this.userPath = mkSliderUserPath(username);
- }
-
- public void init() throws IOException {
- assert zookeeper == null;
- log.debug("Binding ZK client to {}", zkConnection);
- zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this, canBeReadOnly);
- }
-
- /**
- * Create an instance bonded to the specific closure
- * @param zkConnection
- * @param username
- * @param clustername
- * @param canBeReadOnly
- * @param watchEventHandler
- * @return the new instance
- * @throws IOException
- */
- public static ZKIntegration newInstance(String zkConnection,
- String username,
- String clustername,
- boolean createClusterPath,
- boolean canBeReadOnly,
- Watcher watchEventHandler,
- int sessionTimeout) throws IOException {
-
- return new ZKIntegration(zkConnection,
- username,
- clustername,
- canBeReadOnly,
- createClusterPath,
- watchEventHandler,
- sessionTimeout);
- }
-
-
- @Override
- public synchronized void close() throws IOException {
- if (zookeeper != null) {
- try {
- zookeeper.close();
- } catch (InterruptedException ignored) {
-
- }
- zookeeper = null;
- }
- }
-
- public String getConnectionString() {
- return zkConnection;
- }
-
- public String getClusterPath() {
- return mkClusterPath(username, clustername);
- }
-
- public boolean getConnected() {
- return zookeeper.getState().isConnected();
- }
-
- public boolean getAlive() {
- return zookeeper.getState().isAlive();
- }
-
- public ZooKeeper.States getState() {
- return zookeeper.getState();
- }
-
- public Stat getClusterStat() throws KeeperException, InterruptedException {
- return stat(getClusterPath());
- }
-
- public boolean exists(String path) throws
- KeeperException,
- InterruptedException {
- return stat(path) != null;
- }
-
- public Stat stat(String path) throws KeeperException, InterruptedException {
- return zookeeper.exists(path, false);
- }
-
- @Override
- public String toString() {
- return "ZK integration bound @ " + zkConnection + ": " + zookeeper;
- }
-
-/**
- * Event handler to notify of state events
- * @param event
- */
- @Override
- public void process(WatchedEvent event) {
- log.debug("{}", event);
- try {
- maybeInit();
- } catch (Exception e) {
- log.error("Failed to init", e);
- }
- if (watchEventHandler != null) {
- watchEventHandler.process(event);
- }
- }
-
- private void maybeInit() throws KeeperException, InterruptedException {
- if (!toInit.getAndSet(true) && createClusterPath) {
- log.debug("initing");
- //create the user path
- mkPath(ZK_USERS_PATH_LIST, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- //create the specific user
- createPath(userPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- }
- }
-
- /**
- * Create a path under a parent, don't care if it already exists
- * As the path isn't returned, this isn't the way to create sequentially
- * numbered nodes.
- * @param parent parent dir. Must have a trailing / if entry!=null||empty
- * @param entry entry -can be null or "", in which case it is not appended
- * @param acl
- * @param createMode
- * @return the path if created; null if not
- */
- public String createPath(String parent,
- String entry,
- List<ACL> acl,
- CreateMode createMode) throws KeeperException, InterruptedException {
- //initial create of full path
- assert acl != null;
- assert !acl.isEmpty();
- assert parent != null;
- String path = parent;
- if (entry != null) {
- path = path + entry;
- }
- try {
- log.debug("Creating ZK path {}", path);
- return zookeeper.create(path, null, acl, createMode);
- } catch (KeeperException.NodeExistsException ignored) {
- //node already there
- log.debug("node already present:{}",path);
- return null;
- }
- }
-
- /**
- * Recursive path create
- * @param paths path list
- * @param acl acl list
- * @param createMode create modes
- */
- public void mkPath(List<String> paths,
- List<ACL> acl,
- CreateMode createMode) throws KeeperException, InterruptedException {
- String history = "/";
- for (String entry : paths) {
- createPath(history, entry, acl, createMode);
- history = history + entry + "/";
- }
- }
-
-/**
- * Blocking enum of users
- * @return an unordered list of clusters under a user
- */
- public List<String> getClusters() throws KeeperException, InterruptedException {
- return zookeeper.getChildren(userPath, null);
- }
-
- /**
- * Delete a node, does not throw an exception if the path is not fond
- * @param path path to delete
- * @return true if the path could be deleted, false if there was no node to delete
- *
- */
- public boolean delete(String path) throws
- InterruptedException,
- KeeperException {
- try {
- zookeeper.delete(path, -1);
- log.debug("Deleting {}", path);
- return true;
- } catch (KeeperException.NoNodeException ignored) {
- return false;
- }
- }
-
- /**
- * Recursively delete a node, does not throw exception if any node does not exist.
- * @param path
- * @return true if delete was successful
- */
- public boolean deleteRecursive(String path) throws KeeperException, InterruptedException {
-
- try {
- List<String> children = zookeeper.getChildren(path, false);
- for (String child : children) {
- deleteRecursive(path + "/" + child);
- }
- delete(path);
- } catch (KeeperException.NoNodeException ignored) {
- return false;
- }
-
- return true;
- }
-
- /**
- * Build the path to a cluster; exists once the cluster has come up.
- * Even before that, a ZK watcher could wait for it.
- * @param username user
- * @param clustername name of the cluster
- * @return a strin
- */
- public static String mkClusterPath(String username, String clustername) {
- return mkSliderUserPath(username) + "/" + clustername;
- }
-/**
- * Build the path to a cluster; exists once the cluster has come up.
- * Even before that, a ZK watcher could wait for it.
- * @param username user
- * @return a string
- */
- public static String mkSliderUserPath(String username) {
- return SVC_SLIDER_USERS + "/" + username;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/core/zk/ZKPathBuilder.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/ZKPathBuilder.java b/slider-core/src/main/java/org/apache/slider/core/zk/ZKPathBuilder.java
deleted file mode 100644
index b088568..0000000
--- a/slider-core/src/main/java/org/apache/slider/core/zk/ZKPathBuilder.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.core.zk;
-
-import java.util.Locale;
-
-public final class ZKPathBuilder {
-
- private final String username, appname, clustername;
- private final String quorum;
-
- private String appPath;
- private String registryPath;
- private final String appQuorum;
-
- public ZKPathBuilder(String username,
- String appname,
- String clustername,
- String quorum,
- String appQuorum) {
- this.username = username;
- this.appname = appname;
- this.clustername = clustername;
- this.quorum = quorum;
- appPath = buildAppPath();
- registryPath = buildRegistryPath();
- this.appQuorum = appQuorum;
- }
-
- public String buildAppPath() {
- return String.format(Locale.ENGLISH, "/yarnapps_%s_%s_%s", appname,
- username, clustername);
-
- }
-
- public String buildRegistryPath() {
- return String.format(Locale.ENGLISH, "/services_%s_%s_%s", appname,
- username, clustername);
-
- }
-
- public String getQuorum() {
- return quorum;
- }
-
- public String getAppQuorum() {
- return appQuorum;
- }
-
- public String getAppPath() {
- return appPath;
- }
-
- public void setAppPath(String appPath) {
- this.appPath = appPath;
- }
-
- public String getRegistryPath() {
- return registryPath;
- }
-
- public void setRegistryPath(String registryPath) {
- this.registryPath = registryPath;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/core/zk/ZookeeperUtils.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/core/zk/ZookeeperUtils.java b/slider-core/src/main/java/org/apache/slider/core/zk/ZookeeperUtils.java
deleted file mode 100644
index cc1b2c9..0000000
--- a/slider-core/src/main/java/org/apache/slider/core/zk/ZookeeperUtils.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.core.zk;
-
-import com.google.common.net.HostAndPort;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.core.exceptions.BadConfigException;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class ZookeeperUtils {
- public static final int DEFAULT_PORT = 2181;
-
- public static String buildConnectionString(String zkHosts, int port) {
- String zkPort = Integer.toString(port);
- //parse the hosts
- String[] hostlist = zkHosts.split(",", 0);
- String quorum = SliderUtils.join(hostlist, ":" + zkPort + ",", false);
- return quorum;
- }
-
- /**
- * Take a quorum list and split it to (trimmed) pairs
- * @param hostPortQuorumList list of form h1:port, h2:port2,...
- * @return a possibly empty list of values between commas. They may not be
- * valid hostname:port pairs
- */
- public static List<String> splitToPairs(String hostPortQuorumList) {
- // split an address hot
- String[] strings = StringUtils.getStrings(hostPortQuorumList);
- int len = 0;
- if (strings != null) {
- len = strings.length;
- }
- List<String> tuples = new ArrayList<String>(len);
- if (strings != null) {
- for (String s : strings) {
- tuples.add(s.trim());
- }
- }
- return tuples;
- }
-
- /**
- * Split a quorum list into a list of hostnames and ports
- * @param hostPortQuorumList split to a list of hosts and ports
- * @return a list of values
- */
- public static List<HostAndPort> splitToHostsAndPorts(String hostPortQuorumList) {
- // split an address hot
- String[] strings = StringUtils.getStrings(hostPortQuorumList);
- int len = 0;
- if (strings != null) {
- len = strings.length;
- }
- List<HostAndPort> list = new ArrayList<HostAndPort>(len);
- if (strings != null) {
- for (String s : strings) {
- list.add(HostAndPort.fromString(s.trim()).withDefaultPort(DEFAULT_PORT));
- }
- }
- return list;
- }
-
- /**
- * Build up to a hosts only list
- * @param hostAndPorts
- * @return a list of the hosts only
- */
- public static String buildHostsOnlyList(List<HostAndPort> hostAndPorts) {
- StringBuilder sb = new StringBuilder();
- for (HostAndPort hostAndPort : hostAndPorts) {
- sb.append(hostAndPort.getHostText()).append(",");
- }
- if (sb.length() > 0) {
- sb.delete(sb.length() - 1, sb.length());
- }
- return sb.toString();
- }
-
- public static String buildQuorumEntry(HostAndPort hostAndPort,
- int defaultPort) {
- String s = hostAndPort.toString();
- if (hostAndPort.hasPort()) {
- return s;
- } else {
- return s + ":" + defaultPort;
- }
- }
-
- /**
- * Build a quorum list, injecting a ":defaultPort" ref if needed on
- * any entry without one
- * @param hostAndPorts
- * @param defaultPort
- * @return
- */
- public static String buildQuorum(List<HostAndPort> hostAndPorts, int defaultPort) {
- List<String> entries = new ArrayList<String>(hostAndPorts.size());
- for (HostAndPort hostAndPort : hostAndPorts) {
- entries.add(buildQuorumEntry(hostAndPort, defaultPort));
- }
- return SliderUtils.join(entries, ",", false);
- }
-
- public static String convertToHostsOnlyList(String quorum) throws
- BadConfigException {
- List<HostAndPort> hostAndPorts = splitToHostsAndPortsStrictly(quorum);
- return ZookeeperUtils.buildHostsOnlyList(hostAndPorts);
- }
-
- public static List<HostAndPort> splitToHostsAndPortsStrictly(String quorum) throws
- BadConfigException {
- List<HostAndPort> hostAndPorts =
- ZookeeperUtils.splitToHostsAndPorts(quorum);
- if (hostAndPorts.isEmpty()) {
- throw new BadConfigException("empty zookeeper quorum");
- }
- return hostAndPorts;
- }
-
- public static int getFirstPort(String quorum, int defVal) throws
- BadConfigException {
- List<HostAndPort> hostAndPorts = splitToHostsAndPortsStrictly(quorum);
- int port = hostAndPorts.get(0).getPortOrDefault(defVal);
- return port;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java b/slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
deleted file mode 100644
index 510de5d..0000000
--- a/slider-core/src/main/java/org/apache/slider/providers/AbstractClientProvider.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.providers;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.registry.client.api.RegistryOperations;
-import org.apache.slider.common.tools.SliderFileSystem;
-import org.apache.slider.core.conf.AggregateConf;
-import org.apache.slider.core.conf.ConfTreeOperations;
-import org.apache.slider.core.conf.MapOperations;
-import org.apache.slider.core.exceptions.BadClusterStateException;
-import org.apache.slider.core.exceptions.SliderException;
-import org.apache.slider.core.launch.AbstractLauncher;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.slider.api.ResourceKeys.COMPONENT_INSTANCES;
-import static org.apache.slider.api.ResourceKeys.DEF_YARN_CORES;
-import static org.apache.slider.api.ResourceKeys.DEF_YARN_MEMORY;
-import static org.apache.slider.api.ResourceKeys.YARN_CORES;
-import static org.apache.slider.api.ResourceKeys.YARN_MEMORY;
-
-public abstract class AbstractClientProvider extends Configured {
- private static final Logger log =
- LoggerFactory.getLogger(AbstractClientProvider.class);
- protected static final ProviderUtils providerUtils =
- new ProviderUtils(log);
-
- public static final String PROVIDER_RESOURCE_BASE =
- "org/apache/slider/providers/";
- public static final String PROVIDER_RESOURCE_BASE_ROOT =
- "/" + PROVIDER_RESOURCE_BASE;
-
- public AbstractClientProvider(Configuration conf) {
- super(conf);
- }
-
- public abstract String getName();
-
- public abstract List<ProviderRole> getRoles();
-
- /**
- * Verify that an instance definition is considered valid by the provider
- * @param instanceDefinition instance definition
- * @throws SliderException if the configuration is not valid
- */
- public void validateInstanceDefinition(AggregateConf instanceDefinition, SliderFileSystem fs) throws
- SliderException {
-
- List<ProviderRole> roles = getRoles();
- ConfTreeOperations resources =
- instanceDefinition.getResourceOperations();
- for (ProviderRole role : roles) {
- String name = role.name;
- MapOperations component = resources.getComponent(role.group);
- if (component != null) {
- String instances = component.get(COMPONENT_INSTANCES);
- if (instances == null) {
- String message = "No instance count provided for " + name;
- log.error("{} with \n{}", message, resources.toString());
- throw new BadClusterStateException(message);
- }
- String ram = component.get(YARN_MEMORY);
- String cores = component.get(YARN_CORES);
-
-
- providerUtils.getRoleResourceRequirement(ram,
- DEF_YARN_MEMORY,
- Integer.MAX_VALUE);
- providerUtils.getRoleResourceRequirement(cores,
- DEF_YARN_CORES,
- Integer.MAX_VALUE);
- }
- }
- }
-
-
- /**
- * Any provider-side alteration of a configuration can take place here.
- * @param aggregateConf config to patch
- * @throws IOException IO problems
- * @throws SliderException Slider-specific issues
- */
- public void prepareInstanceConfiguration(AggregateConf aggregateConf) throws
- SliderException,
- IOException {
- //default: do nothing
- }
-
-
- /**
- * Prepare the AM settings for launch
- * @param fileSystem filesystem
- * @param serviceConf configuration of the client
- * @param launcher launcher to set up
- * @param instanceDescription instance description being launched
- * @param snapshotConfDirPath
- * @param generatedConfDirPath
- * @param clientConfExtras
- * @param libdir
- * @param tempPath
- * @param miniClusterTestRun flag set to true on a mini cluster run
- * @throws IOException
- * @throws SliderException
- */
- public void prepareAMAndConfigForLaunch(SliderFileSystem fileSystem,
- Configuration serviceConf,
- AbstractLauncher launcher,
- AggregateConf instanceDescription,
- Path snapshotConfDirPath,
- Path generatedConfDirPath,
- Configuration clientConfExtras,
- String libdir,
- Path tempPath,
- boolean miniClusterTestRun)
- throws IOException, SliderException {
-
- }
-
- /**
- * Load in and merge in templates. Null arguments means "no such template"
- * @param instanceConf instance to patch
- * @param internalTemplate patch to internal.json
- * @param resourceTemplate path to resources.json
- * @param appConfTemplate path to app_conf.json
- * @throws IOException any IO problems
- */
- protected void mergeTemplates(AggregateConf instanceConf,
- String internalTemplate,
- String resourceTemplate,
- String appConfTemplate) throws IOException {
- if (internalTemplate != null) {
- ConfTreeOperations template =
- ConfTreeOperations.fromResource(internalTemplate);
- instanceConf.getInternalOperations()
- .mergeWithoutOverwrite(template.confTree);
- }
-
- if (resourceTemplate != null) {
- ConfTreeOperations resTemplate =
- ConfTreeOperations.fromResource(resourceTemplate);
- instanceConf.getResourceOperations()
- .mergeWithoutOverwrite(resTemplate.confTree);
- }
-
- if (appConfTemplate != null) {
- ConfTreeOperations template =
- ConfTreeOperations.fromResource(appConfTemplate);
- instanceConf.getAppConfOperations()
- .mergeWithoutOverwrite(template.confTree);
- }
-
- }
-
- /**
- * This is called pre-launch to validate that the cluster specification
- * is valid. This can include checking that the security options
- * are in the site files prior to launch, that there are no conflicting operations
- * etc.
- *
- * This check is made prior to every launch of the cluster -so can
- * pick up problems which manually edited cluster files have added,
- * or from specification files from previous versions.
- *
- * The provider MUST NOT change the remote specification. This is
- * purely a pre-launch validation of options.
- *
- *
- * @param sliderFileSystem filesystem
- * @param clustername name of the cluster
- * @param configuration cluster configuration
- * @param instanceDefinition cluster specification
- * @param clusterDirPath directory of the cluster
- * @param generatedConfDirPath path to place generated artifacts
- * @param secure flag to indicate that the cluster is secure
- * @throws SliderException on any validation issue
- * @throws IOException on any IO problem
- */
- public void preflightValidateClusterConfiguration(SliderFileSystem sliderFileSystem,
- String clustername,
- Configuration configuration,
- AggregateConf instanceDefinition,
- Path clusterDirPath,
- Path generatedConfDirPath,
- boolean secure)
- throws SliderException, IOException {
- validateInstanceDefinition(instanceDefinition, sliderFileSystem);
- }
-
- /**
- * Return a set of application specific string tags.
- * @return the set of tags.
- */
- public Set<String> getApplicationTags (SliderFileSystem fileSystem,
- String appDef) throws SliderException {
- return Collections.emptySet();
- }
-
- /**
- * Process client operations for applications such as install, configure
- * @param fileSystem
- * @param registryOperations
- * @param configuration
- * @param operation
- * @param clientInstallPath
- * @param clientPackage
- * @param clientConfig
- * @param name
- * @throws SliderException
- */
- public void processClientOperation(SliderFileSystem fileSystem,
- RegistryOperations registryOperations,
- Configuration configuration,
- String operation,
- File clientInstallPath,
- File clientPackage,
- JSONObject clientConfig,
- String name)
- throws SliderException {
- throw new SliderException("Provider does not support client operations.");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
deleted file mode 100644
index 92766f5..0000000
--- a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.providers;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
-import org.apache.hadoop.registry.client.exceptions.InvalidRecordException;
-import org.apache.hadoop.registry.client.types.AddressTypes;
-import org.apache.hadoop.registry.client.types.Endpoint;
-import org.apache.hadoop.registry.client.types.ServiceRecord;
-import org.apache.slider.api.ClusterDescription;
-import org.apache.slider.common.SliderKeys;
-import org.apache.slider.common.tools.ConfigHelper;
-import org.apache.slider.common.tools.SliderFileSystem;
-import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.core.conf.AggregateConf;
-import org.apache.slider.core.exceptions.BadCommandArgumentsException;
-import org.apache.slider.core.exceptions.SliderException;
-import org.apache.slider.core.main.ExitCodeProvider;
-import org.apache.slider.server.appmaster.actions.QueueAccess;
-import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
-import org.apache.slider.server.appmaster.state.ContainerReleaseSelector;
-import org.apache.slider.server.appmaster.state.MostRecentContainerReleaseSelector;
-import org.apache.slider.server.appmaster.state.StateAccessForProviders;
-import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations;
-import org.apache.slider.server.services.workflow.ForkedProcessService;
-import org.apache.slider.server.services.workflow.ServiceParent;
-import org.apache.slider.server.services.workflow.WorkflowSequenceService;
-import org.apache.slider.server.services.yarnregistry.YarnRegistryViewForProviders;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-/**
- * The base class for provider services. It lets the implementations
- * add sequences of operations, and propagates service failures
- * upstream
- */
-public abstract class AbstractProviderService
- extends WorkflowSequenceService
- implements
- ProviderCore,
- SliderKeys,
- ProviderService {
- private static final Logger log =
- LoggerFactory.getLogger(AbstractProviderService.class);
- protected StateAccessForProviders amState;
- protected AgentRestOperations restOps;
- protected URL amWebAPI;
- protected YarnRegistryViewForProviders yarnRegistry;
- protected QueueAccess queueAccess;
-
- protected AbstractProviderService(String name) {
- super(name);
- setStopIfNoChildServicesAtStartup(false);
- }
-
- @Override
- public Configuration getConf() {
- return getConfig();
- }
-
- public StateAccessForProviders getAmState() {
- return amState;
- }
-
- public QueueAccess getQueueAccess() {
- return queueAccess;
- }
-
- public void setAmState(StateAccessForProviders amState) {
- this.amState = amState;
- }
-
- @Override
- public String getHumanName() {
- return getName().toLowerCase(Locale.ENGLISH);
- }
-
- @Override
- public void bind(StateAccessForProviders stateAccessor,
- QueueAccess queueAccess,
- List<Container> liveContainers) {
- this.amState = stateAccessor;
- this.queueAccess = queueAccess;
- }
-
- @Override
- public void bindToYarnRegistry(YarnRegistryViewForProviders yarnRegistry) {
- this.yarnRegistry = yarnRegistry;
- }
-
- public YarnRegistryViewForProviders getYarnRegistry() {
- return yarnRegistry;
- }
-
- @Override
- public AgentRestOperations getAgentRestOperations() {
- return restOps;
- }
-
- @Override
- public void notifyContainerCompleted(ContainerId containerId) {
- }
-
- public void setAgentRestOperations(AgentRestOperations agentRestOperations) {
- this.restOps = agentRestOperations;
- }
-
- /**
- * Load a specific XML configuration file for the provider config
- * @param confDir configuration directory
- * @param siteXMLFilename provider-specific filename
- * @return a configuration to be included in status
- * @throws BadCommandArgumentsException argument problems
- * @throws IOException IO problems
- */
- protected Configuration loadProviderConfigurationInformation(File confDir,
- String siteXMLFilename)
- throws BadCommandArgumentsException, IOException {
- Configuration siteConf;
- File siteXML = new File(confDir, siteXMLFilename);
- if (!siteXML.exists()) {
- throw new BadCommandArgumentsException(
- "Configuration directory %s doesn't contain %s - listing is %s",
- confDir, siteXMLFilename, SliderUtils.listDir(confDir));
- }
-
- //now read it in
- siteConf = ConfigHelper.loadConfFromFile(siteXML);
- log.info("{} file is at {}", siteXMLFilename, siteXML);
- log.info(ConfigHelper.dumpConfigToString(siteConf));
- return siteConf;
- }
-
- /**
- * No-op implementation of this method.
- */
- @Override
- public void initializeApplicationConfiguration(
- AggregateConf instanceDefinition, SliderFileSystem fileSystem,
- String roleGroup)
- throws IOException, SliderException {
- }
-
- /**
- * No-op implementation of this method.
- *
- * {@inheritDoc}
- */
- @Override
- public void validateApplicationConfiguration(AggregateConf instance,
- File confDir,
- boolean secure)
- throws IOException, SliderException {
-
- }
-
- /**
- * Scan through the roles and see if it is supported.
- * @param role role to look for
- * @return true if the role is known about -and therefore
- * that a launcher thread can be deployed to launch it
- */
- @Override
- public boolean isSupportedRole(String role) {
- Collection<ProviderRole> roles = getRoles();
- for (ProviderRole providedRole : roles) {
- if (providedRole.name.equals(role)) {
- return true;
- }
- }
- return false;
- }
-
- /**
- * override point to allow a process to start executing in this container
- * @param instanceDefinition cluster description
- * @param confDir configuration directory
- * @param env environment
- * @param execInProgress the callback for the exec events
- * @return false
- * @throws IOException
- * @throws SliderException
- */
- @Override
- public boolean exec(AggregateConf instanceDefinition,
- File confDir,
- Map<String, String> env,
- ProviderCompleted execInProgress) throws IOException, SliderException {
- return false;
- }
-
- @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
- @Override // ExitCodeProvider
- public int getExitCode() {
- Throwable cause = getFailureCause();
- if (cause != null) {
- //failed for some reason
- if (cause instanceof ExitCodeProvider) {
- return ((ExitCodeProvider) cause).getExitCode();
- }
- }
- ForkedProcessService lastProc = latestProcess();
- if (lastProc == null || !lastProc.isProcessTerminated()) {
- return 0;
- } else {
- return lastProc.getExitCode();
- }
- }
-
- /**
- * Return the latest forked process service that ran
- * @return the forkes service
- */
- protected ForkedProcessService latestProcess() {
- Service current = getActiveService();
- Service prev = getPreviousService();
-
- Service latest = current != null ? current : prev;
- if (latest instanceof ForkedProcessService) {
- return (ForkedProcessService) latest;
- } else {
- //its a composite object, so look inside it for a process
- if (latest instanceof ServiceParent) {
- return getFPSFromParentService((ServiceParent) latest);
- } else {
- //no match
- return null;
- }
- }
- }
-
-
- /**
- * Given a parent service, find the one that is a forked process
- * @param serviceParent parent
- * @return the forked process service or null if there is none
- */
- protected ForkedProcessService getFPSFromParentService(ServiceParent serviceParent) {
- List<Service> services = serviceParent.getServices();
- for (Service s : services) {
- if (s instanceof ForkedProcessService) {
- return (ForkedProcessService) s;
- }
- }
- return null;
- }
-
- /**
- * if we are already running, start this service
- */
- protected void maybeStartCommandSequence() {
- if (isInState(STATE.STARTED)) {
- startNextService();
- }
- }
-
- /**
- * Create a new forked process service with the given
- * name, environment and command list -then add it as a child
- * for execution in the sequence.
- *
- * @param name command name
- * @param env environment
- * @param commands command line
- * @throws IOException
- * @throws SliderException
- */
- protected ForkedProcessService queueCommand(String name,
- Map<String, String> env,
- List<String> commands) throws
- IOException,
- SliderException {
- ForkedProcessService process = buildProcess(name, env, commands);
- //register the service for lifecycle management; when this service
- //is terminated, so is the master process
- addService(process);
- return process;
- }
-
- public ForkedProcessService buildProcess(String name,
- Map<String, String> env,
- List<String> commands) throws
- IOException,
- SliderException {
- ForkedProcessService process;
- process = new ForkedProcessService(name);
- process.init(getConfig());
- process.build(env, commands);
- return process;
- }
-
- /*
- * Build the provider status, can be empty
- * @return the provider status - map of entries to add to the info section
- */
- @Override
- public Map<String, String> buildProviderStatus() {
- return new HashMap<String, String>();
- }
-
- /*
- Build the monitor details. The base implementation includes all the external URL endpoints
- in the external view
- */
- @Override
- public Map<String, MonitorDetail> buildMonitorDetails(ClusterDescription clusterDesc) {
- Map<String, MonitorDetail> details = new LinkedHashMap<String, MonitorDetail>();
-
- // add in all the endpoints
- buildEndpointDetails(details);
-
- return details;
- }
-
- @Override
- public void buildEndpointDetails(Map<String, MonitorDetail> details) {
- ServiceRecord self = yarnRegistry.getSelfRegistration();
-
- List<Endpoint> externals = self.external;
- for (Endpoint endpoint : externals) {
- String addressType = endpoint.addressType;
- if (AddressTypes.ADDRESS_URI.equals(addressType)) {
- try {
- List<URL> urls = RegistryTypeUtils.retrieveAddressURLs(endpoint);
- if (!urls.isEmpty()) {
- details.put(endpoint.api, new MonitorDetail(urls.get(0).toString(), true));
- }
- } catch (InvalidRecordException | MalformedURLException ignored) {
- // Ignored
- }
-
- }
-
- }
- }
-
- @Override
- public void applyInitialRegistryDefinitions(URL amWebURI,
- URL agentOpsURI,
- URL agentStatusURI,
- ServiceRecord serviceRecord)
- throws IOException {
- this.amWebAPI = amWebURI;
- }
-
- /**
- * {@inheritDoc}
- *
- *
- * @return The base implementation returns the most recent containers first.
- */
- @Override
- public ContainerReleaseSelector createContainerReleaseSelector() {
- return new MostRecentContainerReleaseSelector();
- }
-
- @Override
- public void releaseAssignedContainer(ContainerId containerId) {
- // no-op
- }
-
- @Override
- public void addContainerRequest(AMRMClient.ContainerRequest req) {
- // no-op
- }
-
- @Override
- public void cancelSingleRequest(AMRMClient.ContainerRequest request) {
- // no-op
- }
-
- @Override
- public int cancelContainerRequests(Priority priority1,
- Priority priority2,
- int count) {
- return 0;
- }
-
- @Override
- public void execute(List<AbstractRMOperation> operations) {
- for (AbstractRMOperation operation : operations) {
- operation.execute(this);
- }
- }
- /**
- * No-op implementation of this method.
- */
- @Override
- public void rebuildContainerDetails(List<Container> liveContainers,
- String applicationId, Map<Integer, ProviderRole> providerRoles) {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/providers/MonitorDetail.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/MonitorDetail.java b/slider-core/src/main/java/org/apache/slider/providers/MonitorDetail.java
deleted file mode 100644
index 27d3415..0000000
--- a/slider-core/src/main/java/org/apache/slider/providers/MonitorDetail.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to you under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.slider.providers;
-
-/**
- * Details about some exported information from a provider to the AM web UI.
- */
-public class MonitorDetail {
-
- private final String value;
- private final boolean isUrl;
-
- public MonitorDetail(String value, boolean isUrl) {
- this.value = value;
- this.isUrl = isUrl;
- }
-
- public String getValue() {
- return value;
- }
-
- public boolean isUrl() {
- return isUrl;
- }
-
- public String toString() {
- return "MonitorDetail[" + value + " isUrl=" + isUrl + "]";
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/51c2b92c/slider-core/src/main/java/org/apache/slider/providers/PlacementPolicy.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/PlacementPolicy.java b/slider-core/src/main/java/org/apache/slider/providers/PlacementPolicy.java
deleted file mode 100644
index 128dd5d..0000000
--- a/slider-core/src/main/java/org/apache/slider/providers/PlacementPolicy.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.providers;
-
-/**
- * Placement values.
- * This is nominally a bitmask, though not all values make sense
- */
-public class PlacementPolicy {
-
- /**
- * Default value: history used, anti-affinity hinted at on rebuild/flex up
- */
- public static final int NONE = 0;
-
- /**
- * Default value: history used, anti-affinity hinted at on rebuild/flex up
- */
- public static final int DEFAULT = NONE;
-
- /**
- * Strict placement: when asking for an instance for which there is
- * history, mandate that it is strict
- */
- public static final int STRICT = 1;
-
- /**
- * No data locality; do not use placement history
- */
- public static final int ANYWHERE = 2;
-
- /**
- * @Deprecated: use {@link #ANYWHERE}
- */
- @Deprecated
- public static final int NO_DATA_LOCALITY = ANYWHERE;
-
- /**
- * Anti-affinity is mandatory.
- */
- public static final int ANTI_AFFINITY_REQUIRED = 4;
-
- /**
- * Exclude from flexing; used internally to mark AMs.
- */
- public static final int EXCLUDE_FROM_FLEXING = 16;
-
-}