You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2014/06/02 20:11:52 UTC

[1/8] git commit: SLIDER-94 Closeable service tests and improvements

Repository: incubator-slider
Updated Branches:
  refs/heads/develop 3954ce3b6 -> 703597261


SLIDER-94 Closeable service tests and improvements


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/c8e09362
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/c8e09362
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/c8e09362

Branch: refs/heads/develop
Commit: c8e09362c671fba95e4c450aae2d385d3afc9954
Parents: 5f3b8a6
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jun 2 10:35:50 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jun 2 10:35:50 2014 +0100

----------------------------------------------------------------------
 .../services/workflow/ClosingService.java       |  27 ++--
 .../services/workflow/TestCloseableService.java | 122 +++++++++++++++++++
 2 files changed, 142 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/c8e09362/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
index 7c9054c..6751347 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
@@ -18,10 +18,11 @@
 
 package org.apache.slider.server.services.workflow;
 
-import org.apache.hadoop.io.IOUtils;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.service.AbstractService;
 
 import java.io.Closeable;
+import java.io.IOException;
 
 /**
  * Service that closes the closeable supplied during shutdown, if not null.
@@ -32,12 +33,17 @@ public class ClosingService<C extends Closeable> extends AbstractService {
 
 
   public ClosingService(String name,
-                        C closeable) {
+      C closeable) {
     super(name);
     this.closeable = closeable;
   }
 
-  public Closeable getCloseable() {
+  public ClosingService(C closeable) {
+    this("ClosingService", closeable);
+  }
+
+
+  public C getCloseable() {
     return closeable;
   }
 
@@ -48,11 +54,18 @@ public class ClosingService<C extends Closeable> extends AbstractService {
   /**
    * Stop routine will close the closeable -if not null - and set the
    * reference to null afterwards
-   * @throws Exception
+   * This operation does raise any exception on the close, though it does
+   * record it
    */
   @Override
