You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2014/05/30 20:57:16 UTC

[09/12] git commit: SLIDER-94 more workflow tests and javadocs; sequence service now correctly terminates if started without any child services

SLIDER-94 more workflow tests and javadocs; sequence service now correctly terminates if started without any child services


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/2cdb4b74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/2cdb4b74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/2cdb4b74

Branch: refs/heads/develop
Commit: 2cdb4b7442742cba8f2e77f01226a6873f8677d4
Parents: 21cdfbd
Author: Steve Loughran <st...@apache.org>
Authored: Fri May 30 18:46:34 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Fri May 30 18:46:34 2014 +0100

----------------------------------------------------------------------
 .../providers/AbstractProviderService.java      |   2 +-
 .../server/appmaster/RoleLaunchService.java     |  39 +++---
 .../server/appmaster/SliderAppMaster.java       |   6 +-
 .../utility/AbstractSliderLaunchedService.java  |   2 +-
 .../server/services/utility/ClosingService.java |  58 ---------
 .../LaunchedWorkflowCompositeService.java       | 113 +++++++++++++++++
 .../WorkflowCompositeLaunchedService.java       | 117 ------------------
 .../services/workflow/ClosingService.java       |  58 +++++++++
 .../services/workflow/ForkedProcessService.java | 104 +++++++---------
 .../services/workflow/LongLivedProcess.java     |  21 ++--
 .../LongLivedProcessLifecycleEvent.java         |  38 ++++++
 .../workflow/ProcessLifecycleEventCallback.java |  29 -----
 .../server/services/workflow/ServiceParent.java |   5 +
 .../workflow/ServiceTerminatingRunnable.java    |  57 +++++++++
 .../services/workflow/ServiceThreadFactory.java |  39 +++++-
 .../workflow/WorkflowCompositeService.java      |  32 ++++-
 .../workflow/WorkflowEventCallback.java         |   6 +-
 .../workflow/WorkflowEventNotifyingService.java |  73 +++++++----
 .../workflow/WorkflowExecutorService.java       |  73 +++++++++++
 .../workflow/WorkflowSequenceService.java       | 106 +++++++++++-----
 .../server/services/workflow/package-info.java  | 120 +++++++++++++++++++
 .../workflow/TestWorkflowCompositeService.java  |  15 +--
 .../workflow/TestWorkflowSequenceService.java   |  47 ++++++--
 .../workflow/WorkflowServiceTestBase.java       |  48 ++++++--
 .../accumulo/AccumuloProviderService.java       |   3 +-
 25 files changed, 828 insertions(+), 383 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
