You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/12/07 21:10:02 UTC
[02/76] [abbrv] hadoop git commit: YARN-5461. Initial code ported
from slider-core module. (jianhe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
new file mode 100644
index 0000000..9e9e7ac
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
@@ -0,0 +1,598 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Execute a long-lived process.
+ *
+ * <p>
+ * Hadoop's {@link org.apache.hadoop.util.Shell} class assumes it is executing
+ * a short lived application; this class allows for the process to run for the
+ * life of the Java process that forked it.
+ * It is designed to be embedded inside a YARN service, though this is not
+ * the sole way that it can be used
+ * <p>
+ * Key Features:
+ * <ol>
+ * <li>Output is streamed to the output logger provided</li>.
+ * <li>the input stream is closed as soon as the process starts.</li>
+ * <li>The most recent lines of output are saved to a linked list</li>.
+ * <li>A synchronous callback, {@link LongLivedProcessLifecycleEvent},
+ * is raised on the start and finish of a process.</li>
+ * </ol>
+ *
+ */
+public class LongLivedProcess implements Runnable {
+ /**
+ * Limit on number of lines to retain in the "recent" line list:{@value}
+ */
+ public static final int RECENT_LINE_LOG_LIMIT = 64;
+
+ /**
+ * Const defining the time in millis between polling for new text.
+ */
+ private static final int STREAM_READER_SLEEP_TIME = 200;
+
+ /**
+ * limit on the length of a stream before it triggers an automatic newline.
+ */
+ private static final int LINE_LENGTH = 256;
+ private final ProcessBuilder processBuilder;
+ private Process process;
+ private Integer exitCode = null;
+ private final String name;
+ private final ExecutorService processExecutor;
+ private final ExecutorService logExecutor;
+
+ private ProcessStreamReader processStreamReader;
+ //list of recent lines, recorded for extraction into reports
+ private final List<String> recentLines = new LinkedList<>();
+ private int recentLineLimit = RECENT_LINE_LOG_LIMIT;
+ private LongLivedProcessLifecycleEvent lifecycleCallback;
+ private final AtomicBoolean finalOutputProcessed = new AtomicBoolean(false);
+
+ /**
+ * Log supplied in the constructor for the spawned process -accessible
+ * to inner classes
+ */
+ private Logger processLog;
+
+ /**
+ * Class log -accessible to inner classes
+ */
+ private static final Logger LOG = LoggerFactory.getLogger(LongLivedProcess.class);
+
+ /**
+ * flag to indicate that the process is done
+ */
+ private final AtomicBoolean finished = new AtomicBoolean(false);
+
+ /**
+ * Create an instance
+ * @param name process name
+ * @param processLog log for output (or null)
+ * @param commands command list
+ */
+ public LongLivedProcess(String name,
+ Logger processLog,
+ List<String> commands) {
+ Preconditions.checkArgument(commands != null, "commands");
+
+ this.name = name;
+ this.processLog = processLog;
+ ServiceThreadFactory factory = new ServiceThreadFactory(name, true);
+ processExecutor = Executors.newSingleThreadExecutor(factory);
+ logExecutor = Executors.newSingleThreadExecutor(factory);
+ processBuilder = new ProcessBuilder(commands);
+ processBuilder.redirectErrorStream(false);
+ }
+
+ /**
+ * Set the limit on recent lines to retain
+ * @param recentLineLimit size of rolling list of recent lines.
+ */
+ public void setRecentLineLimit(int recentLineLimit) {
+ this.recentLineLimit = recentLineLimit;
+ }
+
+ /**
+ * Set an optional application exit callback
+ * @param lifecycleCallback callback to notify on application exit
+ */
+ public void setLifecycleCallback(LongLivedProcessLifecycleEvent lifecycleCallback) {
+ this.lifecycleCallback = lifecycleCallback;
+ }
+
+ /**
+ * Add an entry to the environment
+ * @param envVar envVar -must not be null
+ * @param val value
+ */
+ public void setEnv(String envVar, String val) {
+ Preconditions.checkArgument(envVar != null, "envVar");
+ Preconditions.checkArgument(val != null, "val");
+ processBuilder.environment().put(envVar, val);
+ }
+
+ /**
+ * Bulk set the environment from a map. This does
+ * not replace the existing environment, just extend it/overwrite single
+ * entries.
+ * @param map map to add
+ */
+ public void putEnvMap(Map<String, String> map) {
+ for (Map.Entry<String, String> entry : map.entrySet()) {
+ String val = entry.getValue();
+ String key = entry.getKey();
+ setEnv(key, val);
+ }
+ }
+
+ /**
+ * Get the process environment
+ * @param variable environment variable
+ * @return the value or null if there is no match
+ */
+ public String getEnv(String variable) {
+ return processBuilder.environment().get(variable);
+ }
+
+ /**
+ * Set the process log. Ignored once the process starts
+ * @param processLog new log ... may be null
+ */
+ public void setProcessLog(Logger processLog) {
+ this.processLog = processLog;
+ }
+
+ /**
+ * Get the process reference
+ * @return the process -null if the process is not started
+ */
+ public Process getProcess() {
+ return process;
+ }
+
+ /**
+ * Get the process builder -this can be manipulated
+ * up to the start() operation. As there is no synchronization
+ * around it, it must only be used in the same thread setting up the commmand.
+ * @return the process builder
+ */
+ public ProcessBuilder getProcessBuilder() {
+ return processBuilder;
+ }
+
+ /**
+ * Get the command list
+ * @return the comands
+ */
+ public List<String> getCommands() {
+ return processBuilder.command();
+ }
+
+ public String getCommand() {
+ return getCommands().get(0);
+ }
+
+ /**
+ * probe to see if the process is running
+ * @return true iff the process has been started and is not yet finished
+ */
+ public boolean isRunning() {
+ return process != null && !finished.get();
+ }
+
+ /**
+ * Get the exit code: null until the process has finished
+ * @return the exit code or null
+ */
+ public Integer getExitCode() {
+ return exitCode;
+ }
+
+ /**
+ * Get the exit code sign corrected: null until the process has finished
+ * @return the exit code or null
+ */
+ public Integer getExitCodeSignCorrected() {
+ Integer result;
+ if (exitCode != null) {
+ result = (exitCode << 24) >> 24;
+ } else {
+ result = null;
+ }
+ return result;
+ }
+
+ /**
+ * Stop the process if it is running.
+ * This will trigger an application completion event with the given exit code
+ */
+ public void stop() {
+ if (!isRunning()) {
+ return;
+ }
+ process.destroy();
+ }
+
+ /**
+ * Get a text description of the builder suitable for log output
+ * @return a multiline string
+ */
+ protected String describeBuilder() {
+ StringBuilder buffer = new StringBuilder();
+ for (String arg : processBuilder.command()) {
+ buffer.append('"').append(arg).append("\" ");
+ }
+ return buffer.toString();
+ }
+
+ /**
+ * Dump the environment to a string builder
+ * @param buffer the buffer to append to
+ */
+ public void dumpEnv(StringBuilder buffer) {
+ buffer.append("\nEnvironment\n-----------");
+ Map<String, String> env = processBuilder.environment();
+ Set<String> keys = env.keySet();
+ List<String> sortedKeys = new ArrayList<String>(keys);
+ Collections.sort(sortedKeys);
+ for (String key : sortedKeys) {
+ buffer.append(key).append("=").append(env.get(key)).append('\n');
+ }
+ }
+
+ /**
+ * Exec the process
+ * @return the process
+ * @throws IOException on aany failure to start the process
+ * @throws FileNotFoundException if the process could not be found
+ */
+ private Process spawnChildProcess() throws IOException {
+ if (process != null) {
+ throw new IOException("Process already started");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Spawning process:\n " + describeBuilder());
+ }
+ try {
+ process = processBuilder.start();
+ } catch (IOException e) {
+ // on windows, upconvert DOS error 2 from ::CreateProcess()
+ // to its real meaning: FileNotFound
+ if (e.toString().contains("CreateProcess error=2")) {
+ FileNotFoundException fnfe =
+ new FileNotFoundException(e.toString());
+ fnfe.initCause(e);
+ throw fnfe;
+ } else {
+ throw e;
+ }
+ }
+ return process;
+ }
+
+ /**
+ * Entry point for waiting for the program to finish
+ */
+ @Override // Runnable
+ public void run() {
+ Preconditions.checkNotNull(process, "null process");
+ LOG.debug("Lifecycle callback thread running");
+ //notify the callback that the process has started
+ if (lifecycleCallback != null) {
+ lifecycleCallback.onProcessStarted(this);
+ }
+ try {
+ //close stdin for the process
+ IOUtils.closeStream(process.getOutputStream());
+ exitCode = process.waitFor();
+ } catch (InterruptedException e) {
+ LOG.debug("Process wait interrupted -exiting thread", e);
+ } finally {
+ //here the process has finished
+ LOG.debug("process {} has finished", name);
+ //tell the logger it has to finish too
+ finished.set(true);
+
+ // shut down the threads
+ logExecutor.shutdown();
+ try {
+ logExecutor.awaitTermination(60, TimeUnit.SECONDS);
+ } catch (InterruptedException ignored) {
+ //ignored
+ }
+
+ //now call the callback if it is set
+ if (lifecycleCallback != null) {
+ lifecycleCallback.onProcessExited(this, exitCode,
+ getExitCodeSignCorrected());
+ }
+ }
+ }
+
+ /**
+ * Spawn the application
+ * @throws IOException IO problems
+ */
+ public void start() throws IOException {
+
+ spawnChildProcess();
+ processStreamReader =
+ new ProcessStreamReader(processLog, STREAM_READER_SLEEP_TIME);
+ logExecutor.submit(processStreamReader);
+ processExecutor.submit(this);
+ }
+
+ /**
+ * Get the lines of recent output
+ * @return the last few lines of output; an empty list if there are none
+ * or the process is not actually running
+ */
+ public synchronized List<String> getRecentOutput() {
+ return new ArrayList<String>(recentLines);
+ }
+
+ /**
+ * @return whether lines of recent output are empty
+ */
+ public synchronized boolean isRecentOutputEmpty() {
+ return recentLines.isEmpty();
+ }
+
+ /**
+ * Query to see if the final output has been processed
+ * @return
+ */
+ public boolean isFinalOutputProcessed() {
+ return finalOutputProcessed.get();
+ }
+
+ /**
+ * Get the recent output from the process, or [] if not defined
+ *
+ * @param finalOutput flag to indicate "wait for the final output of the process"
+ * @param duration the duration, in ms,
+ * ro wait for recent output to become non-empty
+ * @return a possibly empty list
+ */
+ public List<String> getRecentOutput(boolean finalOutput, int duration) {
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start <= duration) {
+ boolean finishedOutput;
+ if (finalOutput) {
+ // final flag means block until all data is done
+ finishedOutput = isFinalOutputProcessed();
+ } else {
+ // there is some output
+ finishedOutput = !isRecentOutputEmpty();
+ }
+ if (finishedOutput) {
+ break;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ return getRecentOutput();
+ }
+
+ /**
+ * add the recent line to the list of recent lines; deleting
+ * an earlier on if the limit is reached.
+ *
+ * Implementation note: yes, a circular array would be more
+ * efficient, especially with some power of two as the modulo,
+ * but is it worth the complexity and risk of errors for
+ * something that is only called once per line of IO?
+ * @param line line to record
+ * @param isErrorStream is the line from the error stream
+ * @param logger logger to log to - null for no logging
+ */
+ private synchronized void recordRecentLine(String line,
+ boolean isErrorStream,
+ Logger logger) {
+ if (line == null) {
+ return;
+ }
+ String entry = (isErrorStream ? "[ERR] " : "[OUT] ") + line;
+ recentLines.add(entry);
+ if (recentLines.size() > recentLineLimit) {
+ recentLines.remove(0);
+ }
+ if (logger != null) {
+ if (isErrorStream) {
+ logger.warn(line);
+ } else {
+ logger.info(line);
+ }
+ }
+ }
+
+ /**
+ * Class to read data from the two process streams, and, when run in a thread
+ * to keep running until the <code>done</code> flag is set.
+ * Lines are fetched from stdout and stderr and logged at info and error
+ * respectively.
+ */
+
+ private class ProcessStreamReader implements Runnable {
+ private final Logger streamLog;
+ private final int sleepTime;
+
+ /**
+ * Create an instance
+ * @param streamLog log -or null to disable logging (recent entries
+ * will still be retained)
+ * @param sleepTime time to sleep when stopping
+ */
+ private ProcessStreamReader(Logger streamLog, int sleepTime) {
+ this.streamLog = streamLog;
+ this.sleepTime = sleepTime;
+ }
+
+ /**
+ * Return a character if there is one, -1 if nothing is ready yet
+ * @param reader reader
+ * @return the value from the reader, or -1 if it is not ready
+ * @throws IOException IO problems
+ */
+ private int readCharNonBlocking(BufferedReader reader) throws IOException {
+ if (reader.ready()) {
+ return reader.read();
+ } else {
+ return -1;
+ }
+ }
+
+ /**
+ * Read in a line, or, if the limit has been reached, the buffer
+ * so far
+ * @param reader source of data
+ * @param line line to build
+ * @param limit limit of line length
+ * @return true if the line can be printed
+ * @throws IOException IO trouble
+ */
+ @SuppressWarnings("NestedAssignment")
+ private boolean readAnyLine(BufferedReader reader,
+ StringBuilder line,
+ int limit)
+ throws IOException {
+ int next;
+ while ((-1 != (next = readCharNonBlocking(reader)))) {
+ if (next != '\n') {
+ line.append((char) next);
+ limit--;
+ if (line.length() > limit) {
+ //enough has been read in to print it any
+ return true;
+ }
+ } else {
+ //line end return flag to say so
+ return true;
+ }
+ }
+ //here the end of the stream is hit, or the limit
+ return false;
+ }
+
+
+ @Override //Runnable
+ @SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
+ public void run() {
+ BufferedReader errReader = null;
+ BufferedReader outReader = null;
+ StringBuilder outLine = new StringBuilder(LINE_LENGTH);
+ StringBuilder errorLine = new StringBuilder(LINE_LENGTH);
+ try {
+ errReader = new BufferedReader(
+ new InputStreamReader(process.getErrorStream()));
+ outReader = new BufferedReader(
+ new InputStreamReader(process.getInputStream()));
+ while (!finished.get()) {
+ boolean processed = false;
+ if (readAnyLine(errReader, errorLine, LINE_LENGTH)) {
+ recordRecentLine(errorLine.toString(), true, streamLog);
+ errorLine.setLength(0);
+ processed = true;
+ }
+ if (readAnyLine(outReader, outLine, LINE_LENGTH)) {
+ recordRecentLine(outLine.toString(), false, streamLog);
+ outLine.setLength(0);
+ processed |= true;
+ }
+ if (!processed && !finished.get()) {
+ //nothing processed: wait a bit for data.
+ try {
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ //ignore this, rely on the done flag
+ LOG.debug("Ignoring ", e);
+ }
+ }
+ }
+ // finished: cleanup
+
+ //print the current error line then stream through the rest
+ recordFinalOutput(errReader, errorLine, true, streamLog);
+ //now do the info line
+ recordFinalOutput(outReader, outLine, false, streamLog);
+
+ } catch (Exception ignored) {
+ LOG.warn("encountered {}", ignored, ignored);
+ //process connection has been torn down
+ } finally {
+ // close streams
+ IOUtils.closeStream(errReader);
+ IOUtils.closeStream(outReader);
+ //mark output as done
+ finalOutputProcessed.set(true);
+ }
+ }
+
+ /**
+ * Record the final output of a process stream
+ * @param reader reader of output
+ * @param lineBuilder string builder into which line is built
+ * @param isErrorStream flag to indicate whether or not this is the
+ * is the line from the error stream
+ * @param logger logger to log to
+ * @throws IOException
+ */
+ protected void recordFinalOutput(BufferedReader reader,
+ StringBuilder lineBuilder, boolean isErrorStream, Logger logger) throws
+ IOException {
+ String line = lineBuilder.toString();
+ recordRecentLine(line, isErrorStream, logger);
+ line = reader.readLine();
+ while (line != null) {
+ recordRecentLine(line, isErrorStream, logger);
+ line = reader.readLine();
+ if (Thread.interrupted()) {
+ break;
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
new file mode 100644
index 0000000..a13b508
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+/**
+ * Callback when a long-lived application exits
+ */
+public interface LongLivedProcessLifecycleEvent {
+
+ /**
+ * Callback when a process is started
+ * @param process the process invoking the callback
+ */
+ void onProcessStarted(LongLivedProcess process);
+
+ /**
+ * Callback when a process has finished
+ * @param process the process invoking the callback
+ * @param exitCode exit code from the process
+ * @param signCorrectedCode the code- as sign corrected
+ */
+ void onProcessExited(LongLivedProcess process,
+ int exitCode,
+ int signCorrectedCode);
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
new file mode 100644
index 0000000..a123584
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.service.Service;
+
+import java.util.List;
+
+/**
+ * Interface for accessing services that contain one or more child
+ * services.
+ */
+public interface ServiceParent extends Service {
+
+ /**
+ * Add a child service. It must be in a consistent state with the
+ * service to which it is being added.
+ * @param service the service to add.
+ */
+ void addService(Service service);
+
+ /**
+ * Get an unmodifiable list of services
+ * @return a list of child services at the time of invocation -
+ * added services will not be picked up.
+ */
+ List<Service> getServices();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
new file mode 100644
index 0000000..5ebf77c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.service.Service;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A runnable which terminates its owner; it also catches any
+ * exception raised and can serve it back.
+ *
+ */
+public class ServiceTerminatingCallable<V> implements Callable<V> {
+
+ private final Service owner;
+ private Exception exception;
+ /**
+ * This is the callback
+ */
+ private final Callable<V> callable;
+
+
+ /**
+ * Create an instance. If the owner is null, the owning service
+ * is not terminated.
+ * @param owner owning service -can be null
+ * @param callable callback.
+ */
+ public ServiceTerminatingCallable(Service owner,
+ Callable<V> callable) {
+ Preconditions.checkArgument(callable != null, "null callable");
+ this.owner = owner;
+ this.callable = callable;
+ }
+
+
+ /**
+ * Get the owning service
+ * @return the service to receive notification when
+ * the runnable completes.
+ */
+ public Service getOwner() {
+ return owner;
+ }
+
+ /**
+ * Any exception raised by inner <code>action's</code> run.
+ * @return an exception or null.
+ */
+ public Exception getException() {
+ return exception;
+ }
+
+ /**
+ * Delegates the call to the callable supplied in the constructor,
+ * then calls the stop() operation on its owner. Any exception
+ * is caught, noted and rethrown
+ * @return the outcome of the delegated call operation
+ * @throws Exception if one was raised.
+ */
+ @Override
+ public V call() throws Exception {
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ exception = e;
+ throw e;
+ } finally {
+ if (owner != null) {
+ owner.stop();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
new file mode 100644
index 0000000..dc591df
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.service.Service;
+
+/**
+ * A runnable which terminates its after running; it also catches any
+ * exception raised and can serve it back.
+ */
+public class ServiceTerminatingRunnable implements Runnable {
+
+ private final Service owner;
+ private final Runnable action;
+ private Exception exception;
+
+ /**
+ * Create an instance.
+ * @param owner owning service
+ * @param action action to execute before terminating the service
+ */
+ public ServiceTerminatingRunnable(Service owner, Runnable action) {
+ Preconditions.checkArgument(owner != null, "null owner");
+ Preconditions.checkArgument(action != null, "null action");
+ this.owner = owner;
+ this.action = action;
+ }
+
+ /**
+ * Get the owning service.
+ * @return the service to receive notification when
+ * the runnable completes.
+ */
+ public Service getOwner() {
+ return owner;
+ }
+
+ /**
+ * Any exception raised by inner <code>action's</code> run.
+ * @return an exception or null.
+ */
+ public Exception getException() {
+ return exception;
+ }
+
+ @Override
+ public void run() {
+ try {
+ action.run();
+ } catch (Exception e) {
+ exception = e;
+ }
+ owner.stop();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
new file mode 100644
index 0000000..737197b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A thread factory that creates threads (possibly daemon threads)
+ * using the name and naming policy supplied.
+ * The thread counter starts at 1, increments atomically,
+ * and is supplied as the second argument in the format string.
+ *
+ * A static method, {@link #singleThreadExecutor(String, boolean)},
+ * exists to simplify the construction of an executor with a single well-named
+ * threads.
+ *
+ * Example
+ * <pre>
+ * ExecutorService exec = ServiceThreadFactory.newSingleThreadExecutor("live", true)
+ * </pre>
+ */
+public class ServiceThreadFactory implements ThreadFactory {
+
+ private static final AtomicInteger counter = new AtomicInteger(1);
+
+ /**
+ * Default format for thread names: {@value}.
+ */
+ public static final String DEFAULT_NAMING_FORMAT = "%s-%03d";
+ private final String name;
+ private final boolean daemons;
+ private final String namingFormat;
+
+ /**
+ * Create an instance
+ * @param name base thread name
+ * @param daemons flag to indicate the threads should be marked as daemons
+ * @param namingFormat format string to generate thread names from
+ */
+ public ServiceThreadFactory(String name,
+ boolean daemons,
+ String namingFormat) {
+ Preconditions.checkArgument(name != null, "null name");
+ Preconditions.checkArgument(namingFormat != null, "null naming format");
+ this.name = name;
+ this.daemons = daemons;
+ this.namingFormat = namingFormat;
+ }
+
+ /**
+ * Create an instance with the default naming format.
+ * @param name base thread name
+ * @param daemons flag to indicate the threads should be marked as daemons
+ */
+ public ServiceThreadFactory(String name,
+ boolean daemons) {
+ this(name, daemons, DEFAULT_NAMING_FORMAT);
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Preconditions.checkArgument(r != null, "null runnable");
+ String threadName =
+ String.format(namingFormat, name, counter.getAndIncrement());
+ Thread thread = new Thread(r, threadName);
+ thread.setDaemon(daemons);
+ return thread;
+ }
+
+ /**
+ * Create a single thread executor using this naming policy.
+ * @param name base thread name
+ * @param daemons flag to indicate the threads should be marked as daemons
+ * @return an executor
+ */
+ public static ExecutorService singleThreadExecutor(String name,
+ boolean daemons) {
+ return Executors.newSingleThreadExecutor(
+ new ServiceThreadFactory(name, daemons));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
new file mode 100644
index 0000000..65d14b7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A service that calls the supplied callback when it is started -after the
+ * given delay.
+ *
+ * It can be configured to stop itself after the callback has
+ * completed, marking any exception raised as the exception of this service.
+ * The notifications come in on a callback thread -a thread that is only
+ * started in this service's <code>start()</code> operation.
+ */
+public class WorkflowCallbackService<V> extends
+ WorkflowScheduledExecutorService<ScheduledExecutorService> {
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(WorkflowCallbackService.class);
+
+ /**
+ * This is the callback.
+ */
+ private final Callable<V> callback;
+ private final int delay;
+ private final ServiceTerminatingCallable<V> command;
+
+ private ScheduledFuture<V> scheduledFuture;
+
+ /**
+ * Create an instance of the service
+ * @param name service name
+ * @param callback callback to invoke
+ * @param delay delay -or 0 for no delay
+ * @param terminate terminate this service after the callback?
+ */
+ public WorkflowCallbackService(String name,
+ Callable<V> callback,
+ int delay,
+ boolean terminate) {
+ super(name);
+ Preconditions.checkNotNull(callback, "Null callback argument");
+ this.callback = callback;
+ this.delay = delay;
+ command = new ServiceTerminatingCallable<V>(
+ terminate ? this : null,
+ callback);
+ }
+
+ public ScheduledFuture<V> getScheduledFuture() {
+ return scheduledFuture;
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ LOG.debug("Notifying {} after a delay of {} millis", callback, delay);
+ ScheduledExecutorService executorService =
+ Executors.newSingleThreadScheduledExecutor(
+ new ServiceThreadFactory(getName(), true));
+ setExecutor(executorService);
+ scheduledFuture =
+ executorService.schedule(command, delay, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Stop the service.
+ * If there is any exception noted from any executed notification,
+ * note the exception in this class
+ * @throws Exception exception.
+ */
+ @Override
+ protected void serviceStop() throws Exception {
+ super.serviceStop();
+ // propagate any failure
+ if (getCallbackException() != null) {
+ throw getCallbackException();
+ }
+ }
+
+ /**
+ * Get the exception raised by a callback. Will always be null if the
+ * callback has not been executed; will only be non-null after any success.
+ * @return a callback
+ */
+ public Exception getCallbackException() {
+ return command.getException();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
new file mode 100644
index 0000000..9c653f3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * An extended composite service which stops itself if any child service
+ * fails, or when all its children have successfully stopped without failure.
+ *
+ * Lifecycle
+ * <ol>
+ * <li>If any child exits with a failure: this service stops, propagating
+ * the exception.</li>
+ * <li>When all child services has stopped, this service stops itself</li>
+ * </ol>
+ *
+ */
+public class WorkflowCompositeService extends CompositeService
+ implements ServiceParent, ServiceStateChangeListener {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(WorkflowCompositeService.class);
+
+ /**
+ * Deadlock-avoiding overridden config for slider services; see SLIDER-1052
+ */
+ private volatile Configuration configuration;
+
+ /**
+ * Construct an instance
+ * @param name name of this service instance
+ */
+ public WorkflowCompositeService(String name) {
+ super(name);
+ }
+
+ @Override
+ public Configuration getConfig() {
+ return configuration;
+ }
+
+ @Override
+ protected void setConfig(Configuration conf) {
+ super.setConfig(conf);
+ configuration = conf;
+ }
+
+ /**
+ * Construct an instance with the default name.
+ */
+ public WorkflowCompositeService() {
+ this("WorkflowCompositeService");
+ }
+
+ /**
+ * Varargs constructor
+ * @param name name of this service instance
+ * @param children children
+ */
+ public WorkflowCompositeService(String name, Service... children) {
+ this(name);
+ for (Service child : children) {
+ addService(child);
+ }
+ }
+
+ /**
+ * Construct with a list of children
+ * @param name name of this service instance
+ * @param children children to add
+ */
+ public WorkflowCompositeService(String name, List<Service> children) {
+ this(name);
+ for (Service child : children) {
+ addService(child);
+ }
+ }
+
+ /**
+ * Add a service, and register it
+ * @param service the {@link Service} to be added.
+ * Important: do not add a service to a parent during your own serviceInit/start,
+ * in Hadoop 2.2; you will trigger a ConcurrentModificationException.
+ */
+ @Override
+ public synchronized void addService(Service service) {
+ Preconditions.checkArgument(service != null, "null service argument");
+ service.registerServiceListener(this);
+ super.addService(service);
+ }
+
+ /**
+ * When this service is started, any service stopping with a failure
+ * exception is converted immediately into a failure of this service,
+ * storing the failure and stopping ourselves.
+ * @param child the service that has changed.
+ */
+ @Override
+ public void stateChanged(Service child) {
+ //if that child stopped while we are running:
+ if (isInState(STATE.STARTED) && child.isInState(STATE.STOPPED)) {
+ // a child service has stopped
+ //did the child fail? if so: propagate
+ Throwable failureCause = child.getFailureCause();
+ if (failureCause != null) {
+ LOG.info("Child service " + child + " failed", failureCause);
+ //failure. Convert to an exception
+ Exception e = (failureCause instanceof Exception) ?
+ (Exception) failureCause : new Exception(failureCause);
+ //flip ourselves into the failed state
+ noteFailure(e);
+ stop();
+ } else {
+ LOG.info("Child service completed {}", child);
+ if (areAllChildrenStopped()) {
+ LOG.info("All children are halted: stopping");
+ stop();
+ }
+ }
+ }
+ }
+
+ /**
+ * Probe to query if all children are stopped -simply
+ * by taking a snapshot of the child service list and enumerating
+ * their state.
+ * The state of the children may change during this operation -that will
+ * not get picked up.
+ * @return true if all the children are stopped.
+ */
+ private boolean areAllChildrenStopped() {
+ List<Service> children = getServices();
+ boolean stopped = true;
+ for (Service child : children) {
+ if (!child.isInState(STATE.STOPPED)) {
+ stopped = false;
+ break;
+ }
+ }
+ return stopped;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
new file mode 100644
index 0000000..7409d32
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.service.AbstractService;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/**
+ * A service that hosts an executor -when the service is stopped,
+ * {@link ExecutorService#shutdownNow()} is invoked.
+ */
+public class WorkflowExecutorService<E extends ExecutorService> extends AbstractService {
+
+ private E executor;
+
+ /**
+ * Construct an instance with the given name -but
+ * no executor
+ * @param name service name
+ */
+ public WorkflowExecutorService(String name) {
+ this(name, null);
+ }
+
+ /**
+ * Construct an instance with the given name and executor
+ * @param name service name
+ * @param executor exectuor
+ */
+ public WorkflowExecutorService(String name,
+ E executor) {
+ super(name);
+ this.executor = executor;
+ }
+
+ /**
+ * Get the executor
+ * @return the executor
+ */
+ public synchronized E getExecutor() {
+ return executor;
+ }
+
+ /**
+ * Set the executor. Only valid if the current one is null
+ * @param executor executor
+ */
+ public synchronized void setExecutor(E executor) {
+ Preconditions.checkState(this.executor == null,
+ "Executor already set");
+ this.executor = executor;
+ }
+
+ /**
+ * Execute the runnable with the executor (which
+ * must have been created already)
+ * @param runnable runnable to execute
+ */
+ public void execute(Runnable runnable) {
+ getExecutor().execute(runnable);
+ }
+
+ /**
+ * Submit a callable
+ * @param callable callable
+ * @param <V> type of the final get
+ * @return a future to wait on
+ */
+ public <V> Future<V> submit(Callable<V> callable) {
+ return getExecutor().submit(callable);
+ }
+
+ /**
+ * Stop the service: halt the executor.
+ * @throws Exception exception.
+ */
+ @Override
+ protected void serviceStop() throws Exception {
+ stopExecutor();
+ super.serviceStop();
+ }
+
+ /**
+ * Stop the executor if it is not null.
+ * This uses {@link ExecutorService#shutdownNow()}
+ * and so does not block until they have completed.
+ */
+ protected synchronized void stopExecutor() {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java
new file mode 100644
index 0000000..b71530f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.AbstractService;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A YARN service that maps the start/stop lifecycle of an RPC server
+ * to the YARN service lifecycle.
+ */
+public class WorkflowRpcService extends AbstractService {
+
+ /** RPC server*/
+ private final Server server;
+
+ /**
+ * Construct an instance
+ * @param name service name
+ * @param server service to stop
+ */
+ public WorkflowRpcService(String name, Server server) {
+ super(name);
+ Preconditions.checkArgument(server != null, "Null server");
+ this.server = server;
+ }
+
+ /**
+ * Get the server
+ * @return the server
+ */
+ public Server getServer() {
+ return server;
+ }
+
+ /**
+ * Get the socket address of this server
+ * @return the address this server is listening on
+ */
+ public InetSocketAddress getConnectAddress() {
+ return NetUtils.getConnectAddress(server);
+ }
+
+ @Override
+ protected void serviceStart() throws Exception {
+ super.serviceStart();
+ server.start();
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ if (server != null) {
+ server.stop();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowScheduledExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowScheduledExecutorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowScheduledExecutorService.java
new file mode 100644
index 0000000..e9f53ed
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowScheduledExecutorService.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import java.util.concurrent.ScheduledExecutorService;
+
+/**
+ * Scheduled executor or subclass thereof
+ * @param <E> scheduled executor service type
+ */
+public class WorkflowScheduledExecutorService<E extends ScheduledExecutorService>
+ extends WorkflowExecutorService<E> {
+
+ public WorkflowScheduledExecutorService(String name) {
+ super(name);
+ }
+
+ public WorkflowScheduledExecutorService(String name,
+ E executor) {
+ super(name, executor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
new file mode 100644
index 0000000..97f97e8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.apache.hadoop.service.ServiceStateException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This resembles the YARN CompositeService, except that it
+ * starts one service after another
+ *
+ * Workflow
+ * <ol>
+ * <li>When the <code>WorkflowSequenceService</code> instance is
+ * initialized, it only initializes itself.</li>
+ *
+ * <li>When the <code>WorkflowSequenceService</code> instance is
+ * started, it initializes then starts the first of its children.
+ * If there are no children, it immediately stops.</li>
+ *
+ * <li>When the active child stops, it did not fail, and the parent has not
+ * stopped -then the next service is initialized and started. If there is no
+ * remaining child the parent service stops.</li>
+ *
+ * <li>If the active child did fail, the parent service notes the exception
+ * and stops -effectively propagating up the failure.
+ * </li>
+ * </ol>
+ *
+ * New service instances MAY be added to a running instance -but no guarantees
+ * can be made as to whether or not they will be run.
+ */
+
+public class WorkflowSequenceService extends AbstractService implements
+ ServiceParent, ServiceStateChangeListener {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(WorkflowSequenceService.class);
+
+ /**
+ * list of services
+ */
+ private final List<Service> serviceList = new ArrayList<>();
+
+ /**
+ * The currently active service.
+ * Volatile -may change & so should be read into a
+ * local variable before working with
+ */
+ private volatile Service activeService;
+
+ /**
+ the previous service -the last one that finished.
+ null if one did not finish yet
+ */
+ private volatile Service previousService;
+
+ private boolean stopIfNoChildServicesAtStartup = true;
+
+ /**
+ * Construct an instance
+ * @param name service name
+ */
+ public WorkflowSequenceService(String name) {
+ super(name);
+ }
+
+ /**
+ * Construct an instance with the default name
+ */
+ public WorkflowSequenceService() {
+ this("WorkflowSequenceService");
+ }
+
+ /**
+ * Create a service sequence with the given list of services
+ * @param name service name
+ * @param children initial sequence
+ */
+ public WorkflowSequenceService(String name, Service... children) {
+ super(name);
+ for (Service service : children) {
+ addService(service);
+ }
+ } /**
+ * Create a service sequence with the given list of services
+ * @param name service name
+ * @param children initial sequence
+ */
+ public WorkflowSequenceService(String name, List<Service> children) {
+ super(name);
+ for (Service service : children) {
+ addService(service);
+ }
+ }
+
+ /**
+ * Get the current service -which may be null
+ * @return service running
+ */
+ public Service getActiveService() {
+ return activeService;
+ }
+
+ /**
+ * Get the previously active service
+ * @return the service last run, or null if there is none.
+ */
+ public Service getPreviousService() {
+ return previousService;
+ }
+
+ protected void setStopIfNoChildServicesAtStartup(boolean stopIfNoChildServicesAtStartup) {
+ this.stopIfNoChildServicesAtStartup = stopIfNoChildServicesAtStartup;
+ }
+
+ /**
+ * When started
+ * @throws Exception
+ */
+ @Override
+ protected void serviceStart() throws Exception {
+ if (!startNextService() && stopIfNoChildServicesAtStartup) {
+ //nothing to start -so stop
+ stop();
+ }
+ }
+
+ @Override
+ protected void serviceStop() throws Exception {
+ //stop current service.
+ //this triggers a callback that is caught and ignored
+ Service current = activeService;
+ previousService = current;
+ activeService = null;
+ if (current != null) {
+ current.stop();
+ }
+ }
+
+ /**
+ * Start the next service in the list.
+ * Return false if there are no more services to run, or this
+ * service has stopped
+ * @return true if a service was started
+ * @throws RuntimeException from any init or start failure
+ * @throws ServiceStateException if this call is made before
+ * the service is started
+ */
+ public synchronized boolean startNextService() {
+ if (isInState(STATE.STOPPED)) {
+ //downgrade to a failed
+ LOG.debug("Not starting next service -{} is stopped", this);
+ return false;
+ }
+ if (!isInState(STATE.STARTED)) {
+ //reject attempts to start a service too early
+ throw new ServiceStateException(
+ "Cannot start a child service when not started");
+ }
+ if (serviceList.isEmpty()) {
+ //nothing left to run
+ return false;
+ }
+ if (activeService != null && activeService.getFailureCause() != null) {
+ //did the last service fail? Is this caused by some premature callback?
+ LOG.debug("Not starting next service due to a failure of {}",
+ activeService);
+ return false;
+ }
+ //bear in mind that init & start can fail, which
+ //can trigger re-entrant calls into the state change listener.
+ //by setting the current service to null
+ //the start-next-service logic is skipped.
+ //now, what does that mean w.r.t exit states?
+
+ activeService = null;
+ Service head = serviceList.remove(0);
+
+ try {
+ head.init(getConfig());
+ head.registerServiceListener(this);
+ head.start();
+ } catch (RuntimeException e) {
+ noteFailure(e);
+ throw e;
+ }
+ //at this point the service must have explicitly started & not failed,
+ //else an exception would have been raised
+ activeService = head;
+ return true;
+ }
+
+ /**
+ * State change event relays service stop events to
+ * {@link #onServiceCompleted(Service)}. Subclasses can
+ * extend that with extra logic
+ * @param service the service that has changed.
+ */
+ @Override
+ public void stateChanged(Service service) {
+ // only react to the state change when it is the current service
+ // and it has entered the STOPPED state
+ if (service == activeService && service.isInState(STATE.STOPPED)) {
+ onServiceCompleted(service);
+ }
+ }
+
+ /**
+ * handler for service completion: base class starts the next service
+ * @param service service that has completed
+ */
+ protected synchronized void onServiceCompleted(Service service) {
+ LOG.info("Running service stopped: {}", service);
+ previousService = activeService;
+
+
+ //start the next service if we are not stopped ourselves
+ if (isInState(STATE.STARTED)) {
+
+ //did the service fail? if so: propagate
+ Throwable failureCause = service.getFailureCause();
+ if (failureCause != null) {
+ Exception e = (failureCause instanceof Exception) ?
+ (Exception) failureCause : new Exception(failureCause);
+ noteFailure(e);
+ stop();
+ }
+
+ //start the next service
+ boolean started;
+ try {
+ started = startNextService();
+ } catch (Exception e) {
+ //something went wrong here
+ noteFailure(e);
+ started = false;
+ }
+ if (!started) {
+ //no start because list is empty
+ //stop and expect the notification to go upstream
+ stop();
+ }
+ } else {
+ //not started, so just note that the current service
+ //has gone away
+ activeService = null;
+ }
+ }
+
+ /**
+ * Add the passed {@link Service} to the list of services managed by this
+ * {@link WorkflowSequenceService}
+ * @param service the {@link Service} to be added
+ */
+ @Override
+ public synchronized void addService(Service service) {
+ Preconditions.checkArgument(service != null, "null service argument");
+ LOG.debug("Adding service {} ", service.getName());
+ synchronized (serviceList) {
+ serviceList.add(service);
+ }
+ }
+
+ /**
+ * Get an unmodifiable list of services
+ * @return a list of child services at the time of invocation -
+ * added services will not be picked up.
+ */
+ @Override //Parent
+ public synchronized List<Service> getServices() {
+ return Collections.unmodifiableList(serviceList);
+ }
+
+ @Override // Object
+ public synchronized String toString() {
+ return super.toString() + "; current service " + activeService
+ + "; queued service count=" + serviceList.size();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
new file mode 100644
index 0000000..36d059a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+/**
+
+<p>
+ This package contains classes which can be aggregated to build up
+ complex workflows of services: sequences of operations, callbacks
+ and composite services with a shared lifespan.
+ </p>
+
+<h2>
+ Core concepts:
+</h2>
+
+
+<p>
+The Workflow Services are set of Hadoop YARN services, all implementing
+the {@link org.apache.hadoop.service.Service} API.
+They are designed to be aggregated, to be composed to produce larger
+composite services which than perform ordered operations, notify other services
+when work has completed, and to propagate failure up the service hierarchy.
+</p>
+<p>
+Service instances may a limited lifespan, and may self-terminate when
+they consider it appropriate.</p>
+<p>
+Workflow Services that have children implement the
+{@link org.apache.slider.server.services.workflow.ServiceParent}
+class, which provides (thread-safe) access to the children -allowing new children
+to be added, and existing children to be ennumerated. The implement policies
+on how to react to the termination of children -so can sequence operations
+which terminate themselves when complete.
+</p>
+
+<p>
+Workflow Services may be subclassed to extend their behavior, or to use them
+in specific applications. Just as the standard
+{@link org.apache.hadoop.service.CompositeService}
+is often subclassed to aggregate child services, the
+{@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+can be used instead -adding the feature that failing services trigger automatic
+parent shutdown. If that is the desired operational mode of a class,
+swapping the composite service implementation may be sufficient to adopt it.
+</p>
+
+
+<h2> How do the workflow services differ from the standard YARN services? </h2>
+
+ <p>
+
+ There is exactly one standard YARN service for managing children, the
+ {@link org.apache.hadoop.service.CompositeService}.
+ </p><p>
+ The {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+ shares the same model of "child services, all inited and started together".
+ Where it differs is that if any child service stops -either due to a failure
+ or to an action which invokes that service's
+ {@link org.apache.hadoop.service.Service#stop()} method.
+ </p>
+ <p>
+
+In contrast, the original <code>CompositeService</code> class starts its children
+in its{@link org.apache.hadoop.service.Service#start()} method, but does not
+listen or react to any child service halting. As a result, changes in child
+state are not automatically detected or propagated, other than failures in
+the actual init() and start() methods.
+</p>
+
+<p>
+If a child service runs until completed -that is it will not be stopped until
+instructed to do so, and if it is only the parent service that attempts to
+stop the child, then this difference is unimportant.
+</p>
+<p>
+
+However, if any service that depends upon all it child services running -
+and if those child services are written so as to stop when they fail, using
+the <code>WorkflowCompositeService</code> as a base class will enable the
+parent service to be automatically notified of a child stopping.
+
+</p>
+<p>
+The {@link org.apache.slider.server.services.workflow.WorkflowSequenceService}
+resembles the composite service in API, but its workflow is different. It
+initializes and starts its children one-by-one, only starting the second after
+the first one succeeds, the third after the second, etc. If any service in
+the sequence fails, the parent <code>WorkflowSequenceService</code> stops,
+reporting the same exception.
+</p>
+
+<p>
+The {@link org.apache.slider.server.services.workflow.ForkedProcessService}:
+Executes a process when started, and binds to the life of that process. When the
+process terminates, so does the service -and vice versa. This service enables
+external processes to be executed as part of a sequence of operations -or,
+using the {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+in parallel with other services, terminating the process when the other services
+stop -and vice versa.
+</p>
+
+<p>
+The {@link org.apache.slider.server.services.workflow.WorkflowCallbackService}
+executes a {@link java.util.concurrent.Callable} callback a specified delay
+after the service is started, then potentially terminates itself.
+This is useful for callbacks when a workflow reaches a specific point
+-or simply for executing arbitrary code in the workflow.
+
+ </p>
+
+
+<h2>
+Other Workflow Services
+</h2>
+
+There are some minor services that have proven useful within aggregate workflows,
+and simply in applications which are built from composite YARN services.
+
+ <ul>
+ <li>{@link org.apache.slider.server.services.workflow.WorkflowRpcService }:
+ Maintains a reference to an RPC {@link org.apache.hadoop.ipc.Server} instance.
+ When the service is started, so is the RPC server. Similarly, when the service
+ is stopped, so is the RPC server instance.
+ </li>
+
+ <li>{@link org.apache.slider.server.services.workflow.ClosingService}: Closes
+ an instance of {@link java.io.Closeable} when the service is stopped. This
+ is purely a housekeeping class.
+ </li>
+
+ </ul>
+
+ Lower-level classes
+ <ul>
+ <li>{@link org.apache.slider.server.services.workflow.ServiceTerminatingRunnable }:
+ A {@link java.lang.Runnable} which runs the runnable supplied in its constructor
+ then signals its owning service to stop once that runnable is completed.
+ Any exception raised in the run is stored.
+ </li>
+ <li>{@link org.apache.slider.server.services.workflow.WorkflowExecutorService}:
+ A base class for services that wish to have a {@link java.util.concurrent.ExecutorService}
+ with a lifespan mapped to that of a service. When the service is stopped, the
+ {@link java.util.concurrent.ExecutorService#shutdownNow()} method is called to
+ attempt to shut down all running tasks.
+ </li>
+ <li>{@link org.apache.slider.server.services.workflow.ServiceThreadFactory}:
+ This is a simple {@link java.util.concurrent.ThreadFactory} which generates
+ meaningful thread names. It can be used as a parameter to constructors of
+ {@link java.util.concurrent.ExecutorService} instances, to ensure that
+ log information can tie back text to the related services</li>
+ </ul>
+
+
+
+ */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8cab88d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java
new file mode 100644
index 0000000..254bf27
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/server/services/yarnregistry/YarnRegistryViewForProviders.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.yarnregistry;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.PathNotFoundException;
+import org.apache.hadoop.registry.client.api.RegistryConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.registry.client.api.BindFlags;
+import org.apache.hadoop.registry.client.api.RegistryOperations;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.binding.RegistryPathUtils;
+
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.slider.common.tools.SliderUtils;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.registry.client.binding.RegistryPathUtils.join;
+
+/**
+ * Registry view for providers. This tracks where the service
+ * is registered, offers access to the record and other things.
+ */
+public class YarnRegistryViewForProviders {
+
+ private final RegistryOperations registryOperations;
+
+ private final String user;
+
+ private final String sliderServiceClass;
+ private final String instanceName;
+ private final ApplicationAttemptId applicationAttemptId;
+ /**
+ * Record used where the service registered itself.
+ * Null until the service is registered
+ */
+ private ServiceRecord selfRegistration;
+
+ /**
+ * Path where record was registered
+ * Null until the service is registered
+ */
+ private String selfRegistrationPath;
+
+ public YarnRegistryViewForProviders(RegistryOperations registryOperations,
+ String user,
+ String sliderServiceClass,
+ String instanceName,
+ ApplicationAttemptId applicationAttemptId) {
+ Preconditions.checkArgument(registryOperations != null,
+ "null registry operations");
+ Preconditions.checkArgument(user != null, "null user");
+ Preconditions.checkArgument(SliderUtils.isSet(sliderServiceClass),
+ "unset service class");
+ Preconditions.checkArgument(SliderUtils.isSet(instanceName),
+ "instanceName");
+ Preconditions.checkArgument(applicationAttemptId != null,
+ "null applicationAttemptId");
+ this.registryOperations = registryOperations;
+ this.user = user;
+ this.sliderServiceClass = sliderServiceClass;
+ this.instanceName = instanceName;
+ this.applicationAttemptId = applicationAttemptId;
+ }
+
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return applicationAttemptId;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public String getSliderServiceClass() {
+ return sliderServiceClass;
+ }
+
+ public String getInstanceName() {
+ return instanceName;
+ }
+
+ public RegistryOperations getRegistryOperations() {
+ return registryOperations;
+ }
+
+ public ServiceRecord getSelfRegistration() {
+ return selfRegistration;
+ }
+
+ private void setSelfRegistration(ServiceRecord selfRegistration) {
+ this.selfRegistration = selfRegistration;
+ }
+
+ /**
+ * Get the path to where the service has registered itself.
+ * Null until the service is registered
+ * @return the service registration path.
+ */
+ public String getSelfRegistrationPath() {
+ return selfRegistrationPath;
+ }
+
+ /**
+ * Get the absolute path to where the service has registered itself.
+ * This includes the base registry path
+ * Null until the service is registered
+ * @return the service registration path.
+ */
+ public String getAbsoluteSelfRegistrationPath() {
+ if (selfRegistrationPath == null) {
+ return null;
+ }
+ String root = registryOperations.getConfig().getTrimmed(
+ RegistryConstants.KEY_REGISTRY_ZK_ROOT,
+ RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT);
+ return RegistryPathUtils.join(root, selfRegistrationPath);
+ }
+
+ /**
+ * Add a component under the slider name/entry
+ * @param componentName component name
+ * @param record record to put
+ * @throws IOException
+ */
+ public void putComponent(String componentName,
+ ServiceRecord record) throws
+ IOException {
+ putComponent(sliderServiceClass, instanceName,
+ componentName,
+ record);
+ }
+
+ /**
+ * Add a component
+ * @param serviceClass service class to use under ~user
+ * @param componentName component name
+ * @param record record to put
+ * @throws IOException
+ */
+ public void putComponent(String serviceClass,
+ String serviceName,
+ String componentName,
+ ServiceRecord record) throws IOException {
+ String path = RegistryUtils.componentPath(
+ user, serviceClass, serviceName, componentName);
+ registryOperations.mknode(RegistryPathUtils.parentOf(path), true);
+ registryOperations.bind(path, record, BindFlags.OVERWRITE);
+ }
+
+ /**
+ * Add a service under a path, optionally purging any history
+ * @param username user
+ * @param serviceClass service class to use under ~user
+ * @param serviceName name of the service
+ * @param record service record
+ * @param deleteTreeFirst perform recursive delete of the path first.
+ * @return the path the service was created at
+ * @throws IOException
+ */
+ public String putService(String username,
+ String serviceClass,
+ String serviceName,
+ ServiceRecord record,
+ boolean deleteTreeFirst) throws IOException {
+ String path = RegistryUtils.servicePath(
+ username, serviceClass, serviceName);
+ if (deleteTreeFirst) {
+ registryOperations.delete(path, true);
+ }
+ registryOperations.mknode(RegistryPathUtils.parentOf(path), true);
+ registryOperations.bind(path, record, BindFlags.OVERWRITE);
+ return path;
+ }
+
+ /**
+ * Add a service under a path for the current user
+ * @param serviceClass service class to use under ~user
+ * @param serviceName name of the service
+ * @param record service record
+ * @param deleteTreeFirst perform recursive delete of the path first
+ * @return the path the service was created at
+ * @throws IOException
+ */
+ public String putService(
+ String serviceClass,
+ String serviceName,
+ ServiceRecord record,
+ boolean deleteTreeFirst) throws IOException {
+ return putService(user, serviceClass, serviceName, record, deleteTreeFirst);
+ }
+
+
+ /**
+ * Add a service under a path for the current user
+ * @param serviceClass service class to use under ~user
+ * @param serviceName name of the service
+ * @param record service record
+ * @param deleteTreeFirst perform recursive delete of the path first
+ * @return the path the service was created at
+ * @throws IOException
+ */
+ public String registerSelf(
+ ServiceRecord record,
+ boolean deleteTreeFirst) throws IOException {
+ selfRegistrationPath =
+ putService(user, sliderServiceClass, instanceName, record, deleteTreeFirst);
+ setSelfRegistration(record);
+ return selfRegistrationPath;
+ }
+
+ /**
+ * Update the self record by pushing out the latest version of the service
+ * registration record.
+ * @throws IOException any failure.
+ */
+ public void updateSelf() throws IOException {
+ putService(user, sliderServiceClass, instanceName, selfRegistration, false);
+ }
+
+ /**
+ * Delete a component
+ * @param componentName component name
+ * @throws IOException
+ */
+ public void deleteComponent(String componentName) throws IOException {
+ String path = RegistryUtils.componentPath(
+ user, sliderServiceClass, instanceName,
+ componentName);
+ registryOperations.delete(path, false);
+ }
+
+ /**
+ * Delete the children of a path -but not the path itself.
+ * It is not an error if the path does not exist
+ * @param path path to delete
+ * @param recursive flag to request recursive deletes
+ * @throws IOException IO problems
+ */
+ public void deleteChildren(String path, boolean recursive) throws IOException {
+ List<String> childNames = null;
+ try {
+ childNames = registryOperations.list(path);
+ } catch (PathNotFoundException e) {
+ return;
+ }
+ for (String childName : childNames) {
+ String child = join(path, childName);
+ registryOperations.delete(child, recursive);
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org