You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2014/06/03 17:30:46 UTC

[2/6] git commit: SLIDER-94 : move to Callable based event notification; drop event notifying service

SLIDER-94 : move to Callable based event notification; drop event notifying service


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/5192fc7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/5192fc7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/5192fc7a

Branch: refs/heads/develop
Commit: 5192fc7a93ff6b27c9e9705e6b81d8d52332943d
Parents: 6165828
Author: Steve Loughran <st...@apache.org>
Authored: Tue Jun 3 16:16:29 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Tue Jun 3 16:17:29 2014 +0100

----------------------------------------------------------------------
 .../providers/AbstractProviderService.java      |   2 +-
 .../slider/providers/ProviderCompleted.java     |  29 ++++
 .../providers/ProviderCompletedCallable.java    |  38 +++++
 .../slider/providers/ProviderService.java       |   3 +-
 .../providers/agent/AgentProviderService.java   |   4 +-
 .../slideram/SliderAMProviderService.java       |   4 +-
 .../server/appmaster/SliderAppMaster.java       |  14 +-
 .../services/workflow/ForkedProcessService.java |   1 +
 .../workflow/ServiceTerminatingCallable.java    |  11 +-
 .../workflow/WorkflowCallbackService.java       |  24 ++-
 .../workflow/WorkflowEventCallback.java         |  31 ----
 .../workflow/WorkflowEventNotifyingService.java | 114 -------------
 .../server/services/workflow/package-info.java  | 158 +++++++++++--------
 .../model/mock/MockProviderService.groovy       |   4 +-
 .../apache/slider/test/SliderTestUtils.groovy   |   8 +-
 .../workflow/TestForkedProcessService.java      | 140 ----------------
 .../TestServiceTerminatingRunnable.java         |  64 --------
 .../workflow/TestWorkflowCompositeService.java  |   2 +-
 .../TestWorkflowForkedProcessService.java       | 140 ++++++++++++++++
 .../workflow/TestWorkflowSequenceService.java   |   4 +-
 .../TestWorkflowServiceTerminatingRunnable.java |  64 ++++++++
 .../workflow/WorkflowServiceTestBase.java       |  17 --
 .../accumulo/AccumuloProviderService.java       |  26 +--
 .../providers/hbase/HBaseProviderService.java   |   4 +-
 24 files changed, 428 insertions(+), 478 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
index da9f1d1..7c9b38e 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/AbstractProviderService.java
@@ -169,7 +169,7 @@ public abstract class AbstractProviderService
       }
     }
     ForkedProcessService lastProc = latestProcess();