-  protected void serviceStop() throws Exception {
-    IOUtils.closeStream(closeable);
-    closeable = null;
+  protected void serviceStop() {
+    if (closeable != null) {
+      try {
+        closeable.close();
+      } catch (IOException ioe) {
+        noteFailure(ioe);
+      }
+      closeable = null;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/c8e09362/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java
new file mode 100644
index 0000000..cb984a9
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class TestCloseableService extends WorkflowServiceTestBase {
+
+  @Override
+  protected ServiceParent buildService(Service... services) {
+    throw new AssertionError("Unimplemented");
+  }
+
+  @Test
+  public void testSimpleClose() throws Throwable {
+    ClosingService<OpenClose> svc = instance(false);
+    OpenClose openClose = svc.getCloseable();
+    assertFalse(openClose.closed);
+    svc.stop();
+    assertTrue(openClose.closed);
+  }
+
+  @Test
+  public void testNullClose() throws Throwable {
+    ClosingService<OpenClose> svc = new ClosingService<OpenClose>(null);
+    svc.init(new Configuration());
+    svc.start();
+    assertNull(svc.getCloseable());
+    svc.stop();
+  }
+
+  @Test
+  public void testFailingClose() throws Throwable {
+    ClosingService<OpenClose> svc = instance(false);
+    OpenClose openClose = svc.getCloseable();
+    openClose.raiseExceptionOnClose = true;
+    svc.stop();
+    assertTrue(openClose.closed);
+    Throwable cause = svc.getFailureCause();
+    assertNotNull(cause);
+
+    //retry should be a no-op
+    svc.close();
+  }
+
+  @Test
+  public void testDoubleClose() throws Throwable {
+    ClosingService<OpenClose> svc = instance(false);
+    OpenClose openClose = svc.getCloseable();
+    openClose.raiseExceptionOnClose = true;
+    svc.stop();
+    assertTrue(openClose.closed);
+    Throwable cause = svc.getFailureCause();
+    assertNotNull(cause);
+    openClose.closed = false;
+    svc.stop();
+    assertEquals(cause, svc.getFailureCause());
+  }
+
+  /**
+   * This does not recurse forever, as the service has already entered the
+   * STOPPED state before the inner close tries to stop it -that operation
+   * is a no-op
+   * @throws Throwable
+   */
+  @Test
+  public void testCloseSelf() throws Throwable {
+    ClosingService<ClosingService> svc =
+        new ClosingService<ClosingService>(null);
+    svc.setCloseable(svc);
+    svc.stop();
+  }
+
+
+  private ClosingService<OpenClose> instance(boolean raiseExceptionOnClose) {
+    ClosingService<OpenClose> svc = new ClosingService<OpenClose>(new OpenClose(
+        raiseExceptionOnClose));
+    svc.init(new Configuration());
+    svc.start();
+    return svc;
+  }
+
+  private static class OpenClose implements Closeable {
+    public boolean closed = false;
+    public boolean raiseExceptionOnClose;
+
+    private OpenClose(boolean raiseExceptionOnClose) {
+      this.raiseExceptionOnClose = raiseExceptionOnClose;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (!closed) {
+        closed = true;
+        if (raiseExceptionOnClose) {
+          throw new IOException("OpenClose");
+        }
+      }
+    }
+  }
+}


[8/8] git commit: Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/incubator-slider into develop

Posted by st...@apache.org.
Merge branch 'develop' of https://git-wip-us.apache.org/repos/asf/incubator-slider into develop


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/70359726
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/70359726
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/70359726

Branch: refs/heads/develop
Commit: 703597261f99cc1270906502dfab8f1acb8fc310
Parents: dd48a8a 3954ce3
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jun 2 19:11:41 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jun 2 19:11:41 2014 +0100

----------------------------------------------------------------------
 pom.xml                                         | 307 +++++++------------
 slider-assembly/pom.xml                         |   1 -
 slider-core/pom.xml                             | 122 +++-----
 .../server/appmaster/web/SliderAMWebApp.java    |   3 +-
 .../agent/AgentMiniClusterTestBase.groovy       |   2 -
 .../providers/agent/AgentTestUtils.groovy       |  13 +-
 .../slider/test/YarnMiniClusterTestBase.groovy  |   8 +-
 .../appmaster/web/TestSliderAmFilter.java       |  14 +-
 .../web/rest/agent/TestAMAgentWebServices.java  |  20 +-
 .../management/TestAMManagementWebServices.java |  65 ++--
 slider-funtest/pom.xml                          |  27 +-
 .../accumulo/accumulo-funtests/pom.xml          |  15 +-
 .../accumulo/slider-accumulo-provider/pom.xml   |  44 +--
 slider-providers/hbase/hbase-funtests/pom.xml   |  32 +-
 .../hbase/slider-hbase-provider/pom.xml         |  22 +-
 .../failures/TestKilledHBaseAM.groovy           |   6 +-
 16 files changed, 290 insertions(+), 411 deletions(-)
----------------------------------------------------------------------



[3/8] git commit: SLIDER-94: tests for LongLivedProcess -caught a regression in the switch to ExecutorService based execution

Posted by st...@apache.org.
SLIDER-94: tests for LongLivedProcess -caught a regression in the switch to ExecutorService based execution


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/0f1ab84a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/0f1ab84a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/0f1ab84a

Branch: refs/heads/develop
Commit: 0f1ab84a596ffa0192236399267002d38b9e2f84
Parents: bd20dc6
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jun 2 13:47:55 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jun 2 13:47:55 2014 +0100

----------------------------------------------------------------------
 .../services/workflow/ForkedProcessService.java |   2 +-
 .../services/workflow/LongLivedProcess.java     |  15 +-
 .../workflow/ProcessCommandFactory.java         |  75 +++++++++
 .../services/workflow/TestLongLivedProcess.java | 164 +++++++++++++++++++
 .../workflow/WorkflowServiceTestBase.java       |   5 +-
 5 files changed, 252 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0f1ab84a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
index c3744f8..a5c042a 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
@@ -71,7 +71,7 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
       throw new ServiceStateException("Process not yet configured");
     }
     //now spawn the process -expect updates via callbacks
-    process.spawnApplication();
+    process.start();
   }
 
   @Override //AbstractService

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0f1ab84a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
index 8293255..9efbe9f 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
@@ -105,7 +105,6 @@ public class LongLivedProcess implements Runnable {
    * @param lifecycleCallback callback to notify on application exit
    */
   public void setLifecycleCallback(LongLivedProcessLifecycleEvent lifecycleCallback) {
-    Preconditions.checkArgument(lifecycleCallback != null, "lifecycleCallback");
     this.lifecycleCallback = lifecycleCallback;
   }
 
@@ -114,7 +113,7 @@ public class LongLivedProcess implements Runnable {
    * @param envVar envVar -must not be null
    * @param val value 
    */
-  public void putEnv(String envVar, String val) {
+  public void setEnv(String envVar, String val) {
     Preconditions.checkArgument(envVar != null, "envVar");
     Preconditions.checkArgument(val != null, "val");
     processBuilder.environment().put(envVar, val);
@@ -130,7 +129,7 @@ public class LongLivedProcess implements Runnable {
     for (Map.Entry<String, String> entry : map.entrySet()) {
       String val = entry.getValue();
       String key = entry.getKey();
-      putEnv(key, val);
+      setEnv(key, val);
     }
   }
 
@@ -210,7 +209,7 @@ public class LongLivedProcess implements Runnable {
    * Dump the environment to a string builder
    * @param buffer the buffer to append to
    */
-  private void dumpEnv(StringBuilder buffer) {
+  public void dumpEnv(StringBuilder buffer) {
     buffer.append("\nEnvironment\n-----------");
     Map<String, String> env = processBuilder.environment();
     Set<String> keys = env.keySet();
@@ -261,8 +260,10 @@ public class LongLivedProcess implements Runnable {
       if (lifecycleCallback != null) {
         lifecycleCallback.onProcessExited(this, exitCode);
       }
+      // shut down the threads
+      logExecutor.shutdown();
       try {
-        logExecutor.awaitTermination(60, TimeUnit.MINUTES);
+        logExecutor.awaitTermination(60, TimeUnit.SECONDS);
       } catch (InterruptedException ignored) {
         //ignored
       }
@@ -273,7 +274,7 @@ public class LongLivedProcess implements Runnable {
    * Spawn the application
    * @throws IOException IO problems
    */
-  public void spawnApplication() throws IOException {
+  public void start() throws IOException {
 
     spawnChildProcess();
     processExecutor.submit(this);
@@ -406,7 +407,7 @@ public class LongLivedProcess implements Runnable {
             outLine.setLength(0);
             processed |= true;
           }
-          if (!processed) {
+          if (!processed && !finished) {
             //nothing processed: wait a bit for data.
             try {
               Thread.sleep(sleepTime);

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0f1ab84a/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java
new file mode 100644
index 0000000..8521d0d
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A source of commands, with the goal being to allow for adding different
+ * implementations for different platforms
+ */
+public class ProcessCommandFactory {
+
+  protected ProcessCommandFactory() {
+  }
+
+  /**
+   * The command to list a directory
+   * @param dir directory
+   * @return commands
+   */
+  public List<String> ls(File dir) {
+    List<String> commands = new ArrayList<String>(5);
+    commands.add("ls");
+    commands.add("-1");
+    commands.add(dir.getAbsolutePath());
+    return commands;
+  }
+
+  /**
+   * Echo some text to stdout
+   * @param text text
+   * @return commands
+   */
+  public List<String> echo(String text) {
+    List<String> commands = new ArrayList<String>(5);
+    commands.add("echo");
+    commands.add(text);
+    return commands;
+  }
+  /**
+   * print env variables
+   * @return commands
+   */
+  public List<String> env() {
+    List<String> commands = new ArrayList<String>(1);
+    commands.add("env");
+    return commands;
+  }
+
+  /**
+   * Create a process command factory for this OS
+   * @return
+   */
+  public static ProcessCommandFactory createProcessCommandFactory() {
+    return new ProcessCommandFactory();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0f1ab84a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
new file mode 100644
index 0000000..ab37e6a
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * Test the long lived process by executing a command that works and a command
+ * that fails
+ */
+public class TestLongLivedProcess extends WorkflowServiceTestBase implements
+    LongLivedProcessLifecycleEvent {
+  private static final Logger
+      log = LoggerFactory.getLogger(TestLongLivedProcess.class);
+
+  private static final Logger
+      processLog =
+      LoggerFactory.getLogger("org.apache.hadoop.services.workflow.Process");
+
+
+  private LongLivedProcess process;
+  private File testDir = new File("target");
+  private ProcessCommandFactory commandFactory;
+  private volatile boolean started, stopped;
+  private volatile int exitCode;
+
+  @Before
+  public void setupProcesses() {
+    commandFactory = ProcessCommandFactory.createProcessCommandFactory();
+  }
+
+  @After
+  public void stopProcesses() {
+    if (process != null) {
+      process.stop();
+    }
+  }
+
+  @Test
+  public void testLs() throws Throwable {
+
+    initProcess(commandFactory.ls(testDir));
+    process.start();
+    //in-thread wait
+    process.run();
+
+    //here stopped
+    assertTrue("process start callback not received", started);
+    assertTrue("process stop callback not received", stopped);
+    assertEquals(0, process.getExitCode().intValue());
+
+    assertStringInOutput("test-classes", getFinalOutput());
+  }
+
+  @Test
+  public void testEcho() throws Throwable {
+
+    String echoText = "hello, world";
+    initProcess(commandFactory.echo(echoText));
+    process.start();
+    //in-thread wait
+    process.run();
+
+    //here stopped
+    assertTrue("process stop callback not received", stopped);
+    assertEquals(0, process.getExitCode().intValue());
+    assertStringInOutput(echoText, getFinalOutput());
+  }
+
+  @Test
+  public void testSetenv() throws Throwable {
+
+    String var = "TEST_RUN";
+    String val = "TEST-RUN-ENV-VALUE";
+    String echoText = "${TEST_RUN}";
+    initProcess(commandFactory.env());
+    process.setEnv(var, val);
+    process.start();
+    //in-thread wait
+    process.run();
+
+    //here stopped
+    assertTrue("process stop callback not received", stopped);
+    assertEquals(0, process.getExitCode().intValue());
+    assertStringInOutput(val, getFinalOutput());
+  }
+
+  /**
+   * Get the final output. includes a quick sleep for the tail output
+   * @return the last output
+   * @throws InterruptedException
+   */
+  private List<String> getFinalOutput() throws InterruptedException {
+    return process.getRecentOutput();
+  }
+
+  public void assertStringInOutput(String text, List<String> output) {
+    boolean found = false;
+    StringBuilder builder = new StringBuilder();
+    for (String s : output) {
+      builder.append(s).append('\n');
+      if (s.contains(text)) {
+        found = true;
+        break;
+      }
+    }
+
+    if (!found) {
+      String message =
+          "Text \"" + text + "\" not found in " + output.size() + " lines\n";
+      fail(message + builder.toString());
+    }
+
+  }
+
+
+  private LongLivedProcess initProcess(List<String> commands) {
+    process = new LongLivedProcess(name.getMethodName(), log, commands);
+    process.setLifecycleCallback(this);
+    return process;
+  }
+
+  /**
+   * Handler for callback events on the process
+   */
+
+
+  @Override
+  public void onProcessStarted(LongLivedProcess process) {
+    started = true;
+  }
+
+  /**
+   * Handler for callback events on the process
+   */
+  @Override
+  public void onProcessExited(LongLivedProcess process, int exitCode) {
+    this.exitCode = exitCode;
+    stopped = true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/0f1ab84a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
index d485148..ab57644 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.service.Service;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
+import org.junit.rules.TestName;
 import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,13 +32,15 @@ public abstract class WorkflowServiceTestBase extends Assert {
   private static final Logger
       log = LoggerFactory.getLogger(WorkflowServiceTestBase.class);
 
-
   /**
    * Set the timeout for every test
    */
   @Rule
   public Timeout testTimeout = new Timeout(15000);
 
+  @Rule
+  public TestName name = new TestName();
+  
   @Before
   public void nameThread() {
     Thread.currentThread().setName("JUnit");


[7/8] git commit: Merge branch 'feature/slider-94-workflow-services' into develop

Posted by st...@apache.org.
Merge branch 'feature/slider-94-workflow-services' into develop


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/dd48a8a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/dd48a8a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/dd48a8a3

Branch: refs/heads/develop
Commit: dd48a8a327f1ad87b84a810eaf770ffed8544526
Parents: 5f3b8a6 649a3b7
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jun 2 19:11:23 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jun 2 19:11:23 2014 +0100

----------------------------------------------------------------------
 .../server/appmaster/RoleLaunchService.java     |   4 +-
 .../server/appmaster/SliderAppMaster.java       |  12 +-
 .../LaunchedWorkflowCompositeService.java       |   2 +-
 .../server/services/utility/RpcService.java     |  65 --------
 .../AbstractWorkflowExecutorService.java        | 110 +++++++++++++
 .../services/workflow/ClosingService.java       |  51 ++++--
 .../services/workflow/ForkedProcessService.java |  98 +++++++++---
 .../services/workflow/LongLivedProcess.java     | 112 +++++++++----
 .../LongLivedProcessLifecycleEvent.java         |   5 +-
 .../workflow/ServiceTerminatingRunnable.java    |  18 ++-
 .../services/workflow/ServiceThreadFactory.java |  10 +-
 .../workflow/WorkflowCompositeService.java      |   2 +-
 .../workflow/WorkflowEventCallback.java         |   4 +-
 .../workflow/WorkflowEventNotifyingService.java |   7 +-
 .../workflow/WorkflowExecutorService.java       |  73 ---------
 .../services/workflow/WorkflowRpcService.java   |  76 +++++++++
 .../workflow/WorkflowSequenceService.java       |   2 +-
 .../server/services/workflow/package-info.java  | 108 ++++++++-----
 .../services/workflow/EndOfServiceWaiter.java   |  56 +++++++
 .../workflow/ParentWorkflowTestBase.java        |  70 ++++++++
 .../workflow/ProcessCommandFactory.java         |  87 ++++++++++
 .../services/workflow/SimpleRunnable.java       |  46 ++++++
 .../workflow/TestForkedProcessService.java      | 140 ++++++++++++++++
 .../services/workflow/TestLongLivedProcess.java | 160 +++++++++++++++++++
 .../TestServiceTerminatingRunnable.java         |  64 ++++++++
 .../workflow/TestWorkflowClosingService.java    | 116 ++++++++++++++
 .../workflow/TestWorkflowCompositeService.java  |   2 +-
 .../workflow/TestWorkflowExecutorService.java   |  61 +++++++
 .../workflow/TestWorkflowRpcService.java        | 107 +++++++++++++
 .../workflow/TestWorkflowSequenceService.java   |   4 +-
 .../workflow/WorkflowServiceTestBase.java       |  75 +++++----
 .../accumulo/AccumuloProviderService.java       |   8 +-
 32 files changed, 1457 insertions(+), 298 deletions(-)
----------------------------------------------------------------------



[6/8] git commit: move RPCService in as WorkflowRPCService, add tests, review and update docs ready for copy to hadoop trunk

Posted by st...@apache.org.
move RPCService in as WorkflowRPCService, add tests, review and update docs ready for copy to hadoop trunk


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/649a3b74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/649a3b74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/649a3b74

Branch: refs/heads/develop
Commit: 649a3b74646699e421acdc7793d3f79da2d28186
Parents: 3770370
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jun 2 19:10:48 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jun 2 19:10:48 2014 +0100

----------------------------------------------------------------------
 .../server/appmaster/SliderAppMaster.java       |   6 +-
 .../LaunchedWorkflowCompositeService.java       |   2 +-
 .../server/services/utility/RpcService.java     |  65 -----------
 .../AbstractWorkflowExecutorService.java        |  34 ++++--
 .../services/workflow/ClosingService.java       |  30 ++++-
 .../services/workflow/ForkedProcessService.java |  28 +++++
 .../services/workflow/LongLivedProcess.java     |   7 +-
 .../LongLivedProcessLifecycleEvent.java         |   2 +-
 .../workflow/ServiceTerminatingRunnable.java    |  14 +++
 .../services/workflow/WorkflowRpcService.java   |  76 ++++++++++++
 .../server/services/workflow/package-info.java  | 108 +++++++++++------
 .../workflow/ParentWorkflowTestBase.java        |   4 +
 .../services/workflow/TestCloseableService.java | 116 -------------------
 .../workflow/TestWorkflowClosingService.java    | 116 +++++++++++++++++++
 .../workflow/TestWorkflowRpcService.java        | 107 +++++++++++++++++
 .../workflow/WorkflowServiceTestBase.java       |  10 +-
 16 files changed, 484 insertions(+), 241 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index f245d56..2457b4d 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -113,7 +113,7 @@ import org.apache.slider.server.appmaster.web.rest.RestPaths;
 import org.apache.slider.server.services.registry.SliderRegistryService;
 import org.apache.slider.server.services.utility.AbstractSliderLaunchedService;
 import org.apache.slider.server.services.workflow.WorkflowEventCallback;
-import org.apache.slider.server.services.utility.RpcService;
+import org.apache.slider.server.services.workflow.WorkflowRpcService;
 import org.apache.slider.server.services.utility.WebAppService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -191,7 +191,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
    */
   private ByteBuffer allTokens;
 
-  private RpcService rpcService;
+  private WorkflowRpcService rpcService;
 
   /**
    * Secret manager
@@ -911,7 +911,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
                                                     .newReflectiveBlockingService(
                                                       protobufRelay);
 
-    rpcService = new RpcService(RpcBinder.createProtobufServer(
+    rpcService = new WorkflowRpcService("SliderRPC", RpcBinder.createProtobufServer(
       new InetSocketAddress("0.0.0.0", 0),
       getConfig(),
       secretManager,

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
index b5d11e7..0d47c3b 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/utility/LaunchedWorkflowCompositeService.java
@@ -91,7 +91,7 @@ public class LaunchedWorkflowCompositeService extends WorkflowCompositeService
 
   @Override
   public synchronized void addService(Service service) {
-    Preconditions.checkNotNull(service, "null service");
+    Preconditions.checkArgument(service != null, "null service argument");
     super.addService(service);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/utility/RpcService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/utility/RpcService.java b/slider-core/src/main/java/org/apache/slider/server/services/utility/RpcService.java
deleted file mode 100644
index 72412d4..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/utility/RpcService.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.utility;
-
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.service.AbstractService;
-
-import java.net.InetSocketAddress;
-
-/**
- * A YARN service that maps the start/stop lifecycle of an RPC server
- * to the YARN service lifecycle
- */
-public class RpcService extends AbstractService {
-
-  /** RPC server*/
-  private final Server server;
-
-  /**
-   * Construct an instance
-   * @param server server to manger
-   */
-  public RpcService(Server server) {
-    super("RpcService");
-    this.server = server;
-  }
-
-  public Server getServer() {
-    return server;
-  }
-
-  public InetSocketAddress getConnectAddress() {
-    return NetUtils.getConnectAddress(server);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    super.serviceStart();
-    server.start();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    if (server != null) {
-      server.stop();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
index 17d3b50..c26e3c4 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
@@ -25,27 +25,47 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 /**
- * A service that hosts an executor -in shutdown it is stopped.
+ * A service that hosts an executor -when the service is stopped,
+ * {@link ExecutorService#shutdownNow()} is invoked.
  */
 public abstract class AbstractWorkflowExecutorService extends AbstractService {
 
   private ExecutorService executor;
-  
+
+  /**
+   * Construct an instance with the given name -but
+   * no executor
+   * @param name service name
+   */
   public AbstractWorkflowExecutorService(String name) {
     this(name, null);
   }
 
+  /**
+   * Construct an instance with the given name and executor
+   * @param name service name
+   * @param executor exectuor
+   */
   protected AbstractWorkflowExecutorService(String name,
       ExecutorService executor) {
     super(name);
     this.executor = executor;
   }
 
-  public ExecutorService getExecutor() {
+  /**
+   * Get the executor
+   * @return the executor
+   */
+  public synchronized ExecutorService getExecutor() {
     return executor;
   }
 
-  protected void setExecutor(ExecutorService executor) {
+  /**
+   * Set the executor. This is protected as it
+   * is intended to be restricted to subclasses
+   * @param executor executor
+   */
+  protected synchronized void setExecutor(ExecutorService executor) {
     this.executor = executor;
   }
 
@@ -55,7 +75,7 @@ public abstract class AbstractWorkflowExecutorService extends AbstractService {
    * @param runnable runnable to execute
    */
   public void execute(Runnable runnable) {
-    executor.execute(runnable);
+    getExecutor().execute(runnable);
   }
 
   /**
@@ -65,7 +85,7 @@ public abstract class AbstractWorkflowExecutorService extends AbstractService {
    * @return a future to wait on
    */
   public <V> Future<V> submit(Callable<V> callable) {
-    return executor.submit(callable);
+    return getExecutor().submit(callable);
   }
   /**
    * Stop the service: halt the executor. 
@@ -82,7 +102,7 @@ public abstract class AbstractWorkflowExecutorService extends AbstractService {
    * This uses {@link ExecutorService#shutdownNow()}
    * and so does not block until they have completed.
    */
-  protected void stopExecutor() {
+  protected synchronized void stopExecutor() {
     if (executor != null) {
       executor.shutdownNow();
     }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
index 8468a98..7a475cc 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
@@ -31,25 +31,42 @@ import java.io.IOException;
  */
 public class ClosingService<C extends Closeable> extends AbstractService {
 
-  private volatile C closeable;
+  private C closeable;
 
 
+  /**
+   * Construct an instance of the service
+   * @param name service name
+   * @param closeable closeable to close (may be null)
+   */
   public ClosingService(String name,
       C closeable) {
     super(name);
     this.closeable = closeable;
   }
 
+  /**
+   * Construct an instance of the service, using the default name
+   * @param closeable closeable to close (may be null)
+   */
   public ClosingService(C closeable) {
     this("ClosingService", closeable);
   }
 
 
-  public C getCloseable() {
+  /**
+   * Get the closeable
+   * @return the closeable
+   */
+  public synchronized C getCloseable() {
     return closeable;
   }
 
-  public void setCloseable(C closeable) {
+  /**
+   * Set or update the closeable.
+   * @param closeable
+   */
+  public synchronized void setCloseable(C closeable) {
     this.closeable = closeable;
   }
 
@@ -61,13 +78,14 @@ public class ClosingService<C extends Closeable> extends AbstractService {
    */
   @Override
   protected void serviceStop() {
-    if (closeable != null) {
+    C target = getCloseable();
+    if (target != null) {
       try {
-        closeable.close();
+        target.close();
       } catch (IOException ioe) {
         noteFailure(ioe);
       }
-      closeable = null;
+      setCloseable(null);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
index b5459da..141ab7d 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
@@ -35,7 +35,35 @@ import java.util.concurrent.atomic.AtomicInteger;
  * This service is notified when the subprocess terminates, and stops itself 
  * and converts a non-zero exit code into a failure exception.
  * 
+ * <p>
+ * Key Features:
+ * <ol>
+ *   <li>The property {@link #executionTimeout} can be set to set a limit
+ *   on the duration of a process</li>
+ *   <li>Output is streamed to the output logger provided</li>.
+ *   <li>The most recent lines of output are saved to a linked list</li>.
+ *   <li>A synchronous callback, {@link LongLivedProcessLifecycleEvent}, is raised on the start
+ *   and finish of a process.</li>
+ * </ol>
+ *
+ * Usage:
+ * <p></p>
+ * The service can be built in the constructor, {@link #ForkedProcessService(String, Map, List)},
+ * or have its simple constructor used to instantiate the service, then the 
+ * {@link #build(Map, List)} command used to define the environment variables
+ * and list of commands to execute. One of these two options MUST be exercised
+ * before calling the services's {@link #start()} method.
+ * <p></p>
+ * The forked process is executed in the service's {@link #serviceStart()} method;
+ * if still running when the service is stopped, {@link #serviceStop()} will
+ * attempt to stop it.
+ * <p></p>
  * 
+ * The service delegates process execution to {@link LongLivedProcess},
+ * receiving callbacks via the {@link LongLivedProcessLifecycleEvent}.
+ * When the service receives a callback notifying that the process has completed,
+ * it calls its {@link #stop()} method. If the error code was non-zero, 
+ * the service is logged as having failed.
  */
 public class ForkedProcessService extends AbstractWorkflowExecutorService implements
     LongLivedProcessLifecycleEvent, Runnable {

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
index fe895e9..a1db64f 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
@@ -88,16 +88,17 @@ public class LongLivedProcess implements Runnable {
    * Log supplied in the constructor for the spawned process -accessible
    * to inner classes
    */
-  final Logger processLog;
+  private final Logger processLog;
+  
   /**
    * Class log -accessible to inner classes
    */
-  static final Logger LOG = LoggerFactory.getLogger(LongLivedProcess.class);
+  private static final Logger LOG = LoggerFactory.getLogger(LongLivedProcess.class);
 
   /**
    * Volatile flag to indicate that the process is done
    */
-  volatile boolean finished;
+  private volatile boolean finished;
 
   public LongLivedProcess(String name,
       Logger processLog,

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
index 86d20ff..a13b508 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
@@ -33,7 +33,7 @@ public interface LongLivedProcessLifecycleEvent {
    * Callback when a process has finished
    * @param process the process invoking the callback
    * @param exitCode exit code from the process
-   * @param signCorrectedCode
+   * @param signCorrectedCode the code- as sign corrected
    */
   void onProcessExited(LongLivedProcess process,
       int exitCode,

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
index 8549971..a69c1fc 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
@@ -30,6 +30,11 @@ public class ServiceTerminatingRunnable implements Runnable {
   private final Runnable action;
   private Exception exception;
 
+  /**
+   * Create an instance
+   * @param owner owning service
+   * @param action action to execute before terminating the service
+   */
   public ServiceTerminatingRunnable(Service owner, Runnable action) {
     Preconditions.checkArgument(owner != null, "null owner");
     Preconditions.checkArgument(action != null, "null action");
@@ -37,10 +42,19 @@ public class ServiceTerminatingRunnable implements Runnable {
     this.action = action;
   }
 
+  /**
+   * Get the owning service
+   * @return the service to receive notification when
+   * the runnable completes.
+   */
   public Service getOwner() {
     return owner;
   }
 
+  /**
+   * Any exception raised by inner <code>action's</code> run.
+   * @return an exception or null.
+   */
   public Exception getException() {
     return exception;
   }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java
new file mode 100644
index 0000000..b71530f
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowRpcService.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.AbstractService;
+
+import java.net.InetSocketAddress;
+
+/**
+ * A YARN service that maps the start/stop lifecycle of an RPC server
+ * to the YARN service lifecycle. 
+ */
+public class WorkflowRpcService extends AbstractService {
+
+  /** RPC server*/
+  private final Server server;
+
+  /**
+   * Construct an instance
+   * @param name service name
+   * @param server service to stop
+   */
+  public WorkflowRpcService(String name, Server server) {
+    super(name);
+    Preconditions.checkArgument(server != null, "Null server");
+    this.server = server;
+  }
+
+  /**
+   * Get the server
+   * @return the server
+   */
+  public Server getServer() {
+    return server;
+  }
+
+  /**
+   * Get the socket address of this server
+   * @return the address this server is listening on
+   */
+  public InetSocketAddress getConnectAddress() {
+    return NetUtils.getConnectAddress(server);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    server.start();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (server != null) {
+      server.stop();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
index b6492fe..4dd2cc7 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
@@ -26,14 +26,15 @@ package org.apache.slider.server.services.workflow;
  This package contains classes which can be aggregated to build up
  complex workflows of services: sequences of operations, callbacks
  and composite services with a shared lifespan.
- 
+
  Core concepts:
  <ol>
  <li>
  Workflow service instances have a limited lifespan, and will self-terminate when
  they consider it time</li>
  <li>
- Workflow Services that have children implement the {@link org.apache.slider.server.services.workflow.ServiceParent}
+ Workflow Services that have children implement the
+ {@link org.apache.slider.server.services.workflow.ServiceParent}
  class, which provides (thread-safe) access to the children -allowing new children
  to be added, and existing children to be ennumerated
  </li>
@@ -44,77 +45,108 @@ package org.apache.slider.server.services.workflow;
  </li>
  <li>
  Workflow Services may be subclassed to extend their behavior, or to use them
- in specific applications. Just as the standard {@link org.apache.hadoop.service.CompositeService}
- is often subclassed to aggregate child services, the {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+ in specific applications. Just as the standard
+ {@link org.apache.hadoop.service.CompositeService}
+ is often subclassed to aggregate child services, the
+ {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
  can be used instead -adding the feature that failing services trigger automatic
  parent shutdown. If that is the desired operational mode of a class,
  swapping the composite service implementation may be sufficient to adopt it.
  </li>
  </ol>
- 
+
  <h2>
- How do the workflow services differ from the standard <code>CompositeService</code>?
+ How do the workflow services differ from the standard YARN services?
  </h2>
+
+ <p>
  
+ There is exactly one standard YARN service for managing children, the
+ {@link org.apache.hadoop.service.CompositeService}.
+ </p><p>
  The {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
  shares the same model of "child services, all inited and started together".
  Where it differs is that if any child service stops -either due to a failure
- or to an action which invokes that service's <code>stop()</code> method.
- 
+ or to an action which invokes that service's
+ {@link org.apache.hadoop.service.Service#stop()} method.
+ </p><p>
+
  In contrast, the original <code>CompositeService</code> class starts its children
- in its <code>start()</code> method, but does not listen or react to any
+ in its{@link org.apache.hadoop.service.Service#start()}  method, but does not listen or react to any
  child service halting. As a result, changes in child state are not detected
  or propagated.
- 
+  </p><p>
+
  If a child service runs until completed -that is it will not be stopped until
  instructed to do so, and if it is only the parent service that attempts to
  stop the child, then this difference is unimportant. 
- 
+  </p><p>
+
  However, if any service that depends upon all it child services running -
  and if those child services are written so as to stop when they fail, using
  the <code>WorkflowCompositeService</code> as a base class will enable the 
  parent service to be automatically notified of a child stopping.
- 
+
+ </p><p>
  The {@link org.apache.slider.server.services.workflow.WorkflowSequenceService}
  resembles the composite service in API, but its workflow is different. It
  initializes and starts its children one-by-one, only starting the second after
  the first one succeeds, the third after the second, etc. If any service in
  the sequence fails, the parent <code>WorkflowSequenceService</code> stops, 
  reporting the same exception. 
- 
- 
- <h2>
- Other Workflow Services
- </h2>
+ </p>
+ <p>
+ The {@link org.apache.slider.server.services.workflow.ForkedProcessService}:
+ Executes a process when started, and binds to the life of that process. When the
+ process terminates, so does the service -and vice versa. This service enables
+ external processes to be executed as part of a sequence of operations -or,
+ using the {@link org.apache.slider.server.services.workflow.WorkflowCompositeService}
+ in parallel with other services, terminating the process when the other services
+ stop -and vice versa.
+ </p>
+
+
+<h2>
+Other Workflow Services
+</h2>
+
+ There are some minor services that have proven useful within aggregate workflows,
+ and simply in applications which are built from composite YARN services.
  
  <ul>
+ <li>{@link org.apache.slider.server.services.workflow.WorkflowRpcService }:
+ Maintains a reference to an RPC {@link org.apache.hadoop.ipc.Server} instance.
+ When the service is started, so is the RPC server. Similarly, when the service
+ is stopped, so is the RPC server instance. 
+ </li>
  <li>{@link org.apache.slider.server.services.workflow.WorkflowEventNotifyingService }:
- Notifies callbacks when a workflow reaches a specific point (potentially after a delay).</li>
- <li>{@link org.apache.slider.server.services.workflow.ForkedProcessService}:
- Executes a process when started, and binds to the life of that process. When the
- process terminates, so does the service -and vice versa.</li>
+ Notifies callbacks when a workflow reaches a specific point (potentially after a delay).
+ </li>
  <li>{@link org.apache.slider.server.services.workflow.ClosingService}: Closes
- an instance of <code>Closeable</code> when the service is stopped. This
- is purely a housekeeping class.</></li>
- <li>{@link }: </li>
+ an instance of {@link java.io.Closeable} when the service is stopped. This
+ is purely a housekeeping class.
+ </li>
+
  </ul>
 
-Lower-level classes 
+ Lower-level classes 
  <ul>
- <li>{@link org.apache.slider.server.services.workflow.AbstractWorkflowExecutorService }:
- This is a base class for YARN services that use an {@link java.util.concurrent.ExecutorService}.
- for managing asynchronous operations: it stops the executor when the service is
- stopped.
+ <li>{@link org.apache.slider.server.services.workflow.ServiceTerminatingRunnable }:
+ A {@link java.lang.Runnable} which runs the runnable supplied in its constructor
+ then signals its owning service to stop once that runnable is completed. 
+ Any exception raised in the run is stored.
  </li>
- <li>{@link org.apache.slider.server.services.workflow.ForkedProcessService}:
- Executes a process when started, and binds to the life of that process. When the
- process terminates, so does the service -and vice versa.</li>
- <li>{@link org.apache.slider.server.services.workflow.LongLivedProcess}:
- The inner class used to managed the forked process. When called directly it
- offers more features.</li>
- <li>{@link org.apache.slider.server.services.workflow.ClosingService}:
- A parameterized service to close the <code>Closeable</code> passed in -used for cleaning
- up references.</li>
+ <li>{@link org.apache.slider.server.services.workflow.AbstractWorkflowExecutorService}:
+ A base class for services that wish to have a {@link java.util.concurrent.ExecutorService}
+ with a lifespan mapped to that of a service. When the service is stopped, the
+ {@link java.util.concurrent.ExecutorService#shutdownNow()} method is called to
+ attempt to shut down all running tasks.
+ </li>
+ <li>{@link org.apache.slider.server.services.workflow.ServiceThreadFactory}:
+ This is a simple {@link java.util.concurrent.ThreadFactory} which generates
+ meaningful thread names. It can be used as a parameter to constructors of 
+ {@link java.util.concurrent.ExecutorService} instances, to ensure that
+ log information can tie back text to the related services</li>
  </ul>
 
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/test/java/org/apache/slider/server/services/workflow/ParentWorkflowTestBase.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/ParentWorkflowTestBase.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/ParentWorkflowTestBase.java
index 000705f..a11a1cf 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/ParentWorkflowTestBase.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/ParentWorkflowTestBase.java
@@ -20,6 +20,10 @@ package org.apache.slider.server.services.workflow;
 
 import org.apache.hadoop.service.Service;
 
+/**
+ * Extends {@link WorkflowServiceTestBase} with parent-specific operations
+ * and logic to build up and run the parent service
+ */
 public abstract class ParentWorkflowTestBase extends WorkflowServiceTestBase {
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java
deleted file mode 100644
index 3623687..0000000
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public class TestCloseableService extends WorkflowServiceTestBase {
-
-  @Test
-  public void testSimpleClose() throws Throwable {
-    ClosingService<OpenClose> svc = instance(false);
-    OpenClose openClose = svc.getCloseable();
-    assertFalse(openClose.closed);
-    svc.stop();
-    assertTrue(openClose.closed);
-  }
-
-  @Test
-  public void testNullClose() throws Throwable {
-    ClosingService<OpenClose> svc = new ClosingService<OpenClose>(null);
-    svc.init(new Configuration());
-    svc.start();
-    assertNull(svc.getCloseable());
-    svc.stop();
-  }
-
-  @Test
-  public void testFailingClose() throws Throwable {
-    ClosingService<OpenClose> svc = instance(false);
-    OpenClose openClose = svc.getCloseable();
-    openClose.raiseExceptionOnClose = true;
-    svc.stop();
-    assertTrue(openClose.closed);
-    Throwable cause = svc.getFailureCause();
-    assertNotNull(cause);
-
-    //retry should be a no-op
-    svc.close();
-  }
-
-  @Test
-  public void testDoubleClose() throws Throwable {
-    ClosingService<OpenClose> svc = instance(false);
-    OpenClose openClose = svc.getCloseable();
-    openClose.raiseExceptionOnClose = true;
-    svc.stop();
-    assertTrue(openClose.closed);
-    Throwable cause = svc.getFailureCause();
-    assertNotNull(cause);
-    openClose.closed = false;
-    svc.stop();
-    assertEquals(cause, svc.getFailureCause());
-  }
-
-  /**
-   * This does not recurse forever, as the service has already entered the
-   * STOPPED state before the inner close tries to stop it -that operation
-   * is a no-op
-   * @throws Throwable
-   */
-  @Test
-  public void testCloseSelf() throws Throwable {
-    ClosingService<ClosingService> svc =
-        new ClosingService<ClosingService>(null);
-    svc.setCloseable(svc);
-    svc.stop();
-  }
-
-
-  private ClosingService<OpenClose> instance(boolean raiseExceptionOnClose) {
-    ClosingService<OpenClose> svc = new ClosingService<OpenClose>(new OpenClose(
-        raiseExceptionOnClose));
-    svc.init(new Configuration());
-    svc.start();
-    return svc;
-  }
-
-  private static class OpenClose implements Closeable {
-    public boolean closed = false;
-    public boolean raiseExceptionOnClose;
-
-    private OpenClose(boolean raiseExceptionOnClose) {
-      this.raiseExceptionOnClose = raiseExceptionOnClose;
-    }
-
-    @Override
-    public void close() throws IOException {
-      if (!closed) {
-        closed = true;
-        if (raiseExceptionOnClose) {
-          throw new IOException("OpenClose");
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowClosingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowClosingService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowClosingService.java
new file mode 100644
index 0000000..638547f
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowClosingService.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class TestWorkflowClosingService extends WorkflowServiceTestBase {
+
+  @Test
+  public void testSimpleClose() throws Throwable {
+    ClosingService<OpenClose> svc = instance(false);
+    OpenClose openClose = svc.getCloseable();
+    assertFalse(openClose.closed);
+    svc.stop();
+    assertTrue(openClose.closed);
+  }
+
+  @Test
+  public void testNullClose() throws Throwable {
+    ClosingService<OpenClose> svc = new ClosingService<OpenClose>(null);
+    svc.init(new Configuration());
+    svc.start();
+    assertNull(svc.getCloseable());
+    svc.stop();
+  }
+
+  @Test
+  public void testFailingClose() throws Throwable {
+    ClosingService<OpenClose> svc = instance(false);
+    OpenClose openClose = svc.getCloseable();
+    openClose.raiseExceptionOnClose = true;
+    svc.stop();
+    assertTrue(openClose.closed);
+    Throwable cause = svc.getFailureCause();
+    assertNotNull(cause);
+
+    //retry should be a no-op
+    svc.close();
+  }
+
+  @Test
+  public void testDoubleClose() throws Throwable {
+    ClosingService<OpenClose> svc = instance(false);
+    OpenClose openClose = svc.getCloseable();
+    openClose.raiseExceptionOnClose = true;
+    svc.stop();
+    assertTrue(openClose.closed);
+    Throwable cause = svc.getFailureCause();
+    assertNotNull(cause);
+    openClose.closed = false;
+    svc.stop();
+    assertEquals(cause, svc.getFailureCause());
+  }
+
+  /**
+   * This does not recurse forever, as the service has already entered the
+   * STOPPED state before the inner close tries to stop it -that operation
+   * is a no-op
+   * @throws Throwable
+   */
+  @Test
+  public void testCloseSelf() throws Throwable {
+    ClosingService<ClosingService> svc =
+        new ClosingService<ClosingService>(null);
+    svc.setCloseable(svc);
+    svc.stop();
+  }
+
+
+  private ClosingService<OpenClose> instance(boolean raiseExceptionOnClose) {
+    ClosingService<OpenClose> svc = new ClosingService<OpenClose>(new OpenClose(
+        raiseExceptionOnClose));
+    svc.init(new Configuration());
+    svc.start();
+    return svc;
+  }
+
+  private static class OpenClose implements Closeable {
+    public boolean closed = false;
+    public boolean raiseExceptionOnClose;
+
+    private OpenClose(boolean raiseExceptionOnClose) {
+      this.raiseExceptionOnClose = raiseExceptionOnClose;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (!closed) {
+        closed = true;
+        if (raiseExceptionOnClose) {
+          throw new IOException("OpenClose");
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowRpcService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowRpcService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowRpcService.java
new file mode 100644
index 0000000..c7910ff
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowRpcService.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+public class TestWorkflowRpcService extends WorkflowServiceTestBase {
+
+  @Test
+  public void testCreateMockRPCService() throws Throwable {
+    MockRPC rpc = new MockRPC();
+    rpc.start();
+    assertTrue(rpc.started);
+    rpc.getListenerAddress();
+    rpc.stop();
+    assertTrue(rpc.stopped);
+  }
+
+  @Test
+  public void testLifecycle() throws Throwable {
+    MockRPC rpc = new MockRPC();
+    WorkflowRpcService svc = new WorkflowRpcService("test", rpc);
+    run(svc);
+    assertTrue(rpc.started);
+    svc.getConnectAddress();
+    svc.stop();
+    assertTrue(rpc.stopped);
+  }
+  
+  @Test
+  public void testStartFailure() throws Throwable {
+    MockRPC rpc = new MockRPC();
+    rpc.failOnStart = true;
+    WorkflowRpcService svc = new WorkflowRpcService("test", rpc);
+    svc.init(new Configuration());
+    try {
+      svc.start();
+      fail("expected an exception");
+    } catch (RuntimeException e) {
+      assertEquals("failOnStart", e.getMessage());
+    }
+    svc.stop();
+    assertTrue(rpc.stopped);
+  }
+  
+  private static class MockRPC extends Server {
+
+    public boolean stopped;
+    public boolean started;
+    public boolean failOnStart;
+
+    private MockRPC() throws IOException {
+      super("localhost", 0, null, 1, new Configuration());
+    }
+
+    @Override
+    public synchronized void start() {
+      if (failOnStart) {
+        throw new RuntimeException("failOnStart");
+      }
+      started = true;
+      super.start();
+    }
+
+    @Override
+    public synchronized void stop() {
+      stopped = true;
+      super.stop();
+    }
+
+    @Override
+    public synchronized InetSocketAddress getListenerAddress() {
+      return super.getListenerAddress();
+    }
+
+    @Override
+    public Writable call(RPC.RpcKind rpcKind,
+        String protocol,
+        Writable param,
+        long receiveTime) throws Exception {
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/649a3b74/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
index 6c0cdc4..95331b1 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
@@ -30,6 +30,9 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 
+/**
+ * Test base for workflow service tests.
+ */
 public abstract class WorkflowServiceTestBase extends Assert {
   private static final Logger
       log = LoggerFactory.getLogger(WorkflowServiceTestBase.class);
@@ -77,7 +80,6 @@ public abstract class WorkflowServiceTestBase extends Assert {
     }
   }
 
-
   /**
    * Init and start a service
    * @param svc the service
@@ -106,6 +108,12 @@ public abstract class WorkflowServiceTestBase extends Assert {
     }
   }
 
+  /**
+   * Assert that a string is in an output list. Fails fast if the output
+   * list is empty
+   * @param text text to scan for
+   * @param output list of output lines.
+   */
   public void assertStringInOutput(String text, List<String> output) {
     assertTrue("Empty output list", !output.isEmpty());
     boolean found = false;


[4/8] git commit: SLIDER-94 ... + tests for ForkedProcessService, change reporting order to avoid service termination before output streams of child process flushed


Posted by st...@apache.org.
SLIDER-94 ... + tests for ForkedProcessService, change reporting order to avoid service termination before output streams of child process flushed



Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/4fb479d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/4fb479d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/4fb479d5

Branch: refs/heads/develop
Commit: 4fb479d5f0e5aa0f0f99b4d0c860eb0b604494f6
Parents: 0f1ab84
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jun 2 17:26:23 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jun 2 17:26:23 2014 +0100

----------------------------------------------------------------------
 .../AbstractWorkflowExecutorService.java        |  11 ++
 .../services/workflow/ClosingService.java       |   4 +-
 .../services/workflow/ForkedProcessService.java |  60 +++++---
 .../services/workflow/LongLivedProcess.java     |  78 ++++++++---
 .../LongLivedProcessLifecycleEvent.java         |   5 +-
 .../workflow/ServiceTerminatingRunnable.java    |   4 +-
 .../services/workflow/ServiceThreadFactory.java |   4 +-
 .../workflow/WorkflowEventNotifyingService.java |   2 +-
 .../services/workflow/EndOfServiceWaiter.java   |  56 ++++++++
 .../workflow/ProcessCommandFactory.java         |  12 ++
 .../workflow/TestForkedProcessService.java      | 136 +++++++++++++++++++
 .../services/workflow/TestLongLivedProcess.java |  47 ++++---
 .../workflow/TestWorkflowExecutorService.java   |   2 +-
 .../workflow/WorkflowServiceTestBase.java       |  23 +++-
 14 files changed, 373 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
index 29d04e1..17d3b50 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
@@ -20,7 +20,9 @@ package org.apache.slider.server.services.workflow;
 
 import org.apache.hadoop.service.AbstractService;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 /**
  * A service that hosts an executor -in shutdown it is stopped.
@@ -57,6 +59,15 @@ public abstract class AbstractWorkflowExecutorService extends AbstractService {
   }
 
   /**
+   * Submit a callable
+   * @param callable callable
+   * @param <V> type of the final get
+   * @return a future to wait on
+   */
+  public <V> Future<V> submit(Callable<V> callable) {
+    return executor.submit(callable);
+  }
+  /**
    * Stop the service: halt the executor. 
    * @throws Exception exception.
    */

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
index 6751347..8468a98 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
@@ -18,7 +18,6 @@
 
 package org.apache.slider.server.services.workflow;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.service.AbstractService;
 
 import java.io.Closeable;
@@ -26,6 +25,9 @@ import java.io.IOException;
 
 /**
  * Service that closes the closeable supplied during shutdown, if not null.
+ * 
+ * As the Service interface itself extends Closeable, this service
+ * can be used to shut down other services if desired.
  */
 public class ClosingService<C extends Closeable> extends AbstractService {
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
index a5c042a..3e9ce93 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
@@ -47,8 +47,6 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
   private final AtomicBoolean processTerminated = new AtomicBoolean(false);
   private boolean processStarted = false;
   private LongLivedProcess process;
-  private Map<String, String> environment;
-  private List<String> commands;
   private int executionTimeout = -1;
   private int timeoutCode = 1;
 
@@ -65,6 +63,19 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
     super(name);
   }
 
+  /**
+   * Create an instance of the service,  set up the process
+   * @param name a name
+   * @param commandList list of commands is inserted on the front
+   * @param env environment variables above those generated by
+   * @throws IOException IO problems
+   */
+  public ForkedProcessService(String name, Map<String, String> env,
+      List<String> commandList) throws IOException {
+    super(name);
+    build(env, commandList);
+  }
+
   @Override //AbstractService
   protected void serviceStart() throws Exception {
     if (process == null) {
@@ -105,42 +116,41 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
                     List<String> commandList)
       throws IOException {
     assert process == null;
-    this.commands = commandList;
-    this.environment = env;
     process = new LongLivedProcess(getName(), LOG, commandList);
     process.setLifecycleCallback(this);
     //set the env variable mapping
     process.putEnvMap(env);
   }
 
-  @Override // ApplicationEventHandler
+  @Override // notification from executed process
   public synchronized void onProcessStarted(LongLivedProcess process) {
     LOG.debug("Process has started");
     processStarted = true;
     if (executionTimeout > 0) {
-      setExecutor(ServiceThreadFactory.newSingleThreadExecutor(getName(), true));
+      setExecutor(ServiceThreadFactory.singleThreadExecutor(getName(), true));
       execute(this);
     }
   }
 
-  @Override // ApplicationEventHandler
-  public void onProcessExited(LongLivedProcess process, int code) {
-    synchronized (this) {
-      completed(code);
-      //note whether or not the service had already stopped
-      LOG.debug("Process has exited with exit code {}", code);
-      if (code != 0) {
-        reportFailure(code, getName() + " failed with code " + code);
+  @Override  // notification from executed process
+  public void onProcessExited(LongLivedProcess process,
+      int uncorrected,
+      int code) {
+    try {
+      synchronized (this) {
+        completed(code);
+        //note whether or not the service had already stopped
+        LOG.debug("Process has exited with exit code {}", code);
+        if (code != 0) {
+          reportFailure(code, getName() + " failed with code " + code);
+        }
       }
-    }
-    //now stop itself
-    if (!isInState(STATE.STOPPED)) {
+    } finally {
       stop();
     }
   }
 
   private void reportFailure(int code, String text) {
-    this.exitCode.set(code);
     //error
     ServiceLaunchException execEx = new ServiceLaunchException(code, text);
     LOG.debug("Noting failure", execEx);
@@ -184,7 +194,6 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
    * @param code exit code
    */
   protected void completed(int code) {
-    exitCode.set(code);
     processTerminated.set(true);
     synchronized (processTerminated) {
       processTerminated.notify();
@@ -199,9 +208,20 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
     return processStarted;
   }
 
+  /**
+   * Is a process running: between started and terminated
+   * @return true if the process is up.
+   */
+  public synchronized boolean isProcessRunning() {
+    return processStarted && !isProcessTerminated();
+  }
+
 
   public int getExitCode() {
-    return exitCode.get();
+    return process.getExitCode();
+  }
+  public int getExitCodeSignCorrected() {
+    return process.getExitCodeSignCorrected();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
index 9efbe9f..b46fcd0 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
@@ -42,14 +42,32 @@ import java.util.concurrent.TimeUnit;
  * Hadoop's {@link org.apache.hadoop.util.Shell} class assumes it is executing
  * a short lived application; this class allows for the process to run for the
  * life of the Java process that forked it.
+ * 
+ * Key Features
+ * <ol>
+ *   <li>Output is streamed to the output logger provided</li>.
+ *   <li>The most recent lines of output are saved to a linked list</li>.
+ *   <li>A callback, {@link LongLivedProcessLifecycleEvent}, is raised on</li>
+ * </ol>
+ * 
  */
 public class LongLivedProcess implements Runnable {
-  public static final int STREAM_READER_SLEEP_TIME = 200;
+  /**
+   * Limit on number of lines to retain in the "recent" line list:{@value}
+   */
   public static final int RECENT_LINE_LOG_LIMIT = 64;
-  public static final int LINE_LENGTH = 256;
+
+  /**
+   * Const defining the time in millis between polling for new text
+   */
+  private static final int STREAM_READER_SLEEP_TIME = 200;
+  
+  /**
+   * limit on the length of a stream before it triggers an automatic newline
+   */
+  private static final int LINE_LENGTH = 256;
   private final ProcessBuilder processBuilder;
   private Process process;
-  private Exception exception;
   private Integer exitCode = null;
   private final String name;
   private final ExecutorService processExecutor;
@@ -58,7 +76,7 @@ public class LongLivedProcess implements Runnable {
   private ProcessStreamReader processStreamReader;
   //list of recent lines, recorded for extraction into reports
   private final List<String> recentLines = new LinkedList<String>();
-  private final int recentLineLimit = RECENT_LINE_LOG_LIMIT;
+  private int recentLineLimit = RECENT_LINE_LOG_LIMIT;
   private LongLivedProcessLifecycleEvent lifecycleCallback;
 
   
@@ -89,15 +107,15 @@ public class LongLivedProcess implements Runnable {
     processExecutor = Executors.newSingleThreadExecutor(factory);
     logExecutor=    Executors.newSingleThreadExecutor(factory);
     processBuilder = new ProcessBuilder(commands);
-    initBuilder();
-  }
-
-  private void initBuilder() {
     processBuilder.redirectErrorStream(false);
   }
 
-  public ProcessBuilder getProcessBuilder() {
-    return processBuilder;
+  /**
+   * Set the limit on recent lines to retain
+   * @param recentLineLimit size of rolling list of recent lines.
+   */
+  public void setRecentLineLimit(int recentLineLimit) {
+    this.recentLineLimit = recentLineLimit;
   }
 
   /**
@@ -151,13 +169,19 @@ public class LongLivedProcess implements Runnable {
   }
 
   /**
-   * Get any exception raised by the process
-   * @return an exception or null
+   * Get the process builder -this can be manipulated
+   * up to the start() operation. As there is no synchronization
+   * around it, it must only be used in the same thread setting up the commmand.
+   * @return the process builder
    */
-  public Exception getException() {
-    return exception;
+  public ProcessBuilder getProcessBuilder() {
+    return processBuilder;
   }
 
+  /**
+   * Get the command list
+   * @return the comands
+   */
   public List<String> getCommands() {
     return processBuilder.command();
   }
@@ -181,6 +205,22 @@ public class LongLivedProcess implements Runnable {
   public Integer getExitCode() {
     return exitCode;
   }
+  
+    /**
+   * Get the exit code sign corrected: null until the process has finished
+   * @return the exit code or null
+   */
+  public Integer getExitCodeSignCorrected() {
+    Integer result;
+    if (exitCode != null) {
+      result = (exitCode << 24) >> 24;
+    } else {
+      result = null;
+    }
+    return result;
+  }
+  
+  
 
   /**
    * Stop the process if it is running.
@@ -256,10 +296,6 @@ public class LongLivedProcess implements Runnable {
       //tell the logger it has to finish too
       finished = true;
 
-      //now call the callback if it is set
-      if (lifecycleCallback != null) {
-        lifecycleCallback.onProcessExited(this, exitCode);
-      }
       // shut down the threads
       logExecutor.shutdown();
       try {
@@ -267,6 +303,12 @@ public class LongLivedProcess implements Runnable {
       } catch (InterruptedException ignored) {
         //ignored
       }
+
+      //now call the callback if it is set
+      if (lifecycleCallback != null) {
+        lifecycleCallback.onProcessExited(this, exitCode,
+            getExitCodeSignCorrected());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
index af83ed0..86d20ff 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
@@ -33,6 +33,9 @@ public interface LongLivedProcessLifecycleEvent {
    * Callback when a process has finished
    * @param process the process invoking the callback
    * @param exitCode exit code from the process
+   * @param signCorrectedCode
    */
-  void onProcessExited(LongLivedProcess process, int exitCode);
+  void onProcessExited(LongLivedProcess process,
+      int exitCode,
+      int signCorrectedCode);
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
index f4e95b9..8549971 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
@@ -31,8 +31,8 @@ public class ServiceTerminatingRunnable implements Runnable {
   private Exception exception;
 
   public ServiceTerminatingRunnable(Service owner, Runnable action) {
-    Preconditions.checkArgument(owner!=null, "null owner");
-    Preconditions.checkArgument(action!=null, "null action");
+    Preconditions.checkArgument(owner != null, "null owner");
+    Preconditions.checkArgument(action != null, "null action");
     this.owner = owner;
     this.action = action;
   }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
index 6518126..7d7110e 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * The thread counter starts at 1, increments atomically, 
  * and is supplied as the second argument in the format string.
  * 
- * A static method, {@link #newSingleThreadExecutor(String, boolean)},
+ * A static method, {@link #singleThreadExecutor(String, boolean)},
  * exists to simplify the construction of an executor with a single well-named
  * threads. 
  * 
@@ -92,7 +92,7 @@ public class ServiceThreadFactory implements ThreadFactory {
    * @param daemons flag to indicate the threads should be marked as daemons
    * @return an executor
    */
-  public static ExecutorService newSingleThreadExecutor(String name,
+  public static ExecutorService singleThreadExecutor(String name,
       boolean daemons) {
     return Executors.newSingleThreadExecutor(
         new ServiceThreadFactory(name, daemons));

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
index cca279e..a86453a 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
@@ -73,7 +73,7 @@ public class WorkflowEventNotifyingService extends
   @Override
   protected void serviceStart() throws Exception {
     LOG.debug("Notifying {} after a delay of {} millis", callback, delay);
-    setExecutor(ServiceThreadFactory.newSingleThreadExecutor(getName(), true));
+    setExecutor(ServiceThreadFactory.singleThreadExecutor(getName(), true));
     execute(command);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/EndOfServiceWaiter.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/EndOfServiceWaiter.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/EndOfServiceWaiter.java
new file mode 100644
index 0000000..5e6df3b
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/EndOfServiceWaiter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.junit.Assert;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Wait for a service to stop
+ */
+public class EndOfServiceWaiter implements ServiceStateChangeListener {
+
+  private final AtomicBoolean finished = new AtomicBoolean(false);
+
+  public EndOfServiceWaiter(Service svc) {
+    svc.registerServiceListener(this);
+  }
+
+  public synchronized void waitForServiceToStop(long timeout) throws
+      InterruptedException {
+    if (!finished.get()) {
+      wait(timeout);
+    }
+    Assert.assertTrue("Service did not finish in time period",
+        finished.get());
+  }
+
+  @Override
+  public synchronized void stateChanged(Service service) {
+    if (service.isInState(Service.STATE.STOPPED)) {
+      finished.set(true);
+      notify();
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java
index 8521d0d..45fdc86 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java
@@ -55,6 +55,7 @@ public class ProcessCommandFactory {
     commands.add(text);
     return commands;
   }
+
   /**
    * print env variables
    * @return commands
@@ -66,6 +67,17 @@ public class ProcessCommandFactory {
   }
 
   /**
+   * execute a command that returns with an error code that will
+   * be converted into a number
+   * @return commands
+   */
+  public List<String> exitFalse() {
+    List<String> commands = new ArrayList<String>(2);
+    commands.add("false");
+    return commands;
+  }
+
+  /**
    * Create a process command factory for this OS
    * @return
    */

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
new file mode 100644
index 0000000..a1d5450
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.ServiceOperations;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test the long lived process by executing a command that works and a command
+ * that fails
+ */
+public class TestForkedProcessService extends WorkflowServiceTestBase {
+  private static final Logger
+      log = LoggerFactory.getLogger(TestForkedProcessService.class);
+
+  private static final Logger
+      processLog =
+      LoggerFactory.getLogger("org.apache.hadoop.services.workflow.Process");
+
+
+  private ForkedProcessService process;
+  private File testDir = new File("target");
+  private ProcessCommandFactory commandFactory;
+  private Map<String, String> env = new HashMap<String, String>();
+
+  @Before
+  public void setupProcesses() {
+    commandFactory = ProcessCommandFactory.createProcessCommandFactory();
+  }
+
+  @After
+  public void stopProcesses() {
+    ServiceOperations.stop(process);
+  }
+
+  @Test
+  public void testLs() throws Throwable {
+
+    initProcess(commandFactory.ls(testDir));
+    exec();
+    assertFalse(process.isProcessRunning());
+    assertEquals(0, process.getExitCode());
+
+    assertStringInOutput("test-classes", getFinalOutput());
+  }
+
+  @Test
+  public void testExitCodes() throws Throwable {
+
+    initProcess(commandFactory.exitFalse());
+    exec();
+    assertFalse(process.isProcessRunning());
+    int exitCode = process.getExitCode();
+    assertTrue(exitCode != 0);
+    int corrected = process.getExitCodeSignCorrected();
+    assertEquals(1, corrected);
+  }
+
+  @Test
+  public void testEcho() throws Throwable {
+
+    String echoText = "hello, world";
+    initProcess(commandFactory.echo(echoText));
+    exec();
+
+    assertEquals(0, process.getExitCode());
+    assertStringInOutput(echoText, getFinalOutput());
+
+  }
+
+  @Test
+  public void testSetenv() throws Throwable {
+
+    String var = "TEST_RUN";
+    String val = "TEST-RUN-ENV-VALUE";
+    env.put(var, val);
+    initProcess(commandFactory.env());
+    exec();
+
+    assertEquals(0, process.getExitCode());
+    assertStringInOutput(val, getFinalOutput());
+  }
+
+  /**
+   * Get the final output. includes a quick sleep for the tail output
+   * @return the last output
+   * @throws InterruptedException
+   */
+  private List<String> getFinalOutput() {
+    return process.getRecentOutput();
+  }
+
+  private ForkedProcessService initProcess(List<String> commands) throws
+      IOException {
+    process = new ForkedProcessService(name.getMethodName(), env,
+        commands);
+    process.init(new Configuration());
+
+    return process;
+  }
+
+  public void exec() throws InterruptedException {
+    assertNotNull(process);
+    EndOfServiceWaiter waiter = new EndOfServiceWaiter(process);
+    process.start();
+    waiter.waitForServiceToStop(5000);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
index ab37e6a..c9172e2 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
@@ -70,12 +70,31 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
     //here stopped
     assertTrue("process start callback not received", started);
     assertTrue("process stop callback not received", stopped);
+    assertFalse(process.isRunning());
     assertEquals(0, process.getExitCode().intValue());
 
     assertStringInOutput("test-classes", getFinalOutput());
   }
 
   @Test
+  public void testExitCodes() throws Throwable {
+
+    initProcess(commandFactory.exitFalse());
+    process.start();
+    //in-thread wait
+    process.run();
+
+    //here stopped
+
+    assertFalse(process.isRunning());
+    int exitCode = process.getExitCode();
+    assertTrue(exitCode != 0);
+    int corrected = process.getExitCodeSignCorrected();
+
+    assertEquals(1, corrected);
+  }
+
+  @Test
   public void testEcho() throws Throwable {
 
     String echoText = "hello, world";
@@ -95,7 +114,6 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
 
     String var = "TEST_RUN";
     String val = "TEST-RUN-ENV-VALUE";
-    String echoText = "${TEST_RUN}";
     initProcess(commandFactory.env());
     process.setEnv(var, val);
     process.start();
@@ -113,30 +131,10 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
    * @return the last output
    * @throws InterruptedException
    */
-  private List<String> getFinalOutput() throws InterruptedException {
+  private List<String> getFinalOutput() {
     return process.getRecentOutput();
   }
 
-  public void assertStringInOutput(String text, List<String> output) {
-    boolean found = false;
-    StringBuilder builder = new StringBuilder();
-    for (String s : output) {
-      builder.append(s).append('\n');
-      if (s.contains(text)) {
-        found = true;
-        break;
-      }
-    }
-
-    if (!found) {
-      String message =
-          "Text \"" + text + "\" not found in " + output.size() + " lines\n";
-      fail(message + builder.toString());
-    }
-
-  }
-
-
   private LongLivedProcess initProcess(List<String> commands) {
     process = new LongLivedProcess(name.getMethodName(), log, commands);
     process.setLifecycleCallback(this);
@@ -147,7 +145,6 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
    * Handler for callback events on the process
    */
 
-
   @Override
   public void onProcessStarted(LongLivedProcess process) {
     started = true;
@@ -157,7 +154,9 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
    * Handler for callback events on the process
    */
   @Override
-  public void onProcessExited(LongLivedProcess process, int exitCode) {
+  public void onProcessExited(LongLivedProcess process,
+      int exitCode,
+      int signCorrectedCode) {
     this.exitCode = exitCode;
     stopped = true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java
index e9c0271..9514f47 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java
@@ -54,7 +54,7 @@ public class TestWorkflowExecutorService extends WorkflowServiceTestBase {
   private static class ExecutorSvc extends AbstractWorkflowExecutorService {
     private ExecutorSvc() {
       super("ExecutorService",
-          ServiceThreadFactory.newSingleThreadExecutor("test", true));
+          ServiceThreadFactory.singleThreadExecutor("test", true));
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
index ab57644..6c0cdc4 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
@@ -28,6 +28,8 @@ import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 public abstract class WorkflowServiceTestBase extends Assert {
   private static final Logger
       log = LoggerFactory.getLogger(WorkflowServiceTestBase.class);
@@ -40,7 +42,7 @@ public abstract class WorkflowServiceTestBase extends Assert {
 
   @Rule
   public TestName name = new TestName();
-  
+
   @Before
   public void nameThread() {
     Thread.currentThread().setName("JUnit");
@@ -103,4 +105,23 @@ public abstract class WorkflowServiceTestBase extends Assert {
       result = parameter;
     }
   }
+
+  public void assertStringInOutput(String text, List<String> output) {
+    assertTrue("Empty output list", !output.isEmpty());
+    boolean found = false;
+    StringBuilder builder = new StringBuilder();
+    for (String s : output) {
+      builder.append(s).append('\n');
+      if (s.contains(text)) {
+        found = true;
+        break;
+      }
+    }
+
+    if (!found) {
+      String message =
+          "Text \"" + text + "\" not found in " + output.size() + " lines\n";
+      fail(message + builder.toString());
+    }
+  }
 }


[5/8] git commit: SLIDER-94 TestForkedProcessService verifies that an error code is uprated to a service failure

Posted by st...@apache.org.
SLIDER-94 TestForkedProcessService verifies that an error code is uprated to a service failure


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/37703700
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/37703700
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/37703700

Branch: refs/heads/develop
Commit: 37703700523194ef9c266e2792e215bf6152f675
Parents: 4fb479d
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jun 2 17:35:40 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jun 2 17:36:26 2014 +0100

----------------------------------------------------------------------
 .../server/services/workflow/ForkedProcessService.java    |  4 +++-
 .../slider/server/services/workflow/LongLivedProcess.java | 10 +++++++---
 .../services/workflow/TestForkedProcessService.java       |  6 +++++-
 .../server/services/workflow/TestLongLivedProcess.java    |  3 ---
 4 files changed, 15 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/37703700/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
index 3e9ce93..b5459da 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
@@ -33,7 +33,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 /**
  * Service wrapper for an external program that is launched and can/will terminate.
  * This service is notified when the subprocess terminates, and stops itself 
- * and converts a non-zero exit code into a failure exception
+ * and converts a non-zero exit code into a failure exception.
+ * 
+ * 
  */
 public class ForkedProcessService extends AbstractWorkflowExecutorService implements
     LongLivedProcessLifecycleEvent, Runnable {

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/37703700/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
index b46fcd0..fe895e9 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
@@ -39,15 +39,19 @@ import java.util.concurrent.TimeUnit;
 /**
  * Execute a long-lived process.
  *
+ * <p>
  * Hadoop's {@link org.apache.hadoop.util.Shell} class assumes it is executing
  * a short lived application; this class allows for the process to run for the
  * life of the Java process that forked it.
- * 
- * Key Features
+ * It is designed to be embedded inside a YARN service, though this is not
+ * the sole way that it can be used
+ * <p>
+ * Key Features:
  * <ol>
  *   <li>Output is streamed to the output logger provided</li>.
  *   <li>The most recent lines of output are saved to a linked list</li>.
- *   <li>A callback, {@link LongLivedProcessLifecycleEvent}, is raised on</li>
+ *   <li>A synchronous callback, {@link LongLivedProcessLifecycleEvent}, is raised on the start
+ *   and finish of a process.</li>
  * </ol>
  * 
  */

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/37703700/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
index a1d5450..1ac89a5 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
@@ -69,6 +69,8 @@ public class TestForkedProcessService extends WorkflowServiceTestBase {
     assertEquals(0, process.getExitCode());
 
     assertStringInOutput("test-classes", getFinalOutput());
+    // assert that the service did not fail
+    assertNull(process.getFailureCause());
   }
 
   @Test
@@ -81,6 +83,9 @@ public class TestForkedProcessService extends WorkflowServiceTestBase {
     assertTrue(exitCode != 0);
     int corrected = process.getExitCodeSignCorrected();
     assertEquals(1, corrected);
+    // assert that the exit code was uprated to a service failure
+    assertNotNull(process.getFailureCause());
+
   }
 
   @Test
@@ -111,7 +116,6 @@ public class TestForkedProcessService extends WorkflowServiceTestBase {
   /**
    * Get the final output. includes a quick sleep for the tail output
    * @return the last output
-   * @throws InterruptedException
    */
   private List<String> getFinalOutput() {
     return process.getRecentOutput();

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/37703700/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
index c9172e2..c8a0719 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
@@ -45,7 +45,6 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
   private File testDir = new File("target");
   private ProcessCommandFactory commandFactory;
   private volatile boolean started, stopped;
-  private volatile int exitCode;
 
   @Before
   public void setupProcesses() {
@@ -129,7 +128,6 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
   /**
    * Get the final output. includes a quick sleep for the tail output
    * @return the last output
-   * @throws InterruptedException
    */
   private List<String> getFinalOutput() {
     return process.getRecentOutput();
@@ -157,7 +155,6 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
   public void onProcessExited(LongLivedProcess process,
       int exitCode,
       int signCorrectedCode) {
-    this.exitCode = exitCode;
     stopped = true;
   }
 }


[2/8] git commit: SLIDER-94 add tests for WorkflowExecutorService and ServiceTerminatingRunnable; some cleanup of source of rest

Posted by st...@apache.org.
SLIDER-94 add tests for WorkflowExecutorService and ServiceTerminatingRunnable; some cleanup of source of rest


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/bd20dc6c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/bd20dc6c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/bd20dc6c

Branch: refs/heads/develop
Commit: bd20dc6ca31d63f63cb0ce5355bc69b8afe86d90
Parents: c8e0936
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jun 2 12:15:14 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jun 2 12:15:14 2014 +0100

----------------------------------------------------------------------
 .../server/appmaster/RoleLaunchService.java     |  4 +-
 .../server/appmaster/SliderAppMaster.java       |  6 +-
 .../AbstractWorkflowExecutorService.java        | 79 ++++++++++++++++++++
 .../services/workflow/ForkedProcessService.java |  6 +-
 .../services/workflow/LongLivedProcess.java     | 10 +--
 .../workflow/ServiceTerminatingRunnable.java    |  4 +-
 .../services/workflow/ServiceThreadFactory.java |  6 +-
 .../workflow/WorkflowCompositeService.java      |  2 +-
 .../workflow/WorkflowEventCallback.java         |  4 +-
 .../workflow/WorkflowEventNotifyingService.java |  5 +-
 .../workflow/WorkflowExecutorService.java       | 73 ------------------
 .../workflow/WorkflowSequenceService.java       |  2 +-
 .../server/services/workflow/package-info.java  | 10 ++-
 .../workflow/ParentWorkflowTestBase.java        | 66 ++++++++++++++++
 .../services/workflow/SimpleRunnable.java       | 46 ++++++++++++
 .../services/workflow/TestCloseableService.java |  6 --
 .../TestServiceTerminatingRunnable.java         | 64 ++++++++++++++++
 .../workflow/TestWorkflowCompositeService.java  |  2 +-
 .../workflow/TestWorkflowExecutorService.java   | 61 +++++++++++++++
 .../workflow/TestWorkflowSequenceService.java   |  4 +-
 .../workflow/WorkflowServiceTestBase.java       | 39 +++-------
 .../accumulo/AccumuloProviderService.java       |  8 +-
 22 files changed, 368 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java
index 2c5ab2b..5a5baaa 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/RoleLaunchService.java
@@ -30,8 +30,8 @@ import org.apache.slider.providers.ProviderRole;
 import org.apache.slider.providers.ProviderService;
 import org.apache.slider.server.appmaster.state.RoleInstance;
 import org.apache.slider.server.appmaster.state.RoleStatus;
+import org.apache.slider.server.services.workflow.AbstractWorkflowExecutorService;
 import org.apache.slider.server.services.workflow.ServiceThreadFactory;
-import org.apache.slider.server.services.workflow.WorkflowExecutorService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +41,7 @@ import java.util.concurrent.Executors;
 /**
  * A service for launching containers
  */
-public class RoleLaunchService extends WorkflowExecutorService {
+public class RoleLaunchService extends AbstractWorkflowExecutorService {
   protected static final Logger log =
     LoggerFactory.getLogger(RoleLaunchService.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
index 25a9c7a..f245d56 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java
@@ -1321,7 +1321,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
       // didn't start, so don't register
       providerService.start();
       // and send the started event ourselves
-      eventCallbackEvent(null);
+      eventCallbackEvent(this, null, null);
     }
   }
 
@@ -1331,7 +1331,9 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
   /* =================================================================== */
 
   @Override // EventCallback
-  public void eventCallbackEvent(Object parameter) {
+  public void eventCallbackEvent(Object caller,
+      Object parameter,
+      Exception exception) {
     // signalled that the child process is up.
     appState.noteAMLive();
     // now ask for the cluster nodes

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
new file mode 100644
index 0000000..29d04e1
--- /dev/null
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.service.AbstractService;
+
+import java.util.concurrent.ExecutorService;
+
+/**
+ * A service that hosts an executor -in shutdown it is stopped.
+ */
+public abstract class AbstractWorkflowExecutorService extends AbstractService {
+
+  private ExecutorService executor;
+  
+  public AbstractWorkflowExecutorService(String name) {
+    this(name, null);
+  }
+
+  protected AbstractWorkflowExecutorService(String name,
+      ExecutorService executor) {
+    super(name);
+    this.executor = executor;
+  }
+
+  public ExecutorService getExecutor() {
+    return executor;
+  }
+
+  protected void setExecutor(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * Execute the runnable with the executor (which 
+   * must have been created already)
+   * @param runnable runnable to execute
+   */
+  public void execute(Runnable runnable) {
+    executor.execute(runnable);
+  }
+
+  /**
+   * Stop the service: halt the executor. 
+   * @throws Exception exception.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+    stopExecutor();
+  }
+
+  /**
+   * Stop the executor if it is not null.
+   * This uses {@link ExecutorService#shutdownNow()}
+   * and so does not block until they have completed.
+   */
+  protected void stopExecutor() {
+    if (executor != null) {
+      executor.shutdownNow();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
index 1d6ccae..c3744f8 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * This service is notified when the subprocess terminates, and stops itself 
  * and converts a non-zero exit code into a failure exception
  */
-public class ForkedProcessService extends WorkflowExecutorService implements
+public class ForkedProcessService extends AbstractWorkflowExecutorService implements
     LongLivedProcessLifecycleEvent, Runnable {
 
   /**
@@ -115,7 +115,7 @@ public class ForkedProcessService extends WorkflowExecutorService implements
 
   @Override // ApplicationEventHandler
   public synchronized void onProcessStarted(LongLivedProcess process) {
-    LOG.info("Process has started");
+    LOG.debug("Process has started");
     processStarted = true;
     if (executionTimeout > 0) {
       setExecutor(ServiceThreadFactory.newSingleThreadExecutor(getName(), true));
@@ -128,7 +128,7 @@ public class ForkedProcessService extends WorkflowExecutorService implements
     synchronized (this) {
       completed(code);
       //note whether or not the service had already stopped
-      LOG.info("Process has exited with exit code {}", code);
+      LOG.debug("Process has exited with exit code {}", code);
       if (code != 0) {
         reportFailure(code, getName() + " failed with code " + code);
       }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
index 22a4c00..8293255 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
@@ -80,8 +80,8 @@ public class LongLivedProcess implements Runnable {
   public LongLivedProcess(String name,
       Logger processLog,
       List<String> commands) {
-    Preconditions.checkNotNull(processLog, "null processLog argument");
-    Preconditions.checkNotNull(commands, "null commands argument");
+    Preconditions.checkArgument(processLog != null, "processLog");
+    Preconditions.checkArgument(commands != null, "commands");
 
     this.name = name;
     this.processLog = processLog;
@@ -105,7 +105,7 @@ public class LongLivedProcess implements Runnable {
    * @param lifecycleCallback callback to notify on application exit
    */
   public void setLifecycleCallback(LongLivedProcessLifecycleEvent lifecycleCallback) {
-    Preconditions.checkNotNull(lifecycleCallback, "null lifecycleCallback");
+    Preconditions.checkArgument(lifecycleCallback != null, "lifecycleCallback");
     this.lifecycleCallback = lifecycleCallback;
   }
 
@@ -115,8 +115,8 @@ public class LongLivedProcess implements Runnable {
    * @param val value 
    */
   public void putEnv(String envVar, String val) {
-    Preconditions.checkNotNull(envVar, "null envVar");
-    Preconditions.checkNotNull(val, "null 'val'");
+    Preconditions.checkArgument(envVar != null, "envVar");
+    Preconditions.checkArgument(val != null, "val");
     processBuilder.environment().put(envVar, val);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
index f2563ec..f4e95b9 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
@@ -31,8 +31,8 @@ public class ServiceTerminatingRunnable implements Runnable {
   private Exception exception;
 
   public ServiceTerminatingRunnable(Service owner, Runnable action) {
-    Preconditions.checkNotNull(owner, "null owner");
-    Preconditions.checkNotNull(action, "null action");
+    Preconditions.checkArgument(owner!=null, "null owner");
+    Preconditions.checkArgument(action!=null, "null action");
     this.owner = owner;
     this.action = action;
   }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
index b1d235d..6518126 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
@@ -60,8 +60,8 @@ public class ServiceThreadFactory implements ThreadFactory {
   public ServiceThreadFactory(String name,
       boolean daemons,
       String namingFormat) {
-    Preconditions.checkNotNull(name, "null name");
-    Preconditions.checkNotNull(namingFormat, "null naming format");
+    Preconditions.checkArgument(name != null, "null name");
+    Preconditions.checkArgument(namingFormat != null, "null naming format");
     this.name = name;
     this.daemons = daemons;
     this.namingFormat = namingFormat;
@@ -80,7 +80,7 @@ public class ServiceThreadFactory implements ThreadFactory {
 
   @Override
   public Thread newThread(Runnable r) {
-    Preconditions.checkNotNull(r, "null runnable");
+    Preconditions.checkArgument(r != null, "null runnable");
     String threadName =
         String.format(namingFormat, name, counter.getAndIncrement());
     return new Thread(r, threadName);

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
index e556422..a7d9545 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowCompositeService.java
@@ -93,7 +93,7 @@ public class WorkflowCompositeService extends CompositeService
    */
   @Override
   public synchronized void addService(Service service) {
-    Preconditions.checkNotNull(service, "null service argument");
+    Preconditions.checkArgument(service != null, "null service argument");
     service.registerServiceListener(this);
     super.addService(service);
   }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
index a0f954e..fe28e38 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventCallback.java
@@ -24,6 +24,8 @@ package org.apache.slider.server.services.workflow;
  */
 public interface WorkflowEventCallback {
   
-  public void eventCallbackEvent(Object parameter);
+  public void eventCallbackEvent(Object caller,
+      Object parameter,
+      Exception exception);
   
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
index d504d13..cca279e 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
@@ -28,7 +28,8 @@ import org.slf4j.LoggerFactory;
  * The notifications come in on a callback thread -a thread that is only
  * started in this service's <code>start()</code> operation.
  */
-public class WorkflowEventNotifyingService extends WorkflowExecutorService
+public class WorkflowEventNotifyingService extends
+    AbstractWorkflowExecutorService
     implements Runnable {
   protected static final Logger LOG =
     LoggerFactory.getLogger(WorkflowEventNotifyingService.class);
@@ -106,7 +107,7 @@ public class WorkflowEventNotifyingService extends WorkflowExecutorService
       }
     }
     LOG.debug("Notifying {}", callback);
-    callback.eventCallbackEvent(parameter);
+    callback.eventCallbackEvent(this, parameter, null);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
deleted file mode 100644
index a61ac05..0000000
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowExecutorService.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.slider.server.services.workflow;
-
-import org.apache.hadoop.service.AbstractService;
-
-import java.util.concurrent.ExecutorService;
-
-/**
- * A service that hosts an executor -in shutdown it is stopped.
- */
-public class WorkflowExecutorService extends AbstractService {
-
-  private ExecutorService executor;
-  
-  public WorkflowExecutorService(String name) {
-    super(name);
-  }
-
-  public ExecutorService getExecutor() {
-    return executor;
-  }
-
-  protected void setExecutor(ExecutorService executor) {
-    this.executor = executor;
-  }
-
-  /**
-   * Execute the runnable with the executor (which 
-   * must have been created already)
-   * @param runnable runnable to execute
-   */
-  protected void execute(Runnable runnable) {
-    executor.execute(runnable);
-  }
-
-  /**
-   * Stop the service: halt the executor. 
-   * @throws Exception exception.
-   */
-  @Override
-  protected void serviceStop() throws Exception {
-    super.serviceStop();
-    stopExecutor();
-  }
-
-  /**
-   * Stop the executor if it is not null.
-   * This uses {@link ExecutorService#shutdownNow()}
-   * and so does not block until they have completed.
-   */
-  protected void stopExecutor() {
-    if (executor != null) {
-      executor.shutdownNow();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
index c42e784..ca07f99 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowSequenceService.java
@@ -274,7 +274,7 @@ public class WorkflowSequenceService extends AbstractService implements
    */
   @Override
   public synchronized void addService(Service service) {
-    Preconditions.checkNotNull(service, "null service argument");
+    Preconditions.checkArgument(service != null, "null service argument");
     LOG.debug("Adding service {} ", service.getName());
     synchronized (serviceList) {
       serviceList.add(service);

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
index 89275c8..b6492fe 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/package-info.java
@@ -84,7 +84,7 @@ package org.apache.slider.server.services.workflow;
  
  
  <h2>
- Other workflow services
+ Other Workflow Services
  </h2>
  
  <ul>
@@ -93,13 +93,15 @@ package org.apache.slider.server.services.workflow;
  <li>{@link org.apache.slider.server.services.workflow.ForkedProcessService}:
  Executes a process when started, and binds to the life of that process. When the
  process terminates, so does the service -and vice versa.</li>
- <li>{@link }: </li>
+ <li>{@link org.apache.slider.server.services.workflow.ClosingService}: Closes
+ an instance of <code>Closeable</code> when the service is stopped. This
+ is purely a housekeeping class.</></li>
  <li>{@link }: </li>
  </ul>
 
-Lower level classes 
+Lower-level classes 
  <ul>
- <li>{@link org.apache.slider.server.services.workflow.WorkflowExecutorService }:
+ <li>{@link org.apache.slider.server.services.workflow.AbstractWorkflowExecutorService }:
  This is a base class for YARN services that use an {@link java.util.concurrent.ExecutorService}.
  for managing asynchronous operations: it stops the executor when the service is
  stopped.

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/test/java/org/apache/slider/server/services/workflow/ParentWorkflowTestBase.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/ParentWorkflowTestBase.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/ParentWorkflowTestBase.java
new file mode 100644
index 0000000..000705f
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/ParentWorkflowTestBase.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.service.Service;
+
+public abstract class ParentWorkflowTestBase extends WorkflowServiceTestBase {
+
+  /**
+   * Wait a second for the service parent to stop
+   * @param parent the service to wait for
+   */
+  protected void waitForParentToStop(ServiceParent parent) {
+    waitForParentToStop(parent, 1000);
+  }
+
+  /**
+   * Wait for the service parent to stop
+   * @param parent the service to wait for
+   * @param timeout time in milliseconds
+   */
+  protected void waitForParentToStop(ServiceParent parent, int timeout) {
+    boolean stop = parent.waitForServiceToStop(timeout);
+    if (!stop) {
+      logState(parent);
+      fail("Service failed to stop : after " + timeout + " millis " + parent);
+    }
+  }
+
+  /**
+   * Subclasses are require to implement this and return an instance of a
+   * ServiceParent
+   * @param services a possibly empty list of services
+   * @return an inited -but -not-started- service parent instance
+   */
+  protected abstract ServiceParent buildService(Service... services);
+
+  /**
+   * Use {@link #buildService(Service...)} to create service and then start it
+   * @param services
+   * @return
+   */
+  protected ServiceParent startService(Service... services) {
+    ServiceParent parent = buildService(services);
+    //expect service to start and stay started
+    parent.start();
+    return parent;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/test/java/org/apache/slider/server/services/workflow/SimpleRunnable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/SimpleRunnable.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/SimpleRunnable.java
new file mode 100644
index 0000000..1f330f4
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/SimpleRunnable.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+/**
+ * Test runnable that can be made to exit, or throw an exception
+ * during its run
+ */
+class SimpleRunnable implements Runnable {
+  boolean throwException = false;
+
+
+  SimpleRunnable() {
+  }
+
+  SimpleRunnable(boolean throwException) {
+    this.throwException = throwException;
+  }
+
+  @Override
+  public synchronized void run() {
+    try {
+      if (throwException) {
+        throw new RuntimeException("SimpleRunnable");
+      }
+    } finally {
+      this.notify();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java
index cb984a9..3623687 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestCloseableService.java
@@ -19,7 +19,6 @@
 package org.apache.slider.server.services.workflow;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
 import org.junit.Test;
 
 import java.io.Closeable;
@@ -27,11 +26,6 @@ import java.io.IOException;
 
 public class TestCloseableService extends WorkflowServiceTestBase {
 
-  @Override
-  protected ServiceParent buildService(Service... services) {
-    throw new AssertionError("Unimplemented");
-  }
-
   @Test
   public void testSimpleClose() throws Throwable {
     ClosingService<OpenClose> svc = instance(false);

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestServiceTerminatingRunnable.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestServiceTerminatingRunnable.java
new file mode 100644
index 0000000..ffedd6e
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestServiceTerminatingRunnable.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.junit.Test;
+
+
+public class TestServiceTerminatingRunnable extends WorkflowServiceTestBase {
+
+  @Test
+  public void testNoservice() throws Throwable {
+
+    try {
+      new ServiceTerminatingRunnable(null, new SimpleRunnable());
+      fail("unexpected ");
+    } catch (IllegalArgumentException e) {
+
+      // expected 
+    }
+  }
+
+
+  @Test
+  public void testBasicRun() throws Throwable {
+
+    WorkflowCompositeService svc = run(new WorkflowCompositeService());
+    ServiceTerminatingRunnable runnable = new ServiceTerminatingRunnable(svc,
+        new SimpleRunnable());
+
+    // synchronous in-thread execution
+    runnable.run();
+    assertStopped(svc);
+  }
+
+  @Test
+  public void testFailureRun() throws Throwable {
+
+    WorkflowCompositeService svc = run(new WorkflowCompositeService());
+    ServiceTerminatingRunnable runnable =
+        new ServiceTerminatingRunnable(svc, new SimpleRunnable(true));
+
+    // synchronous in-thread execution
+    runnable.run();
+    assertStopped(svc);
+    assertNotNull(runnable.getException());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
index d4f90a5..9a2c368 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowCompositeService.java
@@ -25,7 +25,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TestWorkflowCompositeService extends WorkflowServiceTestBase {
+public class TestWorkflowCompositeService extends ParentWorkflowTestBase {
   private static final Logger
       log = LoggerFactory.getLogger(TestWorkflowCompositeService.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java
new file mode 100644
index 0000000..e9c0271
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.junit.Test;
+
+
+public class TestWorkflowExecutorService extends WorkflowServiceTestBase {
+
+
+  @Test
+  public void testAsyncRun() throws Throwable {
+
+    ExecutorSvc svc = run(new ExecutorSvc());
+    ServiceTerminatingRunnable runnable = new ServiceTerminatingRunnable(svc,
+        new SimpleRunnable());
+
+    // synchronous in-thread execution
+    svc.execute(runnable);
+    Thread.sleep(1000);
+    assertStopped(svc);
+  }
+
+  @Test
+  public void testFailureRun() throws Throwable {
+
+    ExecutorSvc svc = run(new ExecutorSvc());
+    ServiceTerminatingRunnable runnable = new ServiceTerminatingRunnable(svc,
+        new SimpleRunnable(true));
+
+    // synchronous in-thread execution
+    svc.execute(runnable);
+    Thread.sleep(1000);
+    assertStopped(svc);
+    assertNotNull(runnable.getException());
+  }
+
+  private static class ExecutorSvc extends AbstractWorkflowExecutorService {
+    private ExecutorSvc() {
+      super("ExecutorService",
+          ServiceThreadFactory.newSingleThreadExecutor("test", true));
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
index 80b70a2..22b595e 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowSequenceService.java
@@ -24,7 +24,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TestWorkflowSequenceService extends WorkflowServiceTestBase {
+public class TestWorkflowSequenceService extends ParentWorkflowTestBase {
   private static final Logger
       log = LoggerFactory.getLogger(TestWorkflowSequenceService.class);
 
@@ -48,7 +48,7 @@ public class TestWorkflowSequenceService extends WorkflowServiceTestBase {
     waitForParentToStop(parent);
     assertStopped(one);
     assertStopped(two);
-    assert ((WorkflowSequenceService)parent).getPreviousService().equals(two);
+    assert ((WorkflowSequenceService) parent).getPreviousService().equals(two);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
index 657cc31..d485148 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.slider.server.services.workflow;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
 import org.junit.Assert;
 import org.junit.Before;
@@ -42,7 +43,7 @@ public abstract class WorkflowServiceTestBase extends Assert {
     Thread.currentThread().setName("JUnit");
   }
 
-  
+
   protected void assertInState(Service service, Service.STATE expected) {
     Service.STATE actual = service.getServiceState();
     if (actual != expected) {
@@ -71,34 +72,16 @@ public abstract class WorkflowServiceTestBase extends Assert {
     }
   }
 
-  /**
-   * Wait a second for the service parent to stop
-   * @param parent the service to wait for
-   */
-  protected void waitForParentToStop(ServiceParent parent) {
-    waitForParentToStop(parent, 1000);
-  }
 
   /**
-   * Wait for the service parent to stop
-   * @param parent the service to wait for
-   * @param timeout time in milliseconds
+   * Init and start a service
+   * @param svc the service
+   * @return the service
    */
-  protected void waitForParentToStop(ServiceParent parent, int timeout) {
-    boolean stop = parent.waitForServiceToStop(timeout);
-    if (!stop) {
-      logState(parent);
-      fail("Service failed to stop : after " + timeout +" millis " + parent);
-    }
-  }
-
-  protected abstract ServiceParent buildService(Service... services);
-
-  protected ServiceParent startService(Service... services) {
-    ServiceParent parent = buildService(services);
-    //expect service to start and stay started
-    parent.start();
-    return parent;
+  protected <S extends Service> S run(S svc) {
+    svc.init(new Configuration());
+    svc.start();
+    return svc;
   }
 
   /**
@@ -109,7 +92,9 @@ public abstract class WorkflowServiceTestBase extends Assert {
     public Object result;
 
     @Override
-    public void eventCallbackEvent(Object parameter) {
+    public void eventCallbackEvent(Object caller,
+        Object parameter,
+        Exception exception) {
       log.info("EventCallback");
       notified = true;
       result = parameter;

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/bd20dc6c/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
----------------------------------------------------------------------
diff --git a/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java b/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
index 2210ff3..b43751d 100644
--- a/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
+++ b/slider-providers/accumulo/slider-accumulo-provider/src/main/java/org/apache/slider/providers/accumulo/AccumuloProviderService.java
@@ -264,9 +264,8 @@ public class AccumuloProviderService extends AbstractProviderService implements
   public boolean exec(AggregateConf instanceDefinition,
                       File confDir,
                       Map<String, String> env,
-                      WorkflowEventCallback execInProgress) throws
-                                                 IOException,
-      SliderException {
+                      WorkflowEventCallback execInProgress)
+      throws IOException, SliderException {
 
 
     //now pull in these files and do a bit of last-minute validation
@@ -331,7 +330,8 @@ public class AccumuloProviderService extends AbstractProviderService implements
     
     //callback to AM to trigger cluster review is set up to happen after
     //the init/verify action has succeeded
-    WorkflowEventNotifyingService notifier = new WorkflowEventNotifyingService(execInProgress,
+    WorkflowEventNotifyingService notifier = new WorkflowEventNotifyingService(
+        execInProgress,
         null,
         internalOperations.getGlobalOptions().getOptionInt(
              OptionKeys.INTERNAL_CONTAINER_STARTUP_DELAY,