index f4a872c..da9f1d1 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
@@ -181,7 +181,7 @@ public abstract class AbstractProviderService
    * @return the forkes service
    */
   protected ForkedProcessService latestProcess() {
-    Service current = getCurrentService();
+    Service current = getActiveService();
     Service prev = getPreviousService();
 
     Service latest = current != null ? current : prev;

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java
index d90eeb6..7a09115 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java
@@ -18,8 +18,8 @@
 
 package org.apache.slider.server.appmaster;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.slider.common.tools.SliderFileSystem;
 import org.apache.slider.core.conf.AggregateConf;
@@ -29,6 +29,8 @@ import org.apache.slider.providers.ProviderRole;
 import org.apache.slider.providers.ProviderService;
 import org.apache.slider.server.appmaster.state.RoleInstance;
 import org.apache.slider.server.appmaster.state.RoleStatus;
+import org.apache.slider.server.services.workflow.ServiceThreadFactory;
+import org.apache.slider.server.services.workflow.WorkflowExecutorService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,11 +38,12 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
 
 /**
  * A service for launching containers
  */
-public class RoleLaunchService extends AbstractService {
+public class RoleLaunchService extends WorkflowExecutorService {
   protected static final Logger log =
     LoggerFactory.getLogger(RoleLaunchService.class);
   /**
@@ -101,14 +104,15 @@ public class RoleLaunchService extends AbstractService {
    * @param provider the provider
    * @param fs filesystem
    * @param generatedConfDirPath path in the FS for the generated dir
-   * @param envVars
-   * @param launcherTmpDirPath
+   * @param envVars environment variables
+   * @param launcherTmpDirPath path for temporary data in the launch process
    */
   public RoleLaunchService(ContainerStartOperation startOperation,
                            ProviderService provider,
                            SliderFileSystem fs,
                            Path generatedConfDirPath,
-                           Map<String, String> envVars, Path launcherTmpDirPath) {
+                           Map<String, String> envVars,
+      Path launcherTmpDirPath) {
     super(ROLE_LAUNCH_SERVICE);
     containerStarter = startOperation;
     this.fs = fs;
@@ -118,6 +122,14 @@ public class RoleLaunchService extends AbstractService {
     this.envVars = envVars;
   }
 
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    setExecutor(Executors.newCachedThreadPool(
+        new ServiceThreadFactory(ROLE_LAUNCH_SERVICE, true)));
+  }
+
   @Override
   protected void serviceStop() throws Exception {
     joinAllLaunchedThreads();
@@ -138,16 +150,13 @@ public class RoleLaunchService extends AbstractService {
     assert provider.isSupportedRole(roleName) : "unsupported role";
     RoleLaunchService.RoleLauncher launcher =
       new RoleLaunchService.RoleLauncher(container,
-                                         role.getProviderRole(),
-                                         clusterSpec,
-                                         clusterSpec.getResourceOperations()
-                                                    .getOrAddComponent(roleName),
-                                         clusterSpec.getAppConfOperations()
-                                                    .getOrAddComponent(roleName) );
+         role.getProviderRole(),
+         clusterSpec,
+         clusterSpec.getResourceOperations() .getOrAddComponent( roleName),
+         clusterSpec.getAppConfOperations().getOrAddComponent(roleName));
     launchThread(launcher,
-                 String.format("%s-%s", roleName,
-                               container.getId().toString())
-                );
+                 String.format("%s-%s", roleName, container.getId().toString())
+    );
   }
 
 
@@ -190,7 +199,7 @@ public class RoleLaunchService extends AbstractService {
     //first: take a snapshot of the thread list
     List<Thread> liveThreads;
     synchronized (launchThreads) {
-      liveThreads = new ArrayList<Thread>(launchThreads.values());
+      liveThreads = new ArrayList<>(launchThreads.values());
     }
     int size = liveThreads.size();
     if (size > 0) {

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index d75c788..25a9c7a 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -1321,7 +1321,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
       // didn't start, so don't register
       providerService.start();
       // and send the started event ourselves
-      eventCallbackEvent();
+      eventCallbackEvent(null);
     }
   }
 
@@ -1331,7 +1331,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   /* =================================================================== */
 
   @Override // EventCallback
-  public void eventCallbackEvent() {
+  public void eventCallbackEvent(Object parameter) {
     // signalled that the child process is up.
     appState.noteAMLive();
     // now ask for the cluster nodes
@@ -1382,6 +1382,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
           exitCode,
           mappedProcessExitCode);
       }
+    } else {
+      super.stateChanged(service);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
index 6870be6..6c0edb8 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/utility/AbstractSliderLaunchedService.java
@@ -37,7 +37,7 @@ import static org.apache.slider.common.SliderXmlConfKeys.REGISTRY_PATH;
  * Base service for the standard slider client/server services
  */
 public abstract class AbstractSliderLaunchedService extends
-    WorkflowCompositeLaunchedService {
+    LaunchedWorkflowCompositeService {
   private static final Logger log =
     LoggerFactory.getLogger(AbstractSliderLaunchedService.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/utility/ClosingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/ClosingService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/ClosingService.java
deleted file mode 100644
index 4696fce..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/ClosingService.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.service.AbstractService;
-
-import java.io.Closeable;
-
-/**
- * Service that closes the closeable supplied during shutdown, if not null.
- */
-public class ClosingService<C extends Closeable> extends AbstractService {
-
-  private volatile C closeable;
-
-
-  public ClosingService(String name,
-                        C closeable) {
-    super(name);
-    this.closeable = closeable;
-  }
-
-  public Closeable getCloseable() {
-    return closeable;
-  }
-
-  public void setCloseable(C closeable) {
-    this.closeable = closeable;
-  }
-
-  /**
-   * Stop routine will close the closeable -if not null - and set the
-   * reference to null afterwards
-   * @throws Exception
-   */
-  @Override
-  protected void serviceStop() throws Exception {
-    IOUtils.closeStream(closeable);
-    closeable = null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
new file mode 100644
index 0000000..b5d11e7
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.utility;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.slider.core.main.LauncherExitCodes;
+import org.apache.slider.core.main.RunService;
+import org.apache.slider.server.services.workflow.WorkflowCompositeService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LaunchedWorkflowCompositeService extends WorkflowCompositeService
+    implements RunService {
+  private static final Logger log = LoggerFactory.getLogger(
+      LaunchedWorkflowCompositeService.class);
+  private String[] argv;
+  
+  public LaunchedWorkflowCompositeService(String name) {
+    super(name);
+  }
+
+  public LaunchedWorkflowCompositeService(String name, Service... children) {
+    super(name, children);
+  }
+
+  /**
+   * Test for a string being unset, using Groovy's definition of string truth
+   * @param s string to test
+   * @return true if the string is null or empty
+   */
+  protected static boolean isUnset(String s) {
+    return StringUtils.isEmpty(s);
+  }
+
+  protected static boolean isSet(String s) {
+    return StringUtils.isNotEmpty(s);
+  }
+
+  protected String[] getArgv() {
+    return argv;
+  }
+
+  /**
+   * Pre-init argument binding
+   * @param config the initial configuration build up by the
+   * service launcher.
+   * @param args list of arguments passed to the command line
+   * after any launcher-specific commands have been stripped.
+   * @return the configuration
+   * @throws Exception
+   */
+  @Override
+  public Configuration bindArgs(Configuration config, String... args) throws
+                                                                      Exception {
+    this.argv = args;
+    if (log.isDebugEnabled()) {
+      log.debug("Binding {} Arguments:", args.length);
+
+      StringBuilder builder = new StringBuilder();
+      for (String arg : args) {
+        builder.append('"').append(arg).append("\" ");
+      }
+      log.debug(builder.toString());
+    }
+    return config;
+  }
+
+  @Override
+  public int runService() throws Throwable {
+    return LauncherExitCodes.EXIT_SUCCESS;
+  }
+
+  @Override
+  public synchronized void addService(Service service) {
+    Preconditions.checkNotNull(service, "null service");
+    super.addService(service);
+  }
+
+  /**
+   * Run a child service -initing and starting it if this
+   * service has already passed those parts of its own lifecycle
+   * @param service the service to start
+   */
+  protected boolean deployChildService(Service service) {
+    service.init(getConfig());
+    addService(service);
+    if (isInState(STATE.STARTED)) {
+      service.start();
+      return true;
+    }
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/utility/WorkflowCompositeLaunchedService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/WorkflowCompositeLaunchedService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/WorkflowCompositeLaunchedService.java
deleted file mode 100644
index 6e9614b..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/WorkflowCompositeLaunchedService.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.slider.core.main.LauncherExitCodes;
-import org.apache.slider.core.main.RunService;
-import org.apache.slider.server.services.workflow.WorkflowCompositeService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class WorkflowCompositeLaunchedService extends WorkflowCompositeService
-    implements RunService {
-  private static final Logger log = LoggerFactory.getLogger(
-      WorkflowCompositeLaunchedService.class);
-  private String[] argv;
-  
-  public WorkflowCompositeLaunchedService(String name) {
-    super(name);
-  }
-
-  public WorkflowCompositeLaunchedService() {
-    super("WorkflowCompositeLaunchedService");
-  }
-
-  public WorkflowCompositeLaunchedService(String name, Service... children) {
-    super(name, children);
-  }
-
-  /**
-   * Implementation of set-ness, groovy definition of true/false for a string
-   * @param s
-   * @return
-   */
-  protected static boolean isUnset(String s) {
-    return StringUtils.isEmpty(s);
-  }
-
-  protected static boolean isSet(String s) {
-    return StringUtils.isNotEmpty(s);
-  }
-
-  protected String[] getArgv() {
-    return argv;
-  }
-
-  /**
-   * Pre-init argument binding
-   * @param config the initial configuration build up by the
-   * service launcher.
-   * @param args argument list list of arguments passed to the command line
-   * after any launcher-specific commands have been stripped.
-   * @return the configuration
-   * @throws Exception
-   */
-  @Override
-  public Configuration bindArgs(Configuration config, String... args) throws
-                                                                      Exception {
-    this.argv = args;
-    if (log.isDebugEnabled()) {
-      log.debug("Binding {} Arguments:", args.length);
-
-      StringBuilder builder = new StringBuilder();
-      for (String arg : args) {
-        builder.append('"').append(arg).append("\" ");
-      }
-      log.debug(builder.toString());
-    }
-    return config;
-  }
-
-  @Override
-  public int runService() throws Throwable {
-    return LauncherExitCodes.EXIT_SUCCESS;
-  }
-
-  @Override
-  public synchronized void addService(Service service) {
-    Preconditions.checkNotNull(service, "null service");
-    super.addService(service);
-  }
-
-  /**
-   * Run a child service -initing and starting it if this
-   * service has already passed those parts of its own lifecycle
-   * @param service the service to start
-   */
-  protected boolean deployChildService(Service service) {
-    service.init(getConfig());
-    addService(service);
-    if (isInState(STATE.STARTED)) {
-      service.start();
-      return true;
-    }
-    return false;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
new file mode 100644
index 0000000..7c9054c
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.service.AbstractService;
+
+import java.io.Closeable;
+
+/**
+ * Service that closes the closeable supplied during shutdown, if not null.
+ */
+public class ClosingService<C extends Closeable> extends AbstractService {
+
+  private volatile C closeable;
+
+
+  public ClosingService(String name,
+                        C closeable) {
+    super(name);
+    this.closeable = closeable;
+  }
+
+  public Closeable getCloseable() {
+    return closeable;
+  }
+
+  public void setCloseable(C closeable) {
+    this.closeable = closeable;
+  }
+
+  /**
+   * Stop routine will close the closeable -if not null - and set the
+   * reference to null afterwards
+   * @throws Exception
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    IOUtils.closeStream(closeable);
+    closeable = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
index 3ddeb2e..1d6ccae 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
@@ -18,12 +18,7 @@
 
 package org.apache.slider.server.services.workflow;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.ServiceStateException;
-import org.apache.slider.common.tools.SliderUtils;
-import org.apache.slider.core.exceptions.SliderException;
-import org.apache.slider.core.main.ExitCodeProvider;
 import org.apache.slider.core.main.ServiceLaunchException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,25 +35,20 @@ import java.util.concurrent.atomic.AtomicInteger;
  * This service is notified when the subprocess terminates, and stops itself 
  * and converts a non-zero exit code into a failure exception
  */
-public class ForkedProcessService extends AbstractService implements
-    ProcessLifecycleEventCallback,
-                                                          ExitCodeProvider,
-                                                          Runnable {
+public class ForkedProcessService extends WorkflowExecutorService implements
+    LongLivedProcessLifecycleEvent, Runnable {
 
   /**
    * Log for the forked master process
    */
-  protected static final Logger log =
+  private static final Logger LOG =
     LoggerFactory.getLogger(ForkedProcessService.class);
 
-  private final String name;
   private final AtomicBoolean processTerminated = new AtomicBoolean(false);
-  ;
   private boolean processStarted = false;
   private LongLivedProcess process;
   private Map<String, String> environment;
   private List<String> commands;
-  private String commandLine;
   private int executionTimeout = -1;
   private int timeoutCode = 1;
 
@@ -66,22 +56,19 @@ public class ForkedProcessService extends AbstractService implements
    * Exit code set when the spawned process exits
    */
   private AtomicInteger exitCode = new AtomicInteger(0);
-  private Thread timeoutThread;
 
+  /**
+   * Create an instance of the service
+   * @param name a name
+   */
   public ForkedProcessService(String name) {
     super(name);
-    this.name = name;
-  }
-
-  @Override //AbstractService
-  protected void serviceInit(Configuration conf) throws Exception {
-    super.serviceInit(conf);
   }
 
   @Override //AbstractService
   protected void serviceStart() throws Exception {
     if (process == null) {
-      throw new ServiceStateException("Subprocess not yet configured");
+      throw new ServiceStateException("Process not yet configured");
     }
     //now spawn the process -expect updates via callbacks
     process.spawnApplication();
@@ -90,6 +77,10 @@ public class ForkedProcessService extends AbstractService implements
   @Override //AbstractService
   protected void serviceStop() throws Exception {
     completed(0);
+    stopForkedProcess();
+  }
+
+  private void stopForkedProcess() {
     if (process != null) {
       process.stop();
     }
@@ -106,45 +97,40 @@ public class ForkedProcessService extends AbstractService implements
 
   /**
    * Build the process to execute when the service is started
-   * @param commands list of commands is inserted on the front
+   * @param commandList list of commands is inserted on the front
    * @param env environment variables above those generated by
    * @throws IOException IO problems
-   * @throws SliderException anything internal
    */
-  public void build(Map<String, String> environment,
-                    List<String> commands) throws
-                                           IOException,
-      SliderException {
+  public void build(Map<String, String> env,
+                    List<String> commandList)
+      throws IOException {
     assert process == null;
-    this.commands = commands;
-    this.commandLine = SliderUtils.join(commands, " ", false);
-    this.environment = environment;
-    process = new LongLivedProcess(getName(), log, commands);
+    this.commands = commandList;
+    this.environment = env;
+    process = new LongLivedProcess(getName(), LOG, commandList);
     process.setLifecycleCallback(this);
     //set the env variable mapping
-    process.putEnvMap(environment);
+    process.putEnvMap(env);
   }
 
   @Override // ApplicationEventHandler
-  public synchronized void onProcessStarted(LongLivedProcess application) {
-    log.info("Process has started");
+  public synchronized void onProcessStarted(LongLivedProcess process) {
+    LOG.info("Process has started");
     processStarted = true;
     if (executionTimeout > 0) {
-      timeoutThread = new Thread(this);
-      timeoutThread.start();
+      setExecutor(ServiceThreadFactory.newSingleThreadExecutor(getName(), true));
+      execute(this);
     }
   }
 
   @Override // ApplicationEventHandler
-  public void onProcessExited(LongLivedProcess application,
-      int exitC) {
+  public void onProcessExited(LongLivedProcess process, int code) {
     synchronized (this) {
-      completed(exitC);
+      completed(code);
       //note whether or not the service had already stopped
-      log.info("Process has exited with exit code {}", exitC);
-      if (exitC != 0) {
-        reportFailure(exitC, name + " failed with code " +
-                             exitC);
+      LOG.info("Process has exited with exit code {}", code);
+      if (code != 0) {
+        reportFailure(code, getName() + " failed with code " + code);
       }
     }
     //now stop itself
@@ -153,13 +139,11 @@ public class ForkedProcessService extends AbstractService implements
     }
   }
 
-  private void reportFailure(int exitC, String text) {
-    this.exitCode.set(exitC);
+  private void reportFailure(int code, String text) {
+    this.exitCode.set(code);
     //error
-    ServiceLaunchException execEx =
-      new ServiceLaunchException(exitC,
-                                 text);
-    log.debug("Noting failure", execEx);
+    ServiceLaunchException execEx = new ServiceLaunchException(code, text);
+    LOG.debug("Noting failure", execEx);
     noteFailure(execEx);
   }
 
@@ -180,20 +164,27 @@ public class ForkedProcessService extends AbstractService implements
     }
     //check the status; if the marker isn't true, bail
     if (!processTerminated.getAndSet(true)) {
-      log.info("process timeout: reporting error code {}", timeoutCode);
+      LOG.info("process timeout: reporting error code {}", timeoutCode);
 
       //timeout
       if (isInState(STATE.STARTED)) {
         //trigger a failure
-        process.stop();
+        stopForkedProcess();
       }
-      reportFailure(timeoutCode, name + ": timeout after " + executionTimeout
+      reportFailure(timeoutCode, getName() + ": timeout after " + executionTimeout
                    + " millis: exit code =" + timeoutCode);
     }
   }
 
-  protected void completed(int exitCode) {
-    this.exitCode.set(exitCode);
+  /**
+   * Note the process as having completed.
+   * The exit code is stored, the process marked as terminated
+   * -and a thread waiting on <code>processTerminated</code>
+   * is notified
+   * @param code exit code
+   */
+  protected void completed(int code) {
+    exitCode.set(code);
     processTerminated.set(true);
     synchronized (processTerminated) {
       processTerminated.notify();
@@ -209,15 +200,10 @@ public class ForkedProcessService extends AbstractService implements
   }
 
 
-  @Override // ExitCodeProvider
   public int getExitCode() {
     return exitCode.get();
   }
 
-  public String getCommandLine() {
-    return commandLine;
-  }
-
   /**
    * Get the recent output from the process, or [] if not defined
    * @return a possibly empty list

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
index 12fe3f4..22a4c00 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
@@ -39,13 +39,14 @@ import java.util.concurrent.TimeUnit;
 /**
  * Execute a long-lived process.
  *
- * Hadoop's Shell class assumes it is executing
+ * Hadoop's {@link org.apache.hadoop.util.Shell} class assumes it is executing
  * a short lived application; this class allows for the process to run for the
  * life of the Java process that forked it.
  */
 public class LongLivedProcess implements Runnable {
   public static final int STREAM_READER_SLEEP_TIME = 200;
   public static final int RECENT_LINE_LOG_LIMIT = 64;
+  public static final int LINE_LENGTH = 256;
   private final ProcessBuilder processBuilder;
   private Process process;
   private Exception exception;
@@ -58,7 +59,7 @@ public class LongLivedProcess implements Runnable {
   //list of recent lines, recorded for extraction into reports
   private final List<String> recentLines = new LinkedList<String>();
   private final int recentLineLimit = RECENT_LINE_LOG_LIMIT;
-  private ProcessLifecycleEventCallback lifecycleCallback;
+  private LongLivedProcessLifecycleEvent lifecycleCallback;
 
   
   /**
@@ -103,7 +104,7 @@ public class LongLivedProcess implements Runnable {
    * Set an optional application exit callback
    * @param lifecycleCallback callback to notify on application exit
    */
-  public void setLifecycleCallback(ProcessLifecycleEventCallback lifecycleCallback) {
+  public void setLifecycleCallback(LongLivedProcessLifecycleEvent lifecycleCallback) {
     Preconditions.checkNotNull(lifecycleCallback, "null lifecycleCallback");
     this.lifecycleCallback = lifecycleCallback;
   }
@@ -330,6 +331,12 @@ public class LongLivedProcess implements Runnable {
       this.sleepTime = sleepTime;
     }
 
+    /**
+     * Return a character if there is one, -1 if nothing is ready yet
+     * @param reader reader
+     * @return the value from the reader, or -1 if it is not ready
+     * @throws IOException IO problems
+     */
     private int readCharNonBlocking(BufferedReader reader) throws IOException {
       if (reader.ready()) {
         return reader.read();
@@ -376,8 +383,8 @@ public class LongLivedProcess implements Runnable {
     public void run() {
       BufferedReader errReader = null;
       BufferedReader outReader = null;
-      StringBuilder outLine = new StringBuilder(256);
-      StringBuilder errorLine = new StringBuilder(256);
+      StringBuilder outLine = new StringBuilder(LINE_LENGTH);
+      StringBuilder errorLine = new StringBuilder(LINE_LENGTH);
       try {
         errReader = new BufferedReader(
             new InputStreamReader(process.getErrorStream()));
@@ -385,14 +392,14 @@ public class LongLivedProcess implements Runnable {
             new InputStreamReader(process.getInputStream()));
         while (!finished) {
           boolean processed = false;
-          if (readAnyLine(errReader, errorLine, 256)) {
+          if (readAnyLine(errReader, errorLine, LINE_LENGTH)) {
             String line = errorLine.toString();
             recordRecentLine(line, true);
             streamLog.warn(line);
             errorLine.setLength(0);
             processed = true;
           }
-          if (readAnyLine(outReader, outLine, 256)) {
+          if (readAnyLine(outReader, outLine, LINE_LENGTH)) {
             String line = outLine.toString();
             recordRecentLine(line, false);
             streamLog.info(line);

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
new file mode 100644
index 0000000..af83ed0
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
@@ -0,0 +1,38 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+/**
+ * Callbacks invoked when a long-lived process starts and when it exits
+ */
+public interface LongLivedProcessLifecycleEvent {
+
+  /**
+   * Callback when a process is started
+   * @param process the process invoking the callback
+   */
+  void onProcessStarted(LongLivedProcess process);
+
+  /**
+   * Callback when a process has finished
+   * @param process the process invoking the callback
+   * @param exitCode exit code from the process
+   */
+  void onProcessExited(LongLivedProcess process, int exitCode);
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ProcessLifecycleEventCallback.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ProcessLifecycleEventCallback.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ProcessLifecycleEventCallback.java
deleted file mode 100644
index 2dd2a2b..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ProcessLifecycleEventCallback.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-/**
- * Callback when a long-lived application exits
- */
-public interface ProcessLifecycleEventCallback {
-
-  void onProcessStarted(LongLivedProcess application);
-
-  void onProcessExited(LongLivedProcess application, int exitCode);
-}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
index 8bad60e..a123584 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceParent.java
@@ -28,6 +28,11 @@ import java.util.List;
  */
 public interface ServiceParent extends Service {
 
+  /**
+   * Add a child service. It must be in a consistent state with the
+   * service to which it is being added.
+   * @param service the service to add.
+   */
   void addService(Service service);
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
new file mode 100644
index 0000000..f2563ec
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.service.Service;
+
+/**
+ * A runnable which runs an action, records any exception it raises, then stops its owner.
+ */
+public class ServiceTerminatingRunnable implements Runnable {
+
+  private final Service owner;
+  private final Runnable action;
+  private Exception exception;
+
+  public ServiceTerminatingRunnable(Service owner, Runnable action) {
+    Preconditions.checkNotNull(owner, "null owner");
+    Preconditions.checkNotNull(action, "null action");
+    this.owner = owner;
+    this.action = action;
+  }
+
+  public Service getOwner() {
+    return owner;
+  }
+
+  public Exception getException() {
+    return exception;
+  }
+
+  @Override
+  public void run() {
+    try {
+      action.run();
+    } catch (Exception e) {
+      exception = e;
+    }
+    owner.stop();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
index be7c54d..b1d235d 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
@@ -20,6 +20,8 @@ package org.apache.slider.server.services.workflow;
 
 import com.google.common.base.Preconditions;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -27,16 +29,34 @@ import java.util.concurrent.atomic.AtomicInteger;
  * A thread factory that creates threads (possibly daemon threads)
  * using the name and naming policy supplied.
  * The thread counter starts at 1, increments atomically, 
- * and is supplied as the second argument in the format string
+ * and is supplied as the second argument in the format string.
+ * 
+ * A static method, {@link #newSingleThreadExecutor(String, boolean)},
+ * exists to simplify the construction of an executor with a single well-named
+ * thread.
+ * 
+ * Example
+ * <pre>
+ *  ExecutorService exec = ServiceThreadFactory.newSingleThreadExecutor("live", true)
+ * </pre>
  */
 public class ServiceThreadFactory implements ThreadFactory {
 
   private static AtomicInteger counter = new AtomicInteger(1);
+  /**
+   * Default format for thread names: {@value}
+   */
   public static final String DEFAULT_NAMING_FORMAT = "%s-%03d";
   private final String name;
   private final boolean daemons;
   private final String namingFormat;
 
+  /**
+   * Create an instance
+   * @param name base thread name
+   * @param daemons flag to indicate the threads should be marked as daemons
+   * @param namingFormat format string to generate thread names from
+   */
   public ServiceThreadFactory(String name,
       boolean daemons,
       String namingFormat) {
@@ -47,6 +67,12 @@ public class ServiceThreadFactory implements ThreadFactory {
     this.namingFormat = namingFormat;
   }
 
+  /**
+   *
+   * Create an instance with the default naming format
+   * @param name base thread name
+   * @param daemons flag to indicate the threads should be marked as daemons
+   */
   public ServiceThreadFactory(String name,
       boolean daemons) {
     this(name, daemons, DEFAULT_NAMING_FORMAT);
@@ -60,4 +86,15 @@ public class ServiceThreadFactory implements ThreadFactory {
     return new Thread(r, threadName);
   }
 
+  /**
+   * Create a single thread executor using this naming policy
+   * @param name base thread name
+   * @param daemons flag to indicate the threads should be marked as daemons
+   * @return an executor
+   */
+  public static ExecutorService newSingleThreadExecutor(String name,
+      boolean daemons) {
+    return Executors.newSingleThreadExecutor(
+        new ServiceThreadFactory(name, daemons));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
index c9df6c5..e556422 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
@@ -18,6 +18,7 @@
 
 package org.apache.slider.server.services.workflow;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.service.ServiceStateChangeListener;
@@ -41,20 +42,28 @@ import java.util.List;
 public class WorkflowCompositeService extends CompositeService
     implements ServiceParent, ServiceStateChangeListener {
 
-  private static final Logger log =
+  private static final Logger LOG =
     LoggerFactory.getLogger(WorkflowCompositeService.class);
 
+  /**
+   * Construct an instance
+   * @param name name of this service instance
+   */
   public WorkflowCompositeService(String name) {
     super(name);
   }
 
 
+  /**
+   * Construct an instance with the default name.
+   */
   public WorkflowCompositeService() {
     this("WorkflowCompositeService");
   }
 
   /**
    * Varargs constructor
+   * @param name name of this service instance
    * @param children children
    */
   public WorkflowCompositeService(String name, Service... children) {
@@ -63,9 +72,11 @@ public class WorkflowCompositeService extends CompositeService
       addService(child);
     }
   }
+
   /**
-   * Varargs constructor
-   * @param children children
+   * Construct with a list of children
+   * @param name name of this service instance
+   * @param children children to add
    */
   public WorkflowCompositeService(String name, List<Service> children) {
     this(name);
@@ -82,6 +93,7 @@ public class WorkflowCompositeService extends CompositeService
    */
   @Override
   public synchronized void addService(Service service) {
+    Preconditions.checkNotNull(service, "null service argument");
     service.registerServiceListener(this);
     super.addService(service);
   }
@@ -100,7 +112,7 @@ public class WorkflowCompositeService extends CompositeService
       //did the child fail? if so: propagate
       Throwable failureCause = child.getFailureCause();
       if (failureCause != null) {
-        log.info("Child service " + child + " failed", failureCause);
+        LOG.info("Child service " + child + " failed", failureCause);
         //failure. Convert to an exception
         Exception e = (failureCause instanceof Exception) ?
             (Exception) failureCause : new Exception(failureCause);
@@ -108,15 +120,23 @@ public class WorkflowCompositeService extends CompositeService
         noteFailure(e);
         stop();
       } else {
-        log.info("Child service completed {}", child);
+        LOG.info("Child service completed {}", child);
         if (areAllChildrenStopped()) {
-          log.info("All children are halted: stopping");
+          LOG.info("All children are halted: stopping");
           stop();
         }
       }
     }
   }
 
+  /**
+   * Probe to query if all children are stopped -simply
+   * by taking a snapshot of the child service list and enumerating
+   * their state. 
+   * The state of the children may change during this operation -that will
+   * not get picked up.
+   * @return true if all the children are stopped.
+   */
   private boolean areAllChildrenStopped() {
     List<Service> children = getServices();
     boolean stopped = true;

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
index dc51bac..a0f954e 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
@@ -18,8 +18,12 @@
 
 package org.apache.slider.server.services.workflow;
 
+/**
+ * This is the callback triggered by the {@link WorkflowEventNotifyingService}
+ * when it generates a notification
+ */
 public interface WorkflowEventCallback {
   
-  public void eventCallbackEvent();
+  public void eventCallbackEvent(Object parameter);
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
index a9244eb..d504d13 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
@@ -19,65 +19,94 @@
 package org.apache.slider.server.services.workflow;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.service.AbstractService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
 /**
  * A service that calls the supplied callback when it is started -after the 
  * given delay, then stops itself.
- * The notifications come in on a different thread
+ * The notifications come in on a callback thread -a thread that is only
+ * started in this service's <code>start()</code> operation.
  */
-public class WorkflowEventNotifyingService extends AbstractService implements Runnable {
-  protected static final Logger log =
+public class WorkflowEventNotifyingService extends WorkflowExecutorService
+    implements Runnable {
+  protected static final Logger LOG =
     LoggerFactory.getLogger(WorkflowEventNotifyingService.class);
   private final WorkflowEventCallback callback;
   private final int delay;
-  private ExecutorService executor;
+  private final ServiceTerminatingRunnable command;
+  private final Object parameter;
 
 
+  /**
+   * Create an instance of the service
+   * @param name service name
+   * @param callback callback to invoke
+   * @param parameter optional parameter for the callback
+   * @param delay delay -or 0 for no delay
+   */
   public WorkflowEventNotifyingService(String name,
-      WorkflowEventCallback callback, int delay) {
+      WorkflowEventCallback callback,
+      Object parameter,
+      int delay) {
     super(name);
     Preconditions.checkNotNull(callback, "Null callback argument");
     this.callback = callback;
     this.delay = delay;
+    this.parameter = parameter;
+    command = new ServiceTerminatingRunnable(this, this);
   }
 
-  public WorkflowEventNotifyingService(WorkflowEventCallback callback, int delay) {
-    this("WorkflowEventNotifyingService", callback, delay);
+  /**
+   * Create an instance of the service
+   * @param callback callback to invoke
+   * @param parameter optional parameter for the callback
+   * @param delay delay -or 0 for no delay
+   */
+  public WorkflowEventNotifyingService(WorkflowEventCallback callback,
+      Object parameter,
+      int delay) {
+    this("WorkflowEventNotifyingService", callback, parameter, delay);
   }
 
   @Override
   protected void serviceStart() throws Exception {
-    log.debug("Notifying {} after a delay of {} millis", callback, delay);
-    executor = Executors.newSingleThreadExecutor(
-        new ServiceThreadFactory(getName(), true));
-    executor.execute(this);
+    LOG.debug("Notifying {} after a delay of {} millis", callback, delay);
+    setExecutor(ServiceThreadFactory.newSingleThreadExecutor(getName(), true));
+    execute(command);
   }
 
+  /**
+   * Stop the service.
+   * If there is any exception noted from any executed notification,
+   * note the exception in this class
+   * @throws Exception exception.
+   */
   @Override
   protected void serviceStop() throws Exception {
     super.serviceStop();
-    if (executor != null) {
-      executor.shutdownNow();
+    // propagate any failure
+    if (command != null && command.getException() != null) {
+      noteFailure(command.getException());
     }
   }
 
+  /**
+   * Perform the work in a different thread. Relies on
+   * the {@link ServiceTerminatingRunnable} to trigger
+   * the service halt on this thread.
+   */
   @Override // Runnable
   public void run() {
     if (delay > 0) {
       try {
         Thread.sleep(delay);
-      } catch (InterruptedException interrupted) {
-        return;
+      } catch (InterruptedException e) {
+        LOG.debug("Interrupted: {} in runnable", e, e);
       }
     }
-    log.debug("Notifying {}", callback);
-    callback.eventCallbackEvent();
-    stop();
+    LOG.debug("Notifying {}", callback);
+    callback.eventCallbackEvent(parameter);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
new file mode 100644
index 0000000..a61ac05
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.service.AbstractService;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * A service that hosts an executor -in shutdown it is stopped.
+ */
+public class WorkflowExecutorService extends AbstractService {
+
+  private ExecutorService executor;
+  
+  public WorkflowExecutorService(String name) {
+    super(name);
+  }
+
+  public ExecutorService getExecutor() {
+    return executor;
+  }
+
+  protected void setExecutor(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * Execute the runnable with the executor (which 
+   * must have been created already)
+   * @param runnable runnable to execute
+   */
+  protected void execute(Runnable runnable) {
+    executor.execute(runnable);
+  }
+
+  /**
+   * Stop the service: halt the executor. 
+   * @throws Exception exception.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+    stopExecutor();
+  }
+
+  /**
+   * Stop the executor if it is not null.
+   * This uses {@link ExecutorService#shutdownNow()}
+   * and so does not block until any running tasks have completed.
+   */
+  protected void stopExecutor() {
+    if (executor != null) {
+      executor.shutdownNow();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
index 946166b..c42e784 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
@@ -18,6 +18,7 @@
 
 package org.apache.slider.server.services.workflow;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.service.ServiceStateChangeListener;
@@ -31,15 +32,34 @@ import java.util.List;
 
 /**
  * This resembles the YARN CompositeService, except that it
- * starts one service after another: it's init & start operations
- * only work with one service
+ * starts one service after another
+ * 
+ * Workflow
+ * <ol>
+ *   <li>When the <code>WorkflowSequenceService</code> instance is
+ *   initialized, it only initializes itself.</li>
+ *   
+ *   <li>When the <code>WorkflowSequenceService</code> instance is
+ *   started, it initializes then starts the first of its children.
+ *   If there are no children, it immediately stops.</li>
+ *   
+ *   <li>When the active child stops, it did not fail, and the parent has not
+ *   stopped -then the next service is initialized and started. If there is no
+ *   remaining child the parent service stops.</li>
+ *   
+ *   <li>If the active child did fail, the parent service notes the exception
+ *   and stops -effectively propagating up the failure.
+ *   </li>
+ * </ol>
+ * 
+ * New service instances MAY be added to a running instance -but no guarantees
+ * can be made as to whether or not they will be run.
  */
 
 public class WorkflowSequenceService extends AbstractService implements
-    ServiceParent,
-                                                     ServiceStateChangeListener {
+    ServiceParent, ServiceStateChangeListener {
 
-  private static final Logger log =
+  private static final Logger LOG =
     LoggerFactory.getLogger(WorkflowSequenceService.class);
 
   /**
@@ -48,21 +68,29 @@ public class WorkflowSequenceService extends AbstractService implements
   private final List<Service> serviceList = new ArrayList<Service>();
 
   /**
-   * The current service.
+   * The currently active service.
    * Volatile -may change & so should be read into a 
    * local variable before working with
    */
-  private volatile Service currentService;
-  /*
+  private volatile Service activeService;
+
+  /**
   the previous service -the last one that finished. 
-  Null if one did not finish yet
+  null if one did not finish yet
    */
   private volatile Service previousService;
 
+  /**
+   * Construct an instance
+   * @param name service name
+   */
   public WorkflowSequenceService(String name) {
     super(name);
   }
 
+  /**
+   * Construct an instance with the default name
+   */
   public WorkflowSequenceService() {
     this("WorkflowSequenceService");
   }
@@ -70,11 +98,21 @@ public class WorkflowSequenceService extends AbstractService implements
   /**
    * Create a service sequence with the given list of services
    * @param name service name
-   * @param offspring initial sequence
+   * @param children initial sequence
    */
-  public WorkflowSequenceService(String name, Service... offspring) {
+  public WorkflowSequenceService(String name, Service... children) {
     super(name);
-    for (Service service : offspring) {
+    for (Service service : children) {
+      addService(service);
+    }
+  }  /**
+   * Create a service sequence with the given list of services
+   * @param name service name
+   * @param children initial sequence
+   */
+  public WorkflowSequenceService(String name, List<Service> children) {
+    super(name);
+    for (Service service : children) {
       addService(service);
     }
   }
@@ -83,10 +121,14 @@ public class WorkflowSequenceService extends AbstractService implements
    * Get the current service -which may be null
    * @return service running
    */
-  public Service getCurrentService() {
-    return currentService;
+  public Service getActiveService() {
+    return activeService;
   }
 
+  /**
+   * Get the previously active service
+   * @return the service last run, or null if there is none.
+   */
   public Service getPreviousService() {
     return previousService;
   }
@@ -97,16 +139,19 @@ public class WorkflowSequenceService extends AbstractService implements
    */
   @Override
   protected void serviceStart() throws Exception {
-    startNextService();
+    if (!startNextService()) {
+        //nothing to start -so stop
+        stop();
+    }
   }
 
   @Override
   protected void serviceStop() throws Exception {
     //stop current service.
     //this triggers a callback that is caught and ignored
-    Service current = currentService;
+    Service current = activeService;
     previousService = current;
-    currentService = null;
+    activeService = null;
     if (current != null) {
       current.stop();
     }
@@ -124,7 +169,7 @@ public class WorkflowSequenceService extends AbstractService implements
   public synchronized boolean startNextService() {
     if (isInState(STATE.STOPPED)) {
       //downgrade to a failed
-      log.debug("Not starting next service -{} is stopped", this);
+      LOG.debug("Not starting next service -{} is stopped", this);
       return false;
     }
     if (!isInState(STATE.STARTED)) {
@@ -136,10 +181,10 @@ public class WorkflowSequenceService extends AbstractService implements
       //nothing left to run
       return false;
     }
-    if (currentService != null && currentService.getFailureCause() != null) {
+    if (activeService != null && activeService.getFailureCause() != null) {
       //did the last service fail? Is this caused by some premature callback?
-      log.debug("Not starting next service due to a failure of {}",
-                currentService);
+      LOG.debug("Not starting next service due to a failure of {}",
+          activeService);
       return false;
     }
     //bear in mind that init & start can fail, which
@@ -148,7 +193,7 @@ public class WorkflowSequenceService extends AbstractService implements
     //the start-next-service logic is skipped.
     //now, what does that mean w.r.t exit states?
 
-    currentService = null;
+    activeService = null;
     Service head = serviceList.remove(0);
 
     try {
@@ -161,7 +206,7 @@ public class WorkflowSequenceService extends AbstractService implements
     }
     //at this point the service must have explicitly started & not failed,
     //else an exception would have been raised
-    currentService = head;
+    activeService = head;
     return true;
   }
 
@@ -175,7 +220,7 @@ public class WorkflowSequenceService extends AbstractService implements
   public void stateChanged(Service service) {
     // only react to the state change when it is the current service
     // and it has entered the STOPPED state
-    if (service == currentService && service.isInState(STATE.STOPPED)) {
+    if (service == activeService && service.isInState(STATE.STOPPED)) {
       onServiceCompleted(service);
     }
   }
@@ -185,8 +230,8 @@ public class WorkflowSequenceService extends AbstractService implements
    * @param service service that has completed
    */
   protected synchronized void onServiceCompleted(Service service) {
-    log.info("Running service stopped: {}", service);
-    previousService = currentService;
+    LOG.info("Running service stopped: {}", service);
+    previousService = activeService;
     
 
     //start the next service if we are not stopped ourselves
@@ -218,7 +263,7 @@ public class WorkflowSequenceService extends AbstractService implements
     } else {
       //not started, so just note that the current service
       //has gone away
-      currentService = null;
+      activeService = null;
     }
   }
 
@@ -227,9 +272,10 @@ public class WorkflowSequenceService extends AbstractService implements
    * {@link WorkflowSequenceService}
    * @param service the {@link Service} to be added
    */
-  @Override //Parent
+  @Override
   public synchronized void addService(Service service) {
-    log.debug("Adding service {} ", service.getName());
+    Preconditions.checkNotNull(service, "null service argument");
+    LOG.debug("Adding service {} ", service.getName());
     synchronized (serviceList) {
       serviceList.add(service);
     }
@@ -247,7 +293,7 @@ public class WorkflowSequenceService extends AbstractService implements
 
   @Override // Object
   public synchronized String toString() {
-    return super.toString() + "; current service " + currentService
+    return super.toString() + "; current service " + activeService
            + "; queued service count=" + serviceList.size();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
new file mode 100644
index 0000000..89275c8
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+/**
+
+ <h2>
+ Introduction
+ </h2>
+ This package contains classes which can be aggregated to build up
+ complex workflows of services: sequences of operations, callbacks
+ and composite services with a shared lifespan.
+ 
+ Core concepts:
+ <ol>
+ <li>
+ Workflow service instances have a limited lifespan, and will self-terminate when
+ they consider it time</li>
+ <li>
+ Workflow Services that have children implement the {@link org.apache.slider.server.services.workflow.ServiceParent}
+ class, which provides (thread-safe) access to the children -allowing new children
+ to be added, and existing children to be enumerated
+ </li>
+ <li>
+ Workflow Services are designed to be aggregated, to be composed to produce larger
+ composite services which then perform ordered operations, notify other services
+ when work has completed, and propagate failure up the service hierarchy.
+ </li>
+ <li>
+ Workflow Services may be subclassed to extend their behavior, or to use them
+ in specific applications. Just as the standard {@link org.apache.hadoop.service.CompositeService}
+ is often subclassed to aggregate child services, the {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+ can be used instead -adding the feature that failing services trigger automatic
+ parent shutdown. If that is the desired operational mode of a class,
+ swapping the composite service implementation may be sufficient to adopt it.
+ </li>
+ </ol>
+ 
+ <h2>
+ How do the workflow services differ from the standard <code>CompositeService</code>?
+ </h2>
+ 
+ The {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+ shares the same model of "child services, all inited and started together".
+ Where it differs is that if any child service stops -either due to a failure
+ or to an action which invokes that service's <code>stop()</code> method- the parent service stops too.
+ 
+ In contrast, the original <code>CompositeService</code> class starts its children
+ in its <code>start()</code> method, but does not listen or react to any
+ child service halting. As a result, changes in child state are not detected
+ or propagated.
+ 
+ If a child service runs until completed -that is it will not be stopped until
+ instructed to do so, and if it is only the parent service that attempts to
+ stop the child, then this difference is unimportant. 
+ 
+ However, if a service depends upon all its child services running -
+ and if those child services are written so as to stop when they fail, using
+ the <code>WorkflowCompositeService</code> as a base class will enable the 
+ parent service to be automatically notified of a child stopping.
+ 
+ The {@link org.apache.slider.server.services.workflow.WorkflowSequenceService}
+ resembles the composite service in API, but its workflow is different. It
+ initializes and starts its children one-by-one, only starting the second after
+ the first one succeeds, the third after the second, etc. If any service in
+ the sequence fails, the parent <code>WorkflowSequenceService</code> stops, 
+ reporting the same exception. 
+ 
+ 
+ <h2>
+ Other workflow services
+ </h2>
+ 
+ <ul>
+ <li>{@link org.apache.slider.server.services.workflow.WorkflowEventNotifyingService }:
+ Notifies callbacks when a workflow reaches a specific point (potentially after a delay).</li>
+ <li>{@link org.apache.slider.server.services.workflow.ForkedProcessService}:
+ Executes a process when started, and binds to the life of that process. When the
+ process terminates, so does the service -and vice versa.</li>
+ <li>{@link }: </li>
+ <li>{@link }: </li>
+ </ul>
+
+Lower level classes 
+ <ul>
+ <li>{@link org.apache.slider.server.services.workflow.WorkflowExecutorService }:
+ This is a base class for YARN services that use an {@link java.util.concurrent.ExecutorService}
+ for managing asynchronous operations: it stops the executor when the service is
+ stopped.
+ </li>
+ <li>{@link org.apache.slider.server.services.workflow.ForkedProcessService}:
+ Executes a process when started, and binds to the life of that process. When the
+ process terminates, so does the service -and vice versa.</li>
+ <li>{@link org.apache.slider.server.services.workflow.LongLivedProcess}:
+ The inner class used to manage the forked process. When called directly it
+ offers more features.</li>
+ <li>{@link org.apache.slider.server.services.workflow.ClosingService}:
+ A parameterized service to close the <code>Closeable</code> passed in -used for cleaning
+ up references.</li>
+ </ul>
+
+
+
+ */

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
index 03cf998..d4f90a5 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
@@ -62,22 +62,12 @@ public class TestWorkflowCompositeService extends WorkflowServiceTestBase {
   }
 
   @Test
-  public void testOneLongLivedChild() throws Throwable {
-    MockService one = new MockService("one", false, 500);
-    MockService two = new MockService("two", false, 100);
-    ServiceParent parent = startService(one, two);
-    waitForParentToStop(parent);
-    assertStopped(one);
-    assertStopped(two);
-  }
-
-  @Test
   public void testNotificationChild() throws Throwable {
 
-    EventCallbackHandler ecb = new EventCallbackHandler();
     MockService one = new MockService("one", false, 100);
+    EventCallbackHandler ecb = new EventCallbackHandler();
     WorkflowEventNotifyingService ens =
-        new WorkflowEventNotifyingService(ecb, 100);
+        new WorkflowEventNotifyingService(ecb, "hello", 100);
     MockService two = new MockService("two", false, 100);
     ServiceParent parent = startService(one, ens, two);
     waitForParentToStop(parent);
@@ -85,6 +75,7 @@ public class TestWorkflowCompositeService extends WorkflowServiceTestBase {
     assertStopped(ens);
     assertStopped(two);
     assertTrue(ecb.notified);
+    assertEquals("hello", ecb.result);
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
index 986057a..80b70a2 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
@@ -28,22 +28,27 @@ public class TestWorkflowSequenceService extends WorkflowServiceTestBase {
   private static final Logger
       log = LoggerFactory.getLogger(TestWorkflowSequenceService.class);
 
-
   @Test
   public void testSingleSequence() throws Throwable {
-    ServiceParent ss = startService(new MockService());
-    ss.stop();
+    ServiceParent parent = startService(new MockService());
+    parent.stop();
+  }
+
+  @Test
+  public void testEmptySequence() throws Throwable {
+    ServiceParent parent = startService();
+    waitForParentToStop(parent);
   }
 
   @Test
   public void testSequence() throws Throwable {
     MockService one = new MockService("one", false, 100);
     MockService two = new MockService("two", false, 100);
-    ServiceParent ss = startService(one, two);;
-    assert ss.waitForServiceToStop(1000);
-    assert one.isInState(Service.STATE.STOPPED);
-    assert two.isInState(Service.STATE.STOPPED);
-    assert ((WorkflowSequenceService)ss).getPreviousService().equals(two);
+    ServiceParent parent = startService(one, two);
+    waitForParentToStop(parent);
+    assertStopped(one);
+    assertStopped(two);
+    assert ((WorkflowSequenceService)parent).getPreviousService().equals(two);
   }
 
   @Test
@@ -51,14 +56,15 @@ public class TestWorkflowSequenceService extends WorkflowServiceTestBase {
     EventCallbackHandler ecb = new EventCallbackHandler();
     MockService one = new MockService("one", false, 100);
     WorkflowEventNotifyingService ens =
-        new WorkflowEventNotifyingService(ecb, 100);
+        new WorkflowEventNotifyingService(ecb, 3, 100);
     MockService two = new MockService("two", false, 100);
-    ServiceParent ss = startService(one, ens, two);
-    assert ss.waitForServiceToStop(1000);
+    ServiceParent parent = startService(one, ens, two);
+    waitForParentToStop(parent);
     assertStopped(one);
     assertStopped(ens);
     assertStopped(two);
     assertTrue(ecb.notified);
+    assertEquals(3, ecb.result);
   }
 
   @Test
@@ -114,6 +120,25 @@ public class TestWorkflowSequenceService extends WorkflowServiceTestBase {
     assertStopped(two);
   }
 
+
+  @Test
+  public void testAddChild() throws Throwable {
+    MockService one = new MockService("one", false, 5000);
+    MockService two = new MockService("two", false, 100);
+    ServiceParent parent = startService(one, two);
+    EventCallbackHandler ecb = new EventCallbackHandler();
+    WorkflowEventNotifyingService ens =
+        new WorkflowEventNotifyingService(ecb, "hello", 100);
+    parent.addService(ens);
+    waitForParentToStop(parent, 10000);
+    assertStopped(one);
+    assertStopped(two);
+    assertStopped(ens);
+    assertStopped(two);
+    assertTrue(ecb.notified);
+    assertEquals("hello", ecb.result);
+  }
+
   public WorkflowSequenceService buildService(Service... services) {
     WorkflowSequenceService parent =
         new WorkflowSequenceService("test", services);

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
index d8d87e4..657cc31 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
@@ -21,6 +21,8 @@ package org.apache.slider.server.services.workflow;
 import org.apache.hadoop.service.Service;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,12 +30,20 @@ public abstract class WorkflowServiceTestBase extends Assert {
   private static final Logger
       log = LoggerFactory.getLogger(WorkflowServiceTestBase.class);
 
+
+  /**
+   * Set the timeout for every test
+   */
+  @Rule
+  public Timeout testTimeout = new Timeout(15000);
+
   @Before
   public void nameThread() {
     Thread.currentThread().setName("JUnit");
   }
+
   
-  public void assertInState(Service service, Service.STATE expected) {
+  protected void assertInState(Service service, Service.STATE expected) {
     Service.STATE actual = service.getServiceState();
     if (actual != expected) {
       fail("Service " + service.getName() + " in state " + actual
@@ -41,18 +51,18 @@ public abstract class WorkflowServiceTestBase extends Assert {
     }
   }
 
-  public void assertStopped(Service service) {
+  protected void assertStopped(Service service) {
     assertInState(service, Service.STATE.STOPPED);
   }
 
-  void logState(ServiceParent p) {
+  protected void logState(ServiceParent p) {
     logService(p);
     for (Service s : p.getServices()) {
       logService(s);
     }
   }
 
-  public void logService(Service s) {
+  protected void logService(Service s) {
     log.info(s.toString());
     Throwable failureCause = s.getFailureCause();
     if (failureCause != null) {
@@ -61,30 +71,48 @@ public abstract class WorkflowServiceTestBase extends Assert {
     }
   }
 
-  public void waitForParentToStop(ServiceParent parent) {
-    boolean stop = parent.waitForServiceToStop(1000);
+  /**
+   * Wait a second for the service parent to stop
+   * @param parent the service to wait for
+   */
+  protected void waitForParentToStop(ServiceParent parent) {
+    waitForParentToStop(parent, 1000);
+  }
+
+  /**
+   * Wait for the service parent to stop
+   * @param parent the service to wait for
+   * @param timeout time in milliseconds
+   */
+  protected void waitForParentToStop(ServiceParent parent, int timeout) {
+    boolean stop = parent.waitForServiceToStop(timeout);
     if (!stop) {
       logState(parent);
-      fail("Service failed to stop :" + parent);
+      fail("Service failed to stop : after " + timeout +" millis " + parent);
     }
   }
 
-  public abstract ServiceParent buildService(Service... services);
+  protected abstract ServiceParent buildService(Service... services);
 
-  public ServiceParent startService(Service... services) {
+  protected ServiceParent startService(Service... services) {
     ServiceParent parent = buildService(services);
     //expect service to start and stay started
     parent.start();
     return parent;
   }
 
+  /**
+   * Class to log when an event callback happens
+   */
   public static class EventCallbackHandler implements WorkflowEventCallback {
     public volatile boolean notified = false;
+    public Object result;
 
     @Override
-    public void eventCallbackEvent() {
+    public void eventCallbackEvent(Object parameter) {
       log.info("EventCallback");
       notified = true;
+      result = parameter;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/2cdb4b74/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
----------------------------------------------------------------------
diff --git a/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java b/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
index f653aa5..2210ff3 100644
--- a/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
+++ b/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
@@ -332,7 +332,8 @@ public class AccumuloProviderService extends AbstractProviderService implements
     //callback to AM to trigger cluster review is set up to happen after
     //the init/verify action has succeeded
     WorkflowEventNotifyingService notifier = new WorkflowEventNotifyingService(execInProgress,
-           internalOperations.getGlobalOptions().getOptionInt(
+        null,
+        internalOperations.getGlobalOptions().getOptionInt(
              OptionKeys.INTERNAL_CONTAINER_STARTUP_DELAY,
              OptionKeys.DEFAULT_CONTAINER_STARTUP_DELAY));
     // register the service for lifecycle management;