-    if (lastProc == null) {
+    if (lastProc == null || !lastProc.isProcessTerminated()) {
       return 0;
     } else {
       return lastProc.getExitCode();

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java b/slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java
new file mode 100644
index 0000000..f6ff4fd
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/providers/ProviderCompleted.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+/**
+ * This is the callback triggered by the {@link ProviderCompletedCallable}
+ * when it generates a notification
+ */
+public interface ProviderCompleted {
+  
+  public void eventCallbackEvent(Object parameter);
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java b/slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java
new file mode 100644
index 0000000..47939c9
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/providers/ProviderCompletedCallable.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.providers;
+
+import java.util.concurrent.Callable;
+
+public class ProviderCompletedCallable implements Callable<Object> {
+
+  private final ProviderCompleted callback;
+  private final Object parameter;
+
+  public ProviderCompletedCallable(ProviderCompleted callback, Object parameter) {
+    this.callback = callback;
+    this.parameter = parameter;
+  }
+
+  @Override
+  public Object call() throws Exception {
+    callback.eventCallbackEvent(parameter);
+    return parameter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
index ab09a8d..d77135c 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/ProviderService.java
@@ -34,7 +34,6 @@ import org.apache.slider.core.registry.info.ServiceInstanceData;
 import org.apache.slider.server.appmaster.state.StateAccessForProviders;
 import org.apache.slider.server.appmaster.web.rest.agent.AgentRestOperations;
 import org.apache.slider.server.services.registry.RegistryViewForProviders;
-import org.apache.slider.server.services.workflow.WorkflowEventCallback;
 
 import java.io.File;
 import java.io.IOException;
@@ -81,7 +80,7 @@ public interface ProviderService extends ProviderCore, Service,
   boolean exec(AggregateConf instanceDefinition,
                File confDir,
                Map<String, String> env,
-               WorkflowEventCallback execInProgress) throws IOException,
+               ProviderCompleted execInProgress) throws IOException,
       SliderException;
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
index 8948746..f62198c 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/agent/AgentProviderService.java
@@ -46,6 +46,7 @@ import org.apache.slider.core.registry.info.CustomRegistryConstants;
 import org.apache.slider.core.registry.info.RegisteredEndpoint;
 import org.apache.slider.core.registry.info.ServiceInstanceData;
 import org.apache.slider.providers.AbstractProviderService;
+import org.apache.slider.providers.ProviderCompleted;
 import org.apache.slider.providers.ProviderCore;
 import org.apache.slider.providers.ProviderRole;
 import org.apache.slider.providers.ProviderUtils;
@@ -66,7 +67,6 @@ import org.apache.slider.server.appmaster.web.rest.agent.Register;
 import org.apache.slider.server.appmaster.web.rest.agent.RegistrationResponse;
 import org.apache.slider.server.appmaster.web.rest.agent.RegistrationStatus;
 import org.apache.slider.server.appmaster.web.rest.agent.StatusCommand;
-import org.apache.slider.server.services.workflow.WorkflowEventCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -307,7 +307,7 @@ public class AgentProviderService extends AbstractProviderService implements
   public boolean exec(AggregateConf instanceDefinition,
                       File confDir,
                       Map<String, String> env,
-                      WorkflowEventCallback execInProgress) throws
+                      ProviderCompleted execInProgress) throws
       IOException,
       SliderException {
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
index 9667c8f..1610954 100644
--- a/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
+++ b/slider-core/src/main/java/org/apache/slider/providers/slideram/SliderAMProviderService.java
@@ -39,12 +39,12 @@ import org.apache.slider.core.registry.info.RegisteredEndpoint;
 import org.apache.slider.core.registry.info.RegistryView;
 import org.apache.slider.core.registry.info.ServiceInstanceData;
 import org.apache.slider.providers.AbstractProviderService;
+import org.apache.slider.providers.ProviderCompleted;
 import org.apache.slider.providers.ProviderCore;
 import org.apache.slider.providers.ProviderRole;
 import org.apache.slider.providers.agent.AgentKeys;
 import org.apache.slider.server.appmaster.PublishedArtifacts;
 import org.apache.slider.server.appmaster.web.rest.RestPaths;
-import org.apache.slider.server.services.workflow.WorkflowEventCallback;
 
 import java.io.File;
 import java.io.IOException;
@@ -94,7 +94,7 @@ public class SliderAMProviderService extends AbstractProviderService implements
   public boolean exec(AggregateConf instanceDefinition,
       File confDir,
       Map<String, String> env,
-      WorkflowEventCallback execInProgress) throws IOException, SliderException {
+      ProviderCompleted execInProgress) throws IOException, SliderException {
     return false;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 2457b4d..e126261 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -88,6 +88,7 @@ import org.apache.slider.core.registry.info.CustomRegistryConstants;
 import org.apache.slider.core.registry.info.RegisteredEndpoint;
 import org.apache.slider.core.registry.info.RegistryNaming;
 import org.apache.slider.core.registry.info.ServiceInstanceData;
+import org.apache.slider.providers.ProviderCompleted;
 import org.apache.slider.providers.ProviderRole;
 import org.apache.slider.providers.ProviderService;
 import org.apache.slider.providers.SliderProviderFactory;
@@ -112,7 +113,6 @@ import org.apache.slider.server.appmaster.web.WebAppApiImpl;
 import org.apache.slider.server.appmaster.web.rest.RestPaths;
 import org.apache.slider.server.services.registry.SliderRegistryService;
 import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
-import org.apache.slider.server.services.workflow.WorkflowEventCallback;
 import org.apache.slider.server.services.workflow.WorkflowRpcService;
 import org.apache.slider.server.services.utility.WebAppService;
 import org.slf4j.Logger;
@@ -149,7 +149,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
     SliderClusterProtocol,
     ServiceStateChangeListener,
     RoleKeys,
-    WorkflowEventCallback,
+    ProviderCompleted,
     ContainerStartOperation {
   protected static final Logger log =
     LoggerFactory.getLogger(SliderAppMaster.class);
@@ -1321,7 +1321,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
       // didn't start, so don't register
       providerService.start();
       // and send the started event ourselves
-      eventCallbackEvent(this, null, null);
+      eventCallbackEvent(null);
     }
   }
 
@@ -1330,10 +1330,8 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   /* EventCallback  from the child or ourselves directly */
   /* =================================================================== */
 
-  @Override // EventCallback
-  public void eventCallbackEvent(Object caller,
-      Object parameter,
-      Exception exception) {
+  @Override // ProviderCompleted
+  public void eventCallbackEvent(Object parameter) {
     // signalled that the child process is up.
     appState.noteAMLive();
     // now ask for the cluster nodes
@@ -1357,7 +1355,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
    */
   @Override //ServiceStateChangeListener
   public void stateChanged(Service service) {
-    if (service == providerService) {
+    if (service == providerService && service.isInState(STATE.STOPPED)) {
       //its the current master process in play
       int exitCode = providerService.getExitCode();
       int mappedProcessExitCode =

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
index 141ab7d..900e722 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
@@ -250,6 +250,7 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
   public int getExitCode() {
     return process.getExitCode();
   }
+  
   public int getExitCodeSignCorrected() {
     return process.getExitCodeSignCorrected();
   }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
index 3ec1428..5ebf77c 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingCallable.java
@@ -38,9 +38,14 @@ public class ServiceTerminatingCallable<V> implements Callable<V> {
   private final Callable<V> callable;
 
 
+  /**
+   * Create an instance. If the owner is null, the owning service
+   * is not terminated.
+   * @param owner owning service -can be null
+   * @param callable callback.
+   */
   public ServiceTerminatingCallable(Service owner,
       Callable<V> callable) {
-    Preconditions.checkArgument(owner != null, "null owner");
     Preconditions.checkArgument(callable != null, "null callable");
     this.owner = owner;
     this.callable = callable;
@@ -79,7 +84,9 @@ public class ServiceTerminatingCallable<V> implements Callable<V> {
       exception = e;
       throw e;
     } finally {
-      owner.stop();
+      if (owner != null) {
+        owner.stop();
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
index 5bf0803..6c50798 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCallbackService.java
@@ -30,7 +30,8 @@ import java.util.concurrent.TimeUnit;
 
 /**
  * A service that calls the supplied callback when it is started -after the 
- * given delay, then stops itself.
+ * given delay. It can be configured to stop itself after the callback has
+ * completed, marking any exception raised as the exception of this service.
  * The notifications come in on a callback thread -a thread that is only
  * started in this service's <code>start()</code> operation.
  */
@@ -53,15 +54,19 @@ public class WorkflowCallbackService<V> extends
    * @param name service name
    * @param callback callback to invoke
    * @param delay delay -or 0 for no delay
+   * @param terminate terminate this service after the callback?
    */
   public WorkflowCallbackService(String name,
       Callable<V> callback,
-      int delay) {
+      int delay,
+      boolean terminate) {
     super(name);
     Preconditions.checkNotNull(callback, "Null callback argument");
     this.callback = callback;
     this.delay = delay;
-    command = new ServiceTerminatingCallable<V>(this, callback);
+    command = new ServiceTerminatingCallable<V>(
+        terminate ? this : null,
+        callback);
   }
 
   public ScheduledFuture<V> getScheduledFuture() {
@@ -89,9 +94,18 @@ public class WorkflowCallbackService<V> extends
   protected void serviceStop() throws Exception {
     super.serviceStop();
     // propagate any failure
-    if (command.getException() != null) {
-      throw command.getException();
+    if (getCallbackException() != null) {
+      throw getCallbackException();
     }
   }
 
+  /**
+   * Get the exception raised by a callback. Will always be null if the 
+   * callback has not been executed; will only be non-null after a failure.
+   * @return the exception raised by the callback, or null if there was none
+   */
+  public Exception getCallbackException() {
+    return command.getException();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
deleted file mode 100644
index fe28e38..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-/**
- * This is the callback triggered by the {@link WorkflowEventNotifyingService}
- * when it generates a notification
- */
-public interface WorkflowEventCallback {
-  
-  public void eventCallbackEvent(Object caller,
-      Object parameter,
-      Exception exception);
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
deleted file mode 100644
index 3c776d1..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import com.google.common.base.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Callable;
-
-/**
- * A service that calls the supplied callback when it is started -after the 
- * given delay, then stops itself.
- * The notifications come in on a callback thread -a thread that is only
- * started in this service's <code>start()</code> operation.
- */
-public class WorkflowEventNotifyingService<V> extends
-    AbstractWorkflowExecutorService
-    implements Runnable {
-  protected static final Logger LOG =
-    LoggerFactory.getLogger(WorkflowEventNotifyingService.class);
-  private final WorkflowEventCallback callback;
-  private final int delay;
-  private final ServiceTerminatingRunnable command;
-  private final Object parameter;
-
-  /**
-   * Create an instance of the service
-   * @param name service name
-   * @param callback callback to invoke
-   * @param parameter optional parameter for the callback
-   * @param delay delay -or 0 for no delay
-   */
-  public WorkflowEventNotifyingService(String name,
-      WorkflowEventCallback callback,
-      Object parameter,
-      int delay) {
-    super(name);
-    Preconditions.checkNotNull(callback, "Null callback argument");
-    this.callback = callback;
-    this.delay = delay;
-    this.parameter = parameter;
-    this.command = new ServiceTerminatingRunnable(this, this);
-  }
-
-  /**
-   * Create an instance of the service
-   * @param callback callback to invoke
-   * @param parameter optional parameter for the callback
-   * @param delay delay -or 0 for no delay
-   */
-  public WorkflowEventNotifyingService(WorkflowEventCallback callback,
-      Object parameter,
-      int delay) {
-    this("WorkflowEventNotifyingService", callback, parameter, delay);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    LOG.debug("Notifying {} after a delay of {} millis", callback, delay);
-    setExecutor(ServiceThreadFactory.singleThreadExecutor(getName(), true));
-    execute(command);
-  }
-
-  /**
-   * Stop the service.
-   * If there is any exception noted from any executed notification,
-   * note the exception in this class
-   * @throws Exception exception.
-   */
-  @Override
-  protected void serviceStop() throws Exception {
-    super.serviceStop();
-    // propagate any failure
-    if (command.getException() != null) {
-      noteFailure(command.getException());
-    }
-  }
-
-  /**
-   * Perform the work in a different thread. Relies on
-   * the {@link ServiceTerminatingRunnable} to trigger
-   * the service halt on this thread.
-   */
-  @Override // Runnable
-  public void run() {
-    if (delay > 0) {
-      try {
-        Thread.sleep(delay);
-      } catch (InterruptedException e) {
-        LOG.debug("Interrupted: {} in runnable", e, e);
-      }
-    }
-    LOG.debug("Notifying {}", callback);
-    callback.eventCallbackEvent(this, parameter, null);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
index 4dd2cc7..fab1b9f 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
@@ -20,44 +20,49 @@ package org.apache.slider.server.services.workflow;
 
 /**
 
- <h2>
- Introduction
- </h2>
+<p>
  This package contains classes which can be aggregated to build up
  complex workflows of services: sequences of operations, callbacks
  and composite services with a shared lifespan.
+ </p>
 
+<h2>
  Core concepts:
- <ol>
- <li>
- Workflow service instances have a limited lifespan, and will self-terminate when
- they consider it time</li>
- <li>
- Workflow Services that have children implement the
- {@link org.apache.slider.server.services.workflow.ServiceParent}
- class, which provides (thread-safe) access to the children -allowing new children
- to be added, and existing children to be ennumerated
- </li>
- <li>
- Workflow Services are designed to be aggregated, to be composed to produce larger
- composite services which than perform ordered operations, notify other services
- when work has completed, and to propagate failure up the service hierarchy.
- </li>
- <li>
- Workflow Services may be subclassed to extend their behavior, or to use them
- in specific applications. Just as the standard
- {@link org.apache.hadoop.service.CompositeService}
- is often subclassed to aggregate child services, the
- {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
- can be used instead -adding the feature that failing services trigger automatic
- parent shutdown. If that is the desired operational mode of a class,
- swapping the composite service implementation may be sufficient to adopt it.
- </li>
- </ol>
+</h2>
+
 
- <h2>
- How do the workflow services differ from the standard YARN services?
- </h2>
+<p>
+The Workflow Services are a set of Hadoop YARN services, all implementing
+the {@link org.apache.hadoop.service.Service} API.
+They are designed to be aggregated, to be composed to produce larger
+composite services which then perform ordered operations, notify other services
+when work has completed, and to propagate failure up the service hierarchy.
+</p>
+<p>
+Service instances may have a limited lifespan, and may self-terminate when
+they consider it appropriate.</p>
+<p>
+Workflow Services that have children implement the
+{@link org.apache.slider.server.services.workflow.ServiceParent}
+class, which provides (thread-safe) access to the children -allowing new children
+to be added, and existing children to be enumerated. They implement policies
+on how to react to the termination of children -so can sequence operations
+which terminate themselves when complete.
+</p>
+
+<p>
+Workflow Services may be subclassed to extend their behavior, or to use them
+in specific applications. Just as the standard
+{@link org.apache.hadoop.service.CompositeService}
+is often subclassed to aggregate child services, the
+{@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+can be used instead -adding the feature that failing services trigger automatic
+parent shutdown. If that is the desired operational mode of a class,
+swapping the composite service implementation may be sufficient to adopt it.
+</p>
+
+
+<h2> How do the workflow services differ from the standard YARN services? </h2>
 
  <p>
  
@@ -69,40 +74,55 @@ package org.apache.slider.server.services.workflow;
  Where it differs is that if any child service stops -either due to a failure
  or to an action which invokes that service's
  {@link org.apache.hadoop.service.Service#stop()} method.
- </p><p>
-
- In contrast, the original <code>CompositeService</code> class starts its children
- in its{@link org.apache.hadoop.service.Service#start()}  method, but does not listen or react to any
- child service halting. As a result, changes in child state are not detected
- or propagated.
-  </p><p>
-
- If a child service runs until completed -that is it will not be stopped until
- instructed to do so, and if it is only the parent service that attempts to
- stop the child, then this difference is unimportant. 
-  </p><p>
-
- However, if any service that depends upon all it child services running -
- and if those child services are written so as to stop when they fail, using
- the <code>WorkflowCompositeService</code> as a base class will enable the 
- parent service to be automatically notified of a child stopping.
-
- </p><p>
- The {@link org.apache.slider.server.services.workflow.WorkflowSequenceService}
- resembles the composite service in API, but its workflow is different. It
- initializes and starts its children one-by-one, only starting the second after
- the first one succeeds, the third after the second, etc. If any service in
- the sequence fails, the parent <code>WorkflowSequenceService</code> stops, 
- reporting the same exception. 
  </p>
  <p>
- The {@link org.apache.slider.server.services.workflow.ForkedProcessService}:
- Executes a process when started, and binds to the life of that process. When the
- process terminates, so does the service -and vice versa. This service enables
- external processes to be executed as part of a sequence of operations -or,
- using the {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
- in parallel with other services, terminating the process when the other services
- stop -and vice versa.
+
+In contrast, the original <code>CompositeService</code> class starts its children
+in its {@link org.apache.hadoop.service.Service#start()} method, but does not
+listen or react to any child service halting. As a result, changes in child 
+state are not automatically detected or propagated, other than failures in
+the actual init() and start() methods.
+</p>
+
+<p>
+If a child service runs until completed -that is it will not be stopped until
+instructed to do so, and if it is only the parent service that attempts to
+stop the child, then this difference is unimportant. 
+</p>
+<p>
+
+However, if a service depends upon all its child services running -
+and if those child services are written so as to stop when they fail, using
+the <code>WorkflowCompositeService</code> as a base class will enable the 
+parent service to be automatically notified of a child stopping.
+
+</p>
+<p>
+The {@link org.apache.slider.server.services.workflow.WorkflowSequenceService}
+resembles the composite service in API, but its workflow is different. It
+initializes and starts its children one-by-one, only starting the second after
+the first one succeeds, the third after the second, etc. If any service in
+the sequence fails, the parent <code>WorkflowSequenceService</code> stops, 
+reporting the same exception. 
+</p>
+
+<p>
+The {@link org.apache.slider.server.services.workflow.ForkedProcessService}:
+Executes a process when started, and binds to the life of that process. When the
+process terminates, so does the service -and vice versa. This service enables
+external processes to be executed as part of a sequence of operations -or,
+using the {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+in parallel with other services, terminating the process when the other services
+stop -and vice versa.
+</p>
+
+<p>
+The {@link org.apache.slider.server.services.workflow.WorkflowCallbackService}
+executes a {@link java.util.concurrent.Callable} callback a specified delay
+after the service is started, then potentially terminates itself.
+This is useful for callbacks when a workflow reaches a specific point
+-or simply for executing arbitrary code in the workflow.
+
  </p>
 
 
@@ -110,18 +130,16 @@ package org.apache.slider.server.services.workflow;
 Other Workflow Services
 </h2>
 
- There are some minor services that have proven useful within aggregate workflows,
- and simply in applications which are built from composite YARN services.
- 
+There are some minor services that have proven useful within aggregate workflows,
+and simply in applications which are built from composite YARN services.
+
  <ul>
  <li>{@link org.apache.slider.server.services.workflow.WorkflowRpcService }:
  Maintains a reference to an RPC {@link org.apache.hadoop.ipc.Server} instance.
  When the service is started, so is the RPC server. Similarly, when the service
  is stopped, so is the RPC server instance. 
  </li>
- <li>{@link org.apache.slider.server.services.workflow.WorkflowEventNotifyingService }:
- Notifies callbacks when a workflow reaches a specific point (potentially after a delay).
- </li>
+
  <li>{@link org.apache.slider.server.services.workflow.ClosingService}: Closes
  an instance of {@link java.io.Closeable} when the service is stopped. This
  is purely a housekeeping class.

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
index e82deb0..361fc2e 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/mock/MockProviderService.groovy
@@ -42,7 +42,7 @@ import org.apache.slider.server.appmaster.web.rest.agent.Register
 import org.apache.slider.server.appmaster.web.rest.agent.RegistrationResponse
 import org.apache.slider.server.appmaster.web.rest.agent.RegistrationStatus
 import org.apache.slider.server.services.registry.RegistryViewForProviders
-import org.apache.slider.server.services.workflow.WorkflowEventCallback
+import org.apache.slider.providers.ProviderCompleted
 
 class MockProviderService implements ProviderService {
 
@@ -145,7 +145,7 @@ class MockProviderService implements ProviderService {
       AggregateConf instanceDefinition,
       File confDir,
       Map<String, String> env,
-      WorkflowEventCallback execInProgress) throws IOException, SliderException {
+      ProviderCompleted execInProgress) throws IOException, SliderException {
     return false;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy b/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy
index 2045f11..cb95fd2 100644
--- a/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/test/SliderTestUtils.groovy
@@ -481,8 +481,8 @@ class SliderTestUtils extends Assert {
     ServiceLauncher<SliderClient> serviceLauncher =
         new ServiceLauncher<SliderClient>(SliderClient.name);
     serviceLauncher.launchService(conf,
-                                  toArray(args),
-                                  false);
+        toArray(args),
+        false, true);
     return serviceLauncher
   }
 
@@ -493,8 +493,8 @@ class SliderTestUtils extends Assert {
     ServiceLauncher serviceLauncher =
         new ServiceLauncher(serviceClass.name);
     serviceLauncher.launchService(conf,
-                                  toArray(args),
-                                  false);
+        toArray(args),
+        false, true);
     return serviceLauncher;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
deleted file mode 100644
index 1ac89a5..0000000
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.ServiceOperations;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Test the long lived process by executing a command that works and a command
- * that fails
- */
-public class TestForkedProcessService extends WorkflowServiceTestBase {
-  private static final Logger
-      log = LoggerFactory.getLogger(TestForkedProcessService.class);
-
-  private static final Logger
-      processLog =
-      LoggerFactory.getLogger("org.apache.hadoop.services.workflow.Process");
-
-
-  private ForkedProcessService process;
-  private File testDir = new File("target");
-  private ProcessCommandFactory commandFactory;
-  private Map<String, String> env = new HashMap<String, String>();
-
-  @Before
-  public void setupProcesses() {
-    commandFactory = ProcessCommandFactory.createProcessCommandFactory();
-  }
-
-  @After
-  public void stopProcesses() {
-    ServiceOperations.stop(process);
-  }
-
-  @Test
-  public void testLs() throws Throwable {
-
-    initProcess(commandFactory.ls(testDir));
-    exec();
-    assertFalse(process.isProcessRunning());
-    assertEquals(0, process.getExitCode());
-
-    assertStringInOutput("test-classes", getFinalOutput());
-    // assert that the service did not fail
-    assertNull(process.getFailureCause());
-  }
-
-  @Test
-  public void testExitCodes() throws Throwable {
-
-    initProcess(commandFactory.exitFalse());
-    exec();
-    assertFalse(process.isProcessRunning());
-    int exitCode = process.getExitCode();
-    assertTrue(exitCode != 0);
-    int corrected = process.getExitCodeSignCorrected();
-    assertEquals(1, corrected);
-    // assert that the exit code was uprated to a service failure
-    assertNotNull(process.getFailureCause());
-
-  }
-
-  @Test
-  public void testEcho() throws Throwable {
-
-    String echoText = "hello, world";
-    initProcess(commandFactory.echo(echoText));
-    exec();
-
-    assertEquals(0, process.getExitCode());
-    assertStringInOutput(echoText, getFinalOutput());
-
-  }
-
-  @Test
-  public void testSetenv() throws Throwable {
-
-    String var = "TEST_RUN";
-    String val = "TEST-RUN-ENV-VALUE";
-    env.put(var, val);
-    initProcess(commandFactory.env());
-    exec();
-
-    assertEquals(0, process.getExitCode());
-    assertStringInOutput(val, getFinalOutput());
-  }
-
-  /**
-   * Get the final output. includes a quick sleep for the tail output
-   * @return the last output
-   */
-  private List<String> getFinalOutput() {
-    return process.getRecentOutput();
-  }
-
-  private ForkedProcessService initProcess(List<String> commands) throws
-      IOException {
-    process = new ForkedProcessService(name.getMethodName(), env,
-        commands);
-    process.init(new Configuration());
-
-    return process;
-  }
-
-  public void exec() throws InterruptedException {
-    assertNotNull(process);
-    EndOfServiceWaiter waiter = new EndOfServiceWaiter(process);
-    process.start();
-    waiter.waitForServiceToStop(5000);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestServiceTerminatingRunnable.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestServiceTerminatingRunnable.java
deleted file mode 100644
index ffedd6e..0000000
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestServiceTerminatingRunnable.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import org.junit.Test;
-
-
-public class TestServiceTerminatingRunnable extends WorkflowServiceTestBase {
-
-  @Test
-  public void testNoservice() throws Throwable {
-
-    try {
-      new ServiceTerminatingRunnable(null, new SimpleRunnable());
-      fail("unexpected ");
-    } catch (IllegalArgumentException e) {
-
-      // expected 
-    }
-  }
-
-
-  @Test
-  public void testBasicRun() throws Throwable {
-
-    WorkflowCompositeService svc = run(new WorkflowCompositeService());
-    ServiceTerminatingRunnable runnable = new ServiceTerminatingRunnable(svc,
-        new SimpleRunnable());
-
-    // synchronous in-thread execution
-    runnable.run();
-    assertStopped(svc);
-  }
-
-  @Test
-  public void testFailureRun() throws Throwable {
-
-    WorkflowCompositeService svc = run(new WorkflowCompositeService());
-    ServiceTerminatingRunnable runnable =
-        new ServiceTerminatingRunnable(svc, new SimpleRunnable(true));
-
-    // synchronous in-thread execution
-    runnable.run();
-    assertStopped(svc);
-    assertNotNull(runnable.getException());
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
index afd5a61..5780149 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
@@ -66,7 +66,7 @@ public class TestWorkflowCompositeService extends ParentWorkflowTestBase {
     MockService one = new MockService("one", false, 100);
     CallableHandler handler = new CallableHandler("hello");
     WorkflowCallbackService<String> ens =
-        new WorkflowCallbackService<String>("handler", handler, 100);
+        new WorkflowCallbackService<String>("handler", handler, 100, true);
     MockService two = new MockService("two", false, 100);
     ServiceParent parent = startService(one, ens, two);
     waitForParentToStop(parent);

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowForkedProcessService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowForkedProcessService.java
new file mode 100644
index 0000000..29d5578
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowForkedProcessService.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.ServiceOperations;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test the long lived process by executing a command that works and a command
+ * that fails
+ */
+public class TestWorkflowForkedProcessService extends WorkflowServiceTestBase {
+  private static final Logger
+      log = LoggerFactory.getLogger(TestWorkflowForkedProcessService.class);
+
+  private static final Logger
+      processLog =
+      LoggerFactory.getLogger("org.apache.hadoop.services.workflow.Process");
+
+
+  private ForkedProcessService process;
+  private File testDir = new File("target");
+  private ProcessCommandFactory commandFactory;
+  private Map<String, String> env = new HashMap<String, String>();
+
+  @Before
+  public void setupProcesses() {
+    commandFactory = ProcessCommandFactory.createProcessCommandFactory();
+  }
+
+  @After
+  public void stopProcesses() {
+    ServiceOperations.stop(process);
+  }
+
+  @Test
+  public void testLs() throws Throwable {
+
+    initProcess(commandFactory.ls(testDir));
+    exec();
+    assertFalse(process.isProcessRunning());
+    assertEquals(0, process.getExitCode());
+
+    assertStringInOutput("test-classes", getFinalOutput());
+    // assert that the service did not fail
+    assertNull(process.getFailureCause());
+  }
+
+  @Test
+  public void testExitCodes() throws Throwable {
+
+    initProcess(commandFactory.exitFalse());
+    exec();
+    assertFalse(process.isProcessRunning());
+    int exitCode = process.getExitCode();
+    assertTrue(exitCode != 0);
+    int corrected = process.getExitCodeSignCorrected();
+    assertEquals(1, corrected);
+    // assert that the exit code was uprated to a service failure
+    assertNotNull(process.getFailureCause());
+
+  }
+
+  @Test
+  public void testEcho() throws Throwable {
+
+    String echoText = "hello, world";
+    initProcess(commandFactory.echo(echoText));
+    exec();
+
+    assertEquals(0, process.getExitCode());
+    assertStringInOutput(echoText, getFinalOutput());
+
+  }
+
+  @Test
+  public void testSetenv() throws Throwable {
+
+    String var = "TEST_RUN";
+    String val = "TEST-RUN-ENV-VALUE";
+    env.put(var, val);
+    initProcess(commandFactory.env());
+    exec();
+
+    assertEquals(0, process.getExitCode());
+    assertStringInOutput(val, getFinalOutput());
+  }
+
+  /**
+   * Get the final output. includes a quick sleep for the tail output
+   * @return the last output
+   */
+  private List<String> getFinalOutput() {
+    return process.getRecentOutput();
+  }
+
+  private ForkedProcessService initProcess(List<String> commands) throws
+      IOException {
+    process = new ForkedProcessService(name.getMethodName(), env,
+        commands);
+    process.init(new Configuration());
+
+    return process;
+  }
+
+  public void exec() throws InterruptedException {
+    assertNotNull(process);
+    EndOfServiceWaiter waiter = new EndOfServiceWaiter(process);
+    process.start();
+    waiter.waitForServiceToStop(5000);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
index a581bc9..581e3ed 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
@@ -57,7 +57,7 @@ public class TestWorkflowSequenceService extends ParentWorkflowTestBase {
     MockService one = new MockService("one", false, 100);
     CallableHandler handler = new CallableHandler("hello");
     WorkflowCallbackService<String> ens =
-        new WorkflowCallbackService<String>("handler", handler, 100);
+        new WorkflowCallbackService<String>("handler", handler, 100, true);
     MockService two = new MockService("two", false, 100);
     ServiceParent parent = startService(one, ens, two);
     waitForParentToStop(parent);
@@ -131,7 +131,7 @@ public class TestWorkflowSequenceService extends ParentWorkflowTestBase {
     ServiceParent parent = startService(one, two);
     CallableHandler handler = new CallableHandler("hello");
     WorkflowCallbackService<String> ens =
-        new WorkflowCallbackService<String>("handler", handler, 100);
+        new WorkflowCallbackService<String>("handler", handler, 100, true);
     parent.addService(ens);
     waitForParentToStop(parent, 10000);
     assertStopped(one);

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowServiceTerminatingRunnable.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowServiceTerminatingRunnable.java
new file mode 100644
index 0000000..15be1dc
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowServiceTerminatingRunnable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.junit.Test;
+
+
+public class TestWorkflowServiceTerminatingRunnable extends WorkflowServiceTestBase {
+
+  @Test
+  public void testNoservice() throws Throwable {
+
+    try {
+      new ServiceTerminatingRunnable(null, new SimpleRunnable());
+      fail("unexpected ");
+    } catch (IllegalArgumentException e) {
+
+      // expected 
+    }
+  }
+
+
+  @Test
+  public void testBasicRun() throws Throwable {
+
+    WorkflowCompositeService svc = run(new WorkflowCompositeService());
+    ServiceTerminatingRunnable runnable = new ServiceTerminatingRunnable(svc,
+        new SimpleRunnable());
+
+    // synchronous in-thread execution
+    runnable.run();
+    assertStopped(svc);
+  }
+
+  @Test
+  public void testFailureRun() throws Throwable {
+
+    WorkflowCompositeService svc = run(new WorkflowCompositeService());
+    ServiceTerminatingRunnable runnable =
+        new ServiceTerminatingRunnable(svc, new SimpleRunnable(true));
+
+    // synchronous in-thread execution
+    runnable.run();
+    assertStopped(svc);
+    assertNotNull(runnable.getException());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
index 8adaa1d..3049d8f 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
@@ -93,23 +93,6 @@ public abstract class WorkflowServiceTestBase extends Assert {
   }
 
   /**
-   * Class to log when an event callback happens
-   */
-  public static class EventCallbackHandler implements WorkflowEventCallback {
-    public volatile boolean notified = false;
-    public Object result;
-
-    @Override
-    public void eventCallbackEvent(Object caller,
-        Object parameter,
-        Exception exception) {
-      log.info("EventCallback");
-      notified = true;
-      result = parameter;
-    }
-  }
-
-  /**
    * Handler for callable events
    */
   public static class CallableHandler implements Callable<String> {

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
----------------------------------------------------------------------
diff --git a/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java b/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
index b43751d..df927a9 100644
--- a/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
+++ b/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.slider.common.SliderKeys;
@@ -39,6 +40,8 @@ import org.apache.slider.core.exceptions.BadCommandArgumentsException;
 import org.apache.slider.core.exceptions.BadConfigException;
 import org.apache.slider.core.exceptions.SliderException;
 import org.apache.slider.providers.AbstractProviderService;
+import org.apache.slider.providers.ProviderCompleted;
+import org.apache.slider.providers.ProviderCompletedCallable;
 import org.apache.slider.providers.ProviderCore;
 import org.apache.slider.providers.ProviderRole;
 import org.apache.slider.providers.ProviderUtils;
@@ -46,9 +49,9 @@ import org.apache.slider.common.tools.SliderFileSystem;
 import org.apache.slider.common.tools.SliderUtils;
 import org.apache.slider.core.zk.BlockingZKWatcher;
 import org.apache.slider.common.tools.ConfigHelper;
-import org.apache.slider.server.services.workflow.WorkflowEventCallback;
-import org.apache.slider.server.services.workflow.WorkflowEventNotifyingService;
+import org.apache.slider.server.services.utility.WorkflowEventNotifyingService;
 import org.apache.slider.server.services.workflow.ForkedProcessService;
+import org.apache.slider.server.services.workflow.WorkflowCallbackService;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
@@ -264,7 +267,7 @@ public class AccumuloProviderService extends AbstractProviderService implements
   public boolean exec(AggregateConf instanceDefinition,
                       File confDir,
                       Map<String, String> env,
-                      WorkflowEventCallback execInProgress)
+                      ProviderCompleted execInProgress)
       throws IOException, SliderException {
 
 
@@ -330,12 +333,17 @@ public class AccumuloProviderService extends AbstractProviderService implements
     
     //callback to AM to trigger cluster review is set up to happen after
     //the init/verify action has succeeded
-    WorkflowEventNotifyingService notifier = new WorkflowEventNotifyingService(
-        execInProgress,
-        null,
-        internalOperations.getGlobalOptions().getOptionInt(
-             OptionKeys.INTERNAL_CONTAINER_STARTUP_DELAY,
-             OptionKeys.DEFAULT_CONTAINER_STARTUP_DELAY));
+    int delay = internalOperations.getGlobalOptions().getOptionInt(
+        OptionKeys.INTERNAL_CONTAINER_STARTUP_DELAY,
+        OptionKeys.DEFAULT_CONTAINER_STARTUP_DELAY);
+    ProviderCompletedCallable completedCallable =
+        new ProviderCompletedCallable(execInProgress, null);
+    Service notifier = new WorkflowCallbackService<>(
+        "accumulo notifier",
+        completedCallable,
+        delay,
+        true);
+    
     // register the service for lifecycle management; 
     // this service is started after the accumulo process completes
     addService(notifier);

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/5192fc7a/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java
----------------------------------------------------------------------
diff --git a/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java b/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java
index be68e4a..407f39a 100644
--- a/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java
+++ b/slider-providers/hbase/slider-hbase-provider/src/main/java/org/apache/slider/providers/hbase/HBaseProviderService.java
@@ -38,6 +38,7 @@ import org.apache.slider.core.registry.docstore.PublishedConfigSet;
 import org.apache.slider.core.registry.docstore.PublishedConfiguration;
 import org.apache.slider.core.registry.info.ServiceInstanceData;
 import org.apache.slider.providers.AbstractProviderService;
+import org.apache.slider.providers.ProviderCompleted;
 import org.apache.slider.providers.ProviderCore;
 import org.apache.slider.providers.ProviderRole;
 import org.apache.slider.providers.ProviderUtils;
@@ -50,7 +51,6 @@ import org.apache.slider.server.appmaster.web.rest.agent.HeartBeatResponse;
 import org.apache.slider.server.appmaster.web.rest.agent.Register;
 import org.apache.slider.server.appmaster.web.rest.agent.RegistrationResponse;
 import org.apache.slider.server.appmaster.web.rest.agent.RegistrationStatus;
-import org.apache.slider.server.services.workflow.WorkflowEventCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -240,7 +240,7 @@ public class HBaseProviderService extends AbstractProviderService implements
   public boolean exec(AggregateConf instanceDefinition,
                       File confDir,
                       Map<String, String> env,
-                      WorkflowEventCallback execInProgress) throws
+                      ProviderCompleted execInProgress) throws
                                                  IOException,
       SliderException {