You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2014/06/02 20:11:57 UTC

[6/8] git commit: move RPCService in as WorkflowRPCService, add tests, review and update docs ready for copy to hadoop trunk

move RPCService in as WorkflowRPCService, add tests, review and update docs ready for copy to hadoop trunk


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/649a3b74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/649a3b74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/649a3b74

Branch: refs/heads/develop
Commit: 649a3b74646699e421acdc7793d3f79da2d28186
Parents: 3770370
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jun 2 19:10:48 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jun 2 19:10:48 2014 +0100

----------------------------------------------------------------------
 .../server/appmaster/SliderAppMaster.java       |   6 +-
 .../LaunchedWorkflowCompositeService.java       |   2 +-
 .../server/services/utility/RpcService.java     |  65 -----------
 .../AbstractWorkflowExecutorService.java        |  34 ++++--
 .../services/workflow/ClosingService.java       |  30 ++++-
 .../services/workflow/ForkedProcessService.java |  28 +++++
 .../services/workflow/LongLivedProcess.java     |   7 +-
 .../LongLivedProcessLifecycleEvent.java         |   2 +-
 .../workflow/ServiceTerminatingRunnable.java    |  14 +++
 .../services/workflow/WorkflowRpcService.java   |  76 ++++++++++++
 .../server/services/workflow/package-info.java  | 108 +++++++++++------
 .../workflow/ParentWorkflowTestBase.java        |   4 +
 .../services/workflow/TestCloseableService.java | 116 -------------------
 .../workflow/TestWorkflowClosingService.java    | 116 +++++++++++++++++++
 .../workflow/TestWorkflowRpcService.java        | 107 +++++++++++++++++
 .../workflow/WorkflowServiceTestBase.java       |  10 +-
 16 files changed, 484 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index f245d56..2457b4d 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -113,7 +113,7 @@ import org.apache.slider.server.appmaster.web.rest.RestPaths;
 import org.apache.slider.server.services.registry.SliderRegistryService;
 import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
 import org.apache.slider.server.services.workflow.WorkflowEventCallback;
-import org.apache.slider.server.services.utility.RpcService;
+import org.apache.slider.server.services.workflow.WorkflowRpcService;
 import org.apache.slider.server.services.utility.WebAppService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -191,7 +191,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
    */
   private ByteBuffer allTokens;
 
-  private RpcService rpcService;
+  private WorkflowRpcService rpcService;
 
   /**
    * Secret manager
@@ -911,7 +911,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
                                                     .newReflectiveBlockingService(
                                                       protobufRelay);
 
-    rpcService = new RpcService(RpcBinder.createProtobufServer(
+    rpcService = new WorkflowRpcService("SliderRPC", RpcBinder.createProtobufServer(
       new InetSocketAddress("0.0.0.0", 0),
       getConfig(),
       secretManager,

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
index b5d11e7..0d47c3b 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
@@ -91,7 +91,7 @@ public class LaunchedWorkflowCompositeService extends WorkflowCompositeService
 
   @Override
   public synchronized void addService(Service service) {
-    Preconditions.checkNotNull(service, "null service");
+    Preconditions.checkArgument(service != null, "null service argument");
     super.addService(service);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/utility/RpcService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/RpcService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/RpcService.java
deleted file mode 100644
index 72412d4..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/RpcService.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.service.AbstractService;
-
-import java.net.InetSocketAddress;
-
-/**
- * A YARN service that maps the start/stop lifecycle of an RPC server
- * to the YARN service lifecycle
- */
-public class RpcService extends AbstractService {
-
-  /** RPC server*/
-  private final Server server;
-
-  /**
-   * Construct an instance
-   * @param server server to manger
-   */
-  public RpcService(Server server) {
-    super("RpcService");
-    this.server = server;
-  }
-
-  public Server getServer() {
-    return server;
-  }
-
-  public InetSocketAddress getConnectAddress() {
-    return NetUtils.getConnectAddress(server);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    super.serviceStart();
-    server.start();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    if (server != null) {
-      server.stop();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
index 17d3b50..c26e3c4 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
@@ -25,27 +25,47 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 /**
- * A service that hosts an executor -in shutdown it is stopped.
+ * A service that hosts an executor -when the service is stopped,
+ * {@link ExecutorService#shutdownNow()} is invoked.
  */
 public abstract class AbstractWorkflowExecutorService extends AbstractService {
 
   private ExecutorService executor;
-  
+
+  /**
+   * Construct an instance with the given name -but
+   * no executor
+   * @param name service name
+   */
   public AbstractWorkflowExecutorService(String name) {
     this(name, null);
   }
 
+  /**
+   * Construct an instance with the given name and executor
+   * @param name service name
+   * @param executor exectuor
+   */
   protected AbstractWorkflowExecutorService(String name,
       ExecutorService executor) {
     super(name);
     this.executor = executor;
   }
 
-  public ExecutorService getExecutor() {
+  /**
+   * Get the executor
+   * @return the executor
+   */
+  public synchronized ExecutorService getExecutor() {
     return executor;
   }
 
-  protected void setExecutor(ExecutorService executor) {
+  /**
+   * Set the executor. This is protected as it
+   * is intended to be restricted to subclasses
+   * @param executor executor
+   */
+  protected synchronized void setExecutor(ExecutorService executor) {
     this.executor = executor;
   }
 
@@ -55,7 +75,7 @@ public abstract class AbstractWorkflowExecutorService extends AbstractService {
    * @param runnable runnable to execute
    */
   public void execute(Runnable runnable) {
-    executor.execute(runnable);
+    getExecutor().execute(runnable);
   }
 
   /**
@@ -65,7 +85,7 @@ public abstract class AbstractWorkflowExecutorService extends AbstractService {
    * @return a future to wait on
    */
   public <V> Future<V> submit(Callable<V> callable) {
-    return executor.submit(callable);
+    return getExecutor().submit(callable);
   }
   /**
    * Stop the service: halt the executor. 
@@ -82,7 +102,7 @@ public abstract class AbstractWorkflowExecutorService extends AbstractService {
    * This uses {@link ExecutorService#shutdownNow()}
    * and so does not block until they have completed.
    */
-  protected void stopExecutor() {
+  protected synchronized void stopExecutor() {
     if (executor != null) {
       executor.shutdownNow();
     }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
index 8468a98..7a475cc 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
@@ -31,25 +31,42 @@ import java.io.IOException;
  */
 public class ClosingService<C extends Closeable> extends AbstractService {
 
-  private volatile C closeable;
+  private C closeable;
 
 
+  /**
+   * Construct an instance of the service
+   * @param name service name
+   * @param closeable closeable to close (may be null)
+   */
   public ClosingService(String name,
       C closeable) {
     super(name);
     this.closeable = closeable;
   }
 
+  /**
+   * Construct an instance of the service, using the default name
+   * @param closeable closeable to close (may be null)
+   */
   public ClosingService(C closeable) {
     this("ClosingService", closeable);
   }
 
 
-  public C getCloseable() {
+  /**
+   * Get the closeable
+   * @return the closeable
+   */
+  public synchronized C getCloseable() {
     return closeable;
   }
 
-  public void setCloseable(C closeable) {
+  /**
+   * Set or update the closeable.
+   * @param closeable
+   */
+  public synchronized void setCloseable(C closeable) {
     this.closeable = closeable;
   }
 
@@ -61,13 +78,14 @@ public class ClosingService<C extends Closeable> extends AbstractService {
    */
   @Override
   protected void serviceStop() {
-    if (closeable != null) {
+    C target = getCloseable();
+    if (target != null) {
       try {
-        closeable.close();
+        target.close();
       } catch (IOException ioe) {
         noteFailure(ioe);
       }
-      closeable = null;
+      setCloseable(null);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
index b5459da..141ab7d 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
@@ -35,7 +35,35 @@ import java.util.concurrent.atomic.AtomicInteger;
  * This service is notified when the subprocess terminates, and stops itself 
  * and converts a non-zero exit code into a failure exception.
  * 
+ * <p>
+ * Key Features:
+ * <ol>
+ *   <li>The property {@link #executionTimeout} can be set to set a limit
+ *   on the duration of a process</li>
+ *   <li>Output is streamed to the output logger provided</li>.
+ *   <li>The most recent lines of output are saved to a linked list</li>.
+ *   <li>A synchronous callback, {@link LongLivedProcessLifecycleEvent}, is raised on the start
+ *   and finish of a process.</li>
+ * </ol>
+ *
+ * Usage:
+ * <p></p>
+ * The service can be built in the constructor, {@link #ForkedProcessService(String, Map, List)},
+ * or have its simple constructor used to instantiate the service, then the 
+ * {@link #build(Map, List)} command used to define the environment variables
+ * and list of commands to execute. One of these two options MUST be exercised
+ * before calling the services's {@link #start()} method.
+ * <p></p>
+ * The forked process is executed in the service's {@link #serviceStart()} method;
+ * if still running when the service is stopped, {@link #serviceStop()} will
+ * attempt to stop it.
+ * <p></p>
  * 
+ * The service delegates process execution to {@link LongLivedProcess},
+ * receiving callbacks via the {@link LongLivedProcessLifecycleEvent}.
+ * When the service receives a callback notifying that the process has completed,
+ * it calls its {@link #stop()} method. If the error code was non-zero, 
+ * the service is logged as having failed.
  */
 public class ForkedProcessService extends AbstractWorkflowExecutorService implements
     LongLivedProcessLifecycleEvent, Runnable {

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
index fe895e9..a1db64f 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
@@ -88,16 +88,17 @@ public class LongLivedProcess implements Runnable {
    * Log supplied in the constructor for the spawned process -accessible
    * to inner classes
    */
-  final Logger processLog;
+  private final Logger processLog;
+  
   /**
    * Class log -accessible to inner classes
    */
-  static final Logger LOG = LoggerFactory.getLogger(LongLivedProcess.class);
+  private static final Logger LOG = LoggerFactory.getLogger(LongLivedProcess.class);
 
   /**
    * Volatile flag to indicate that the process is done
    */
-  volatile boolean finished;
+  private volatile boolean finished;
 
   public LongLivedProcess(String name,
       Logger processLog,

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
index 86d20ff..a13b508 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
@@ -33,7 +33,7 @@ public interface LongLivedProcessLifecycleEvent {
    * Callback when a process has finished
    * @param process the process invoking the callback
    * @param exitCode exit code from the process
-   * @param signCorrectedCode
+   * @param signCorrectedCode the code- as sign corrected
    */
   void onProcessExited(LongLivedProcess process,
       int exitCode,

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
index 8549971..a69c1fc 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
@@ -30,6 +30,11 @@ public class ServiceTerminatingRunnable implements Runnable {
   private final Runnable action;
   private Exception exception;
 
+  /**
+   * Create an instance
+   * @param owner owning service
+   * @param action action to execute before terminating the service
+   */
   public ServiceTerminatingRunnable(Service owner, Runnable action) {
     Preconditions.checkArgument(owner != null, "null owner");
     Preconditions.checkArgument(action != null, "null action");
@@ -37,10 +42,19 @@ public class ServiceTerminatingRunnable implements Runnable {
     this.action = action;
   }
 
+  /**
+   * Get the owning service
+   * @return the service to receive notification when
+   * the runnable completes.
+   */
   public Service getOwner() {
     return owner;
   }
 
+  /**
+   * Any exception raised by inner <code>action's</code> run.
+   * @return an exception or null.
+   */
   public Exception getException() {
     return exception;
   }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java
new file mode 100644
index 0000000..b71530f
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.AbstractService;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A YARN service that maps the start/stop lifecycle of an RPC server
+ * to the YARN service lifecycle. 
+ */
+public class WorkflowRpcService extends AbstractService {
+
+  /** RPC server*/
+  private final Server server;
+
+  /**
+   * Construct an instance
+   * @param name service name
+   * @param server service to stop
+   */
+  public WorkflowRpcService(String name, Server server) {
+    super(name);
+    Preconditions.checkArgument(server != null, "Null server");
+    this.server = server;
+  }
+
+  /**
+   * Get the server
+   * @return the server
+   */
+  public Server getServer() {
+    return server;
+  }
+
+  /**
+   * Get the socket address of this server
+   * @return the address this server is listening on
+   */
+  public InetSocketAddress getConnectAddress() {
+    return NetUtils.getConnectAddress(server);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    server.start();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (server != null) {
+      server.stop();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
index b6492fe..4dd2cc7 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
@@ -26,14 +26,15 @@ package org.apache.slider.server.services.workflow;
  This package contains classes which can be aggregated to build up
  complex workflows of services: sequences of operations, callbacks
  and composite services with a shared lifespan.
- 
+
  Core concepts:
  <ol>
  <li>
  Workflow service instances have a limited lifespan, and will self-terminate when
  they consider it time</li>
  <li>
- Workflow Services that have children implement the {@link org.apache.slider.server.services.workflow.ServiceParent}
+ Workflow Services that have children implement the
+ {@link org.apache.slider.server.services.workflow.ServiceParent}
  class, which provides (thread-safe) access to the children -allowing new children
  to be added, and existing children to be ennumerated
  </li>
@@ -44,77 +45,108 @@ package org.apache.slider.server.services.workflow;
  </li>
  <li>
  Workflow Services may be subclassed to extend their behavior, or to use them
- in specific applications. Just as the standard {@link org.apache.hadoop.service.CompositeService}
- is often subclassed to aggregate child services, the {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+ in specific applications. Just as the standard
+ {@link org.apache.hadoop.service.CompositeService}
+ is often subclassed to aggregate child services, the
+ {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
  can be used instead -adding the feature that failing services trigger automatic
  parent shutdown. If that is the desired operational mode of a class,
  swapping the composite service implementation may be sufficient to adopt it.
  </li>
  </ol>
- 
+
  <h2>
- How do the workflow services differ from the standard <code>CompositeService</code>?
+ How do the workflow services differ from the standard YARN services?
  </h2>
+
+ <p>
  
+ There is exactly one standard YARN service for managing children, the
+ {@link org.apache.hadoop.service.CompositeService}.
+ </p><p>
  The {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
  shares the same model of "child services, all inited and started together".
  Where it differs is that if any child service stops -either due to a failure
- or to an action which invokes that service's <code>stop()</code> method.
- 
+ or to an action which invokes that service's
+ {@link org.apache.hadoop.service.Service#stop()} method.
+ </p><p>
+
  In contrast, the original <code>CompositeService</code> class starts its children
- in its <code>start()</code> method, but does not listen or react to any
+ in its{@link org.apache.hadoop.service.Service#start()}  method, but does not listen or react to any
  child service halting. As a result, changes in child state are not detected
  or propagated.
- 
+  </p><p>
+
  If a child service runs until completed -that is it will not be stopped until
  instructed to do so, and if it is only the parent service that attempts to
  stop the child, then this difference is unimportant. 
- 
+  </p><p>
+
  However, if any service that depends upon all it child services running -
  and if those child services are written so as to stop when they fail, using
  the <code>WorkflowCompositeService</code> as a base class will enable the 
  parent service to be automatically notified of a child stopping.
- 
+
+ </p><p>
  The {@link org.apache.slider.server.services.workflow.WorkflowSequenceService}
  resembles the composite service in API, but its workflow is different. It
  initializes and starts its children one-by-one, only starting the second after
  the first one succeeds, the third after the second, etc. If any service in
  the sequence fails, the parent <code>WorkflowSequenceService</code> stops, 
  reporting the same exception. 
- 
- 
- <h2>
- Other Workflow Services
- </h2>
+ </p>
+ <p>
+ The {@link org.apache.slider.server.services.workflow.ForkedProcessService}:
+ Executes a process when started, and binds to the life of that process. When the
+ process terminates, so does the service -and vice versa. This service enables
+ external processes to be executed as part of a sequence of operations -or,
+ using the {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+ in parallel with other services, terminating the process when the other services
+ stop -and vice versa.
+ </p>
+
+
+<h2>
+Other Workflow Services
+</h2>
+
+ There are some minor services that have proven useful within aggregate workflows,
+ and simply in applications which are built from composite YARN services.
  
  <ul>
+ <li>{@link org.apache.slider.server.services.workflow.WorkflowRpcService }:
+ Maintains a reference to an RPC {@link org.apache.hadoop.ipc.Server} instance.
+ When the service is started, so is the RPC server. Similarly, when the service
+ is stopped, so is the RPC server instance. 
+ </li>
  <li>{@link org.apache.slider.server.services.workflow.WorkflowEventNotifyingService }:
- Notifies callbacks when a workflow reaches a specific point (potentially after a delay).</li>
- <li>{@link org.apache.slider.server.services.workflow.ForkedProcessService}:
- Executes a process when started, and binds to the life of that process. When the
- process terminates, so does the service -and vice versa.</li>
+ Notifies callbacks when a workflow reaches a specific point (potentially after a delay).
+ </li>
  <li>{@link org.apache.slider.server.services.workflow.ClosingService}: Closes
- an instance of <code>Closeable</code> when the service is stopped. This
- is purely a housekeeping class.</></li>
- <li>{@link }: </li>
+ an instance of {@link java.io.Closeable} when the service is stopped. This
+ is purely a housekeeping class.
+ </li>
+
  </ul>
 
-Lower-level classes 
+ Lower-level classes 
  <ul>
- <li>{@link org.apache.slider.server.services.workflow.AbstractWorkflowExecutorService }:
- This is a base class for YARN services that use an {@link java.util.concurrent.ExecutorService}.
- for managing asynchronous operations: it stops the executor when the service is
- stopped.
+ <li>{@link org.apache.slider.server.services.workflow.ServiceTerminatingRunnable }:
+ A {@link java.lang.Runnable} which runs the runnable supplied in its constructor
+ then signals its owning service to stop once that runnable is completed. 
+ Any exception raised in the run is stored.
  </li>
- <li>{@link org.apache.slider.server.services.workflow.ForkedProcessService}:
- Executes a process when started, and binds to the life of that process. When the
- process terminates, so does the service -and vice versa.</li>
- <li>{@link org.apache.slider.server.services.workflow.LongLivedProcess}:
- The inner class used to managed the forked process. When called directly it
- offers more features.</li>
- <li>{@link org.apache.slider.server.services.workflow.ClosingService}:
- A parameterized service to close the <code>Closeable</code> passed in -used for cleaning
- up references.</li>
+ <li>{@link org.apache.slider.server.services.workflow.AbstractWorkflowExecutorService}:
+ A base class for services that wish to have a {@link java.util.concurrent.ExecutorService}
+ with a lifespan mapped to that of a service. When the service is stopped, the
+ {@link java.util.concurrent.ExecutorService#shutdownNow()} method is called to
+ attempt to shut down all running tasks.
+ </li>
+ <li>{@link org.apache.slider.server.services.workflow.ServiceThreadFactory}:
+ This is a simple {@link java.util.concurrent.ThreadFactory} which generates
+ meaningful thread names. It can be used as a parameter to constructors of 
+ {@link java.util.concurrent.ExecutorService} instances, to ensure that
+ log information can tie back text to the related services</li>
  </ul>
 
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/test/java/org/apache/slider/server/services/workflow/ParentWorkflowTestBase.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/ParentWorkflowTestBase.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/ParentWorkflowTestBase.java
index 000705f..a11a1cf 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/ParentWorkflowTestBase.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/ParentWorkflowTestBase.java
@@ -20,6 +20,10 @@ package org.apache.slider.server.services.workflow;
 
 import org.apache.hadoop.service.Service;
 
+/**
+ * Extends {@link WorkflowServiceTestBase} with parent-specific operations
+ * and logic to build up and run the parent service
+ */
 public abstract class ParentWorkflowTestBase extends WorkflowServiceTestBase {
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java
deleted file mode 100644
index 3623687..0000000
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public class TestCloseableService extends WorkflowServiceTestBase {
-
-  @Test
-  public void testSimpleClose() throws Throwable {
-    ClosingService<OpenClose> svc = instance(false);
-    OpenClose openClose = svc.getCloseable();
-    assertFalse(openClose.closed);
-    svc.stop();
-    assertTrue(openClose.closed);
-  }
-
-  @Test
-  public void testNullClose() throws Throwable {
-    ClosingService<OpenClose> svc = new ClosingService<OpenClose>(null);
-    svc.init(new Configuration());
-    svc.start();
-    assertNull(svc.getCloseable());
-    svc.stop();
-  }
-
-  @Test
-  public void testFailingClose() throws Throwable {
-    ClosingService<OpenClose> svc = instance(false);
-    OpenClose openClose = svc.getCloseable();
-    openClose.raiseExceptionOnClose = true;
-    svc.stop();
-    assertTrue(openClose.closed);
-    Throwable cause = svc.getFailureCause();
-    assertNotNull(cause);
-
-    //retry should be a no-op
-    svc.close();
-  }
-
-  @Test
-  public void testDoubleClose() throws Throwable {
-    ClosingService<OpenClose> svc = instance(false);
-    OpenClose openClose = svc.getCloseable();
-    openClose.raiseExceptionOnClose = true;
-    svc.stop();
-    assertTrue(openClose.closed);
-    Throwable cause = svc.getFailureCause();
-    assertNotNull(cause);
-    openClose.closed = false;
-    svc.stop();
-    assertEquals(cause, svc.getFailureCause());
-  }
-
-  /**
-   * This does not recurse forever, as the service has already entered the
-   * STOPPED state before the inner close tries to stop it -that operation
-   * is a no-op
-   * @throws Throwable
-   */
-  @Test
-  public void testCloseSelf() throws Throwable {
-    ClosingService<ClosingService> svc =
-        new ClosingService<ClosingService>(null);
-    svc.setCloseable(svc);
-    svc.stop();
-  }
-
-
-  private ClosingService<OpenClose> instance(boolean raiseExceptionOnClose) {
-    ClosingService<OpenClose> svc = new ClosingService<OpenClose>(new OpenClose(
-        raiseExceptionOnClose));
-    svc.init(new Configuration());
-    svc.start();
-    return svc;
-  }
-
-  private static class OpenClose implements Closeable {
-    public boolean closed = false;
-    public boolean raiseExceptionOnClose;
-
-    private OpenClose(boolean raiseExceptionOnClose) {
-      this.raiseExceptionOnClose = raiseExceptionOnClose;
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (!closed) {
-        closed = true;
-        if (raiseExceptionOnClose) {
-          throw new IOException("OpenClose");
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowClosingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowClosingService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowClosingService.java
new file mode 100644
index 0000000..638547f
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowClosingService.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class TestWorkflowClosingService extends WorkflowServiceTestBase {
+
+  @Test
+  public void testSimpleClose() throws Throwable {
+    ClosingService<OpenClose> svc = instance(false);
+    OpenClose openClose = svc.getCloseable();
+    assertFalse(openClose.closed);
+    svc.stop();
+    assertTrue(openClose.closed);
+  }
+
+  @Test
+  public void testNullClose() throws Throwable {
+    ClosingService<OpenClose> svc = new ClosingService<OpenClose>(null);
+    svc.init(new Configuration());
+    svc.start();
+    assertNull(svc.getCloseable());
+    svc.stop();
+  }
+
+  @Test
+  public void testFailingClose() throws Throwable {
+    ClosingService<OpenClose> svc = instance(false);
+    OpenClose openClose = svc.getCloseable();
+    openClose.raiseExceptionOnClose = true;
+    svc.stop();
+    assertTrue(openClose.closed);
+    Throwable cause = svc.getFailureCause();
+    assertNotNull(cause);
+
+    //retry should be a no-op
+    svc.close();
+  }
+
+  @Test
+  public void testDoubleClose() throws Throwable {
+    ClosingService<OpenClose> svc = instance(false);
+    OpenClose openClose = svc.getCloseable();
+    openClose.raiseExceptionOnClose = true;
+    svc.stop();
+    assertTrue(openClose.closed);
+    Throwable cause = svc.getFailureCause();
+    assertNotNull(cause);
+    openClose.closed = false;
+    svc.stop();
+    assertEquals(cause, svc.getFailureCause());
+  }
+
+  /**
+   * This does not recurse forever, as the service has already entered the
+   * STOPPED state before the inner close tries to stop it -that operation
+   * is a no-op
+   * @throws Throwable
+   */
+  @Test
+  public void testCloseSelf() throws Throwable {
+    ClosingService<ClosingService> svc =
+        new ClosingService<ClosingService>(null);
+    svc.setCloseable(svc);
+    svc.stop();
+  }
+
+
+  private ClosingService<OpenClose> instance(boolean raiseExceptionOnClose) {
+    ClosingService<OpenClose> svc = new ClosingService<OpenClose>(new OpenClose(
+        raiseExceptionOnClose));
+    svc.init(new Configuration());
+    svc.start();
+    return svc;
+  }
+
+  private static class OpenClose implements Closeable {
+    public boolean closed = false;
+    public boolean raiseExceptionOnClose;
+
+    private OpenClose(boolean raiseExceptionOnClose) {
+      this.raiseExceptionOnClose = raiseExceptionOnClose;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (!closed) {
+        closed = true;
+        if (raiseExceptionOnClose) {
+          throw new IOException("OpenClose");
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowRpcService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowRpcService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowRpcService.java
new file mode 100644
index 0000000..c7910ff
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowRpcService.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+public class TestWorkflowRpcService extends WorkflowServiceTestBase {
+
+  @Test
+  public void testCreateMockRPCService() throws Throwable {
+    MockRPC rpc = new MockRPC();
+    rpc.start();
+    assertTrue(rpc.started);
+    rpc.getListenerAddress();
+    rpc.stop();
+    assertTrue(rpc.stopped);
+  }
+
+  @Test
+  public void testLifecycle() throws Throwable {
+    MockRPC rpc = new MockRPC();
+    WorkflowRpcService svc = new WorkflowRpcService("test", rpc);
+    run(svc);
+    assertTrue(rpc.started);
+    svc.getConnectAddress();
+    svc.stop();
+    assertTrue(rpc.stopped);
+  }
+  
+  @Test
+  public void testStartFailure() throws Throwable {
+    MockRPC rpc = new MockRPC();
+    rpc.failOnStart = true;
+    WorkflowRpcService svc = new WorkflowRpcService("test", rpc);
+    svc.init(new Configuration());
+    try {
+      svc.start();
+      fail("expected an exception");
+    } catch (RuntimeException e) {
+      assertEquals("failOnStart", e.getMessage());
+    }
+    svc.stop();
+    assertTrue(rpc.stopped);
+  }
+  
+  private static class MockRPC extends Server {
+
+    public boolean stopped;
+    public boolean started;
+    public boolean failOnStart;
+
+    private MockRPC() throws IOException {
+      super("localhost", 0, null, 1, new Configuration());
+    }
+
+    @Override
+    public synchronized void start() {
+      if (failOnStart) {
+        throw new RuntimeException("failOnStart");
+      }
+      started = true;
+      super.start();
+    }
+
+    @Override
+    public synchronized void stop() {
+      stopped = true;
+      super.stop();
+    }
+
+    @Override
+    public synchronized InetSocketAddress getListenerAddress() {
+      return super.getListenerAddress();
+    }
+
+    @Override
+    public Writable call(RPC.RpcKind rpcKind,
+        String protocol,
+        Writable param,
+        long receiveTime) throws Exception {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
index 6c0cdc4..95331b1 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
@@ -30,6 +30,9 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
+/**
+ * Test base for workflow service tests.
+ */
 public abstract class WorkflowServiceTestBase extends Assert {
   private static final Logger
       log = LoggerFactory.getLogger(WorkflowServiceTestBase.class);
@@ -77,7 +80,6 @@ public abstract class WorkflowServiceTestBase extends Assert {
     }
   }
 
-
   /**
    * Init and start a service
    * @param svc the service
@@ -106,6 +108,12 @@ public abstract class WorkflowServiceTestBase extends Assert {
     }
   }
 
+  /**
+   * Assert that a string is in an output list. Fails fast if the output
+   * list is empty
+   * @param text text to scan for
+   * @param output list of output lines.
+   */
   public void assertStringInOutput(String text, List<String> output) {
     assertTrue("Empty output list", !output.isEmpty());
     boolean found = false;