You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2014/06/02 20:11:55 UTC

[4/8] git commit: SLIDER-94 ... + tests for ForkedProcessService, change reporting order to avoid service termination before output streams of child process flushed


SLIDER-94 ... + tests for ForkedProcessService, change reporting order to avoid service termination before output streams of child process flushed



Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/4fb479d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/4fb479d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/4fb479d5

Branch: refs/heads/develop
Commit: 4fb479d5f0e5aa0f0f99b4d0c860eb0b604494f6
Parents: 0f1ab84
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jun 2 17:26:23 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jun 2 17:26:23 2014 +0100

----------------------------------------------------------------------
 .../AbstractWorkflowExecutorService.java        |  11 ++
 .../services/workflow/ClosingService.java       |   4 +-
 .../services/workflow/ForkedProcessService.java |  60 +++++---
 .../services/workflow/LongLivedProcess.java     |  78 ++++++++---
 .../LongLivedProcessLifecycleEvent.java         |   5 +-
 .../workflow/ServiceTerminatingRunnable.java    |   4 +-
 .../services/workflow/ServiceThreadFactory.java |   4 +-
 .../workflow/WorkflowEventNotifyingService.java |   2 +-
 .../services/workflow/EndOfServiceWaiter.java   |  56 ++++++++
 .../workflow/ProcessCommandFactory.java         |  12 ++
 .../workflow/TestForkedProcessService.java      | 136 +++++++++++++++++++
 .../services/workflow/TestLongLivedProcess.java |  47 ++++---
 .../workflow/TestWorkflowExecutorService.java   |   2 +-
 .../workflow/WorkflowServiceTestBase.java       |  23 +++-
 14 files changed, 373 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
index 29d04e1..17d3b50 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
@@ -20,7 +20,9 @@ package org.apache.slider.server.services.workflow;
 
 import org.apache.hadoop.service.AbstractService;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 
 /**
  * A service that hosts an executor -in shutdown it is stopped.
@@ -57,6 +59,15 @@ public abstract class AbstractWorkflowExecutorService extends AbstractService {
   }
 
   /**
+   * Submit a callable
+   * @param callable callable
+   * @param <V> type of the final get
+   * @return a future to wait on
+   */
+  public <V> Future<V> submit(Callable<V> callable) {
+    return executor.submit(callable);
+  }
+  /**
    * Stop the service: halt the executor. 
    * @throws Exception exception.
    */

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
index 6751347..8468a98 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
@@ -18,7 +18,6 @@
 
 package org.apache.slider.server.services.workflow;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.service.AbstractService;
 
 import java.io.Closeable;
@@ -26,6 +25,9 @@ import java.io.IOException;
 
 /**
  * Service that closes the closeable supplied during shutdown, if not null.
+ * 
+ * As the Service interface itself extends Closeable, this service
+ * can be used to shut down other services if desired.
  */
 public class ClosingService<C extends Closeable> extends AbstractService {
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
index a5c042a..3e9ce93 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
@@ -47,8 +47,6 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
   private final AtomicBoolean processTerminated = new AtomicBoolean(false);
   private boolean processStarted = false;
   private LongLivedProcess process;
-  private Map<String, String> environment;
-  private List<String> commands;
   private int executionTimeout = -1;
   private int timeoutCode = 1;
 
@@ -65,6 +63,19 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
     super(name);
   }
 
+  /**
+   * Create an instance of the service and set up the process.
+   * @param name a name
+   * @param commandList list of commands to execute
+   * @param env environment variables to set for the process
+   * @throws IOException IO problems
+   */
+  public ForkedProcessService(String name, Map<String, String> env,
+      List<String> commandList) throws IOException {
+    super(name);
+    build(env, commandList);
+  }
+
   @Override //AbstractService
   protected void serviceStart() throws Exception {
     if (process == null) {
@@ -105,42 +116,41 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
                     List<String> commandList)
       throws IOException {
     assert process == null;
-    this.commands = commandList;
-    this.environment = env;
     process = new LongLivedProcess(getName(), LOG, commandList);
     process.setLifecycleCallback(this);
     //set the env variable mapping
     process.putEnvMap(env);
   }
 
-  @Override // ApplicationEventHandler
+  @Override // notification from executed process
   public synchronized void onProcessStarted(LongLivedProcess process) {
     LOG.debug("Process has started");
     processStarted = true;
     if (executionTimeout > 0) {
-      setExecutor(ServiceThreadFactory.newSingleThreadExecutor(getName(), true));
+      setExecutor(ServiceThreadFactory.singleThreadExecutor(getName(), true));
       execute(this);
     }
   }
 
-  @Override // ApplicationEventHandler
-  public void onProcessExited(LongLivedProcess process, int code) {
-    synchronized (this) {
-      completed(code);
-      //note whether or not the service had already stopped
-      LOG.debug("Process has exited with exit code {}", code);
-      if (code != 0) {
-        reportFailure(code, getName() + " failed with code " + code);
+  @Override  // notification from executed process
+  public void onProcessExited(LongLivedProcess process,
+      int uncorrected,
+      int code) {
+    try {
+      synchronized (this) {
+        completed(code);
+        //note whether or not the service had already stopped
+        LOG.debug("Process has exited with exit code {}", code);
+        if (code != 0) {
+          reportFailure(code, getName() + " failed with code " + code);
+        }
       }
-    }
-    //now stop itself
-    if (!isInState(STATE.STOPPED)) {
+    } finally {
       stop();
     }
   }
 
   private void reportFailure(int code, String text) {
-    this.exitCode.set(code);
     //error
     ServiceLaunchException execEx = new ServiceLaunchException(code, text);
     LOG.debug("Noting failure", execEx);
@@ -184,7 +194,6 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
    * @param code exit code
    */
   protected void completed(int code) {
-    exitCode.set(code);
     processTerminated.set(true);
     synchronized (processTerminated) {
       processTerminated.notify();
@@ -199,9 +208,20 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
     return processStarted;
   }
 
+  /**
+   * Is a process running: between started and terminated
+   * @return true if the process is up.
+   */
+  public synchronized boolean isProcessRunning() {
+    return processStarted && !isProcessTerminated();
+  }
+
 
   public int getExitCode() {
-    return exitCode.get();
+    return process.getExitCode();
+  }
+  public int getExitCodeSignCorrected() {
+    return process.getExitCodeSignCorrected();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
index 9efbe9f..b46fcd0 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
@@ -42,14 +42,32 @@ import java.util.concurrent.TimeUnit;
  * Hadoop's {@link org.apache.hadoop.util.Shell} class assumes it is executing
  * a short lived application; this class allows for the process to run for the
  * life of the Java process that forked it.
+ * 
+ * Key Features
+ * <ol>
+ *   <li>Output is streamed to the output logger provided.</li>
+ *   <li>The most recent lines of output are saved to a linked list.</li>
+ *   <li>A callback, {@link LongLivedProcessLifecycleEvent}, is raised on process start and exit.</li>
+ * </ol>
+ * 
  */
 public class LongLivedProcess implements Runnable {
-  public static final int STREAM_READER_SLEEP_TIME = 200;
+  /**
+   * Limit on number of lines to retain in the "recent" line list:{@value}
+   */
   public static final int RECENT_LINE_LOG_LIMIT = 64;
-  public static final int LINE_LENGTH = 256;
+
+  /**
+   * Const defining the time in millis between polling for new text
+   */
+  private static final int STREAM_READER_SLEEP_TIME = 200;
+  
+  /**
+   * limit on the length of a stream before it triggers an automatic newline
+   */
+  private static final int LINE_LENGTH = 256;
   private final ProcessBuilder processBuilder;
   private Process process;
-  private Exception exception;
   private Integer exitCode = null;
   private final String name;
   private final ExecutorService processExecutor;
@@ -58,7 +76,7 @@ public class LongLivedProcess implements Runnable {
   private ProcessStreamReader processStreamReader;
   //list of recent lines, recorded for extraction into reports
   private final List<String> recentLines = new LinkedList<String>();
-  private final int recentLineLimit = RECENT_LINE_LOG_LIMIT;
+  private int recentLineLimit = RECENT_LINE_LOG_LIMIT;
   private LongLivedProcessLifecycleEvent lifecycleCallback;
 
   
@@ -89,15 +107,15 @@ public class LongLivedProcess implements Runnable {
     processExecutor = Executors.newSingleThreadExecutor(factory);
     logExecutor=    Executors.newSingleThreadExecutor(factory);
     processBuilder = new ProcessBuilder(commands);
-    initBuilder();
-  }
-
-  private void initBuilder() {
     processBuilder.redirectErrorStream(false);
   }
 
-  public ProcessBuilder getProcessBuilder() {
-    return processBuilder;
+  /**
+   * Set the limit on recent lines to retain
+   * @param recentLineLimit size of rolling list of recent lines.
+   */
+  public void setRecentLineLimit(int recentLineLimit) {
+    this.recentLineLimit = recentLineLimit;
   }
 
   /**
@@ -151,13 +169,19 @@ public class LongLivedProcess implements Runnable {
   }
 
   /**
-   * Get any exception raised by the process
-   * @return an exception or null
+   * Get the process builder -this can be manipulated
+   * up to the start() operation. As there is no synchronization
+   * around it, it must only be used in the same thread setting up the command.
+   * @return the process builder
    */
-  public Exception getException() {
-    return exception;
+  public ProcessBuilder getProcessBuilder() {
+    return processBuilder;
   }
 
+  /**
+   * Get the command list
+   * @return the commands
+   */
   public List<String> getCommands() {
     return processBuilder.command();
   }
@@ -181,6 +205,22 @@ public class LongLivedProcess implements Runnable {
   public Integer getExitCode() {
     return exitCode;
   }
+  
+  /**
+   * Get the exit code with its low 8 bits sign-extended: null until the process has finished
+   * @return the sign-corrected exit code or null
+   */
+  public Integer getExitCodeSignCorrected() {
+    Integer result;
+    if (exitCode != null) {
+      result = (exitCode << 24) >> 24;
+    } else {
+      result = null;
+    }
+    return result;
+  }
+  
+  
 
   /**
    * Stop the process if it is running.
@@ -256,10 +296,6 @@ public class LongLivedProcess implements Runnable {
       //tell the logger it has to finish too
       finished = true;
 
-      //now call the callback if it is set
-      if (lifecycleCallback != null) {
-        lifecycleCallback.onProcessExited(this, exitCode);
-      }
       // shut down the threads
       logExecutor.shutdown();
       try {
@@ -267,6 +303,12 @@ public class LongLivedProcess implements Runnable {
       } catch (InterruptedException ignored) {
         //ignored
       }
+
+      //now call the callback if it is set
+      if (lifecycleCallback != null) {
+        lifecycleCallback.onProcessExited(this, exitCode,
+            getExitCodeSignCorrected());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
index af83ed0..86d20ff 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
@@ -33,6 +33,9 @@ public interface LongLivedProcessLifecycleEvent {
    * Callback when a process has finished
    * @param process the process invoking the callback
    * @param exitCode exit code from the process
+   * @param signCorrectedCode the exit code from the process, sign corrected
    */
-  void onProcessExited(LongLivedProcess process, int exitCode);
+  void onProcessExited(LongLivedProcess process,
+      int exitCode,
+      int signCorrectedCode);
 }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
index f4e95b9..8549971 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
@@ -31,8 +31,8 @@ public class ServiceTerminatingRunnable implements Runnable {
   private Exception exception;
 
   public ServiceTerminatingRunnable(Service owner, Runnable action) {
-    Preconditions.checkArgument(owner!=null, "null owner");
-    Preconditions.checkArgument(action!=null, "null action");
+    Preconditions.checkArgument(owner != null, "null owner");
+    Preconditions.checkArgument(action != null, "null action");
     this.owner = owner;
     this.action = action;
   }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
index 6518126..7d7110e 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  * The thread counter starts at 1, increments atomically, 
  * and is supplied as the second argument in the format string.
  * 
- * A static method, {@link #newSingleThreadExecutor(String, boolean)},
+ * A static method, {@link #singleThreadExecutor(String, boolean)},
  * exists to simplify the construction of an executor with a single well-named
  * threads. 
  * 
@@ -92,7 +92,7 @@ public class ServiceThreadFactory implements ThreadFactory {
    * @param daemons flag to indicate the threads should be marked as daemons
    * @return an executor
    */
-  public static ExecutorService newSingleThreadExecutor(String name,
+  public static ExecutorService singleThreadExecutor(String name,
       boolean daemons) {
     return Executors.newSingleThreadExecutor(
         new ServiceThreadFactory(name, daemons));

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
index cca279e..a86453a 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
@@ -73,7 +73,7 @@ public class WorkflowEventNotifyingService extends
   @Override
   protected void serviceStart() throws Exception {
     LOG.debug("Notifying {} after a delay of {} millis", callback, delay);
-    setExecutor(ServiceThreadFactory.newSingleThreadExecutor(getName(), true));
+    setExecutor(ServiceThreadFactory.singleThreadExecutor(getName(), true));
     execute(command);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/EndOfServiceWaiter.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/EndOfServiceWaiter.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/EndOfServiceWaiter.java
new file mode 100644
index 0000000..5e6df3b
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/EndOfServiceWaiter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.junit.Assert;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Wait for a service to stop
+ */
+public class EndOfServiceWaiter implements ServiceStateChangeListener {
+
+  private final AtomicBoolean finished = new AtomicBoolean(false);
+
+  public EndOfServiceWaiter(Service svc) {
+    svc.registerServiceListener(this);
+  }
+
+  public synchronized void waitForServiceToStop(long timeout) throws
+      InterruptedException {
+    if (!finished.get()) {
+      wait(timeout);
+    }
+    Assert.assertTrue("Service did not finish in time period",
+        finished.get());
+  }
+
+  @Override
+  public synchronized void stateChanged(Service service) {
+    if (service.isInState(Service.STATE.STOPPED)) {
+      finished.set(true);
+      notify();
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java
index 8521d0d..45fdc86 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java
@@ -55,6 +55,7 @@ public class ProcessCommandFactory {
     commands.add(text);
     return commands;
   }
+
   /**
    * print env variables
    * @return commands
@@ -66,6 +67,17 @@ public class ProcessCommandFactory {
   }
 
   /**
+   * execute a command that returns with an error code that will
+   * be converted into a number
+   * @return commands
+   */
+  public List<String> exitFalse() {
+    List<String> commands = new ArrayList<String>(2);
+    commands.add("false");
+    return commands;
+  }
+
+  /**
    * Create a process command factory for this OS
    * @return
    */

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
new file mode 100644
index 0000000..a1d5450
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.ServiceOperations;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test the long lived process by executing a command that works and a command
+ * that fails
+ */
+public class TestForkedProcessService extends WorkflowServiceTestBase {
+  private static final Logger
+      log = LoggerFactory.getLogger(TestForkedProcessService.class);
+
+  private static final Logger
+      processLog =
+      LoggerFactory.getLogger("org.apache.hadoop.services.workflow.Process");
+
+
+  private ForkedProcessService process;
+  private File testDir = new File("target");
+  private ProcessCommandFactory commandFactory;
+  private Map<String, String> env = new HashMap<String, String>();
+
+  @Before
+  public void setupProcesses() {
+    commandFactory = ProcessCommandFactory.createProcessCommandFactory();
+  }
+
+  @After
+  public void stopProcesses() {
+    ServiceOperations.stop(process);
+  }
+
+  @Test
+  public void testLs() throws Throwable {
+
+    initProcess(commandFactory.ls(testDir));
+    exec();
+    assertFalse(process.isProcessRunning());
+    assertEquals(0, process.getExitCode());
+
+    assertStringInOutput("test-classes", getFinalOutput());
+  }
+
+  @Test
+  public void testExitCodes() throws Throwable {
+
+    initProcess(commandFactory.exitFalse());
+    exec();
+    assertFalse(process.isProcessRunning());
+    int exitCode = process.getExitCode();
+    assertTrue(exitCode != 0);
+    int corrected = process.getExitCodeSignCorrected();
+    assertEquals(1, corrected);
+  }
+
+  @Test
+  public void testEcho() throws Throwable {
+
+    String echoText = "hello, world";
+    initProcess(commandFactory.echo(echoText));
+    exec();
+
+    assertEquals(0, process.getExitCode());
+    assertStringInOutput(echoText, getFinalOutput());
+
+  }
+
+  @Test
+  public void testSetenv() throws Throwable {
+
+    String var = "TEST_RUN";
+    String val = "TEST-RUN-ENV-VALUE";
+    env.put(var, val);
+    initProcess(commandFactory.env());
+    exec();
+
+    assertEquals(0, process.getExitCode());
+    assertStringInOutput(val, getFinalOutput());
+  }
+
+  /**
+   * Get the final output: the recent output lines
+   * retained for the process.
+   * @return the last output
+   */
+  private List<String> getFinalOutput() {
+    return process.getRecentOutput();
+  }
+
+  private ForkedProcessService initProcess(List<String> commands) throws
+      IOException {
+    process = new ForkedProcessService(name.getMethodName(), env,
+        commands);
+    process.init(new Configuration());
+
+    return process;
+  }
+
+  public void exec() throws InterruptedException {
+    assertNotNull(process);
+    EndOfServiceWaiter waiter = new EndOfServiceWaiter(process);
+    process.start();
+    waiter.waitForServiceToStop(5000);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
index ab37e6a..c9172e2 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
@@ -70,12 +70,31 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
     //here stopped
     assertTrue("process start callback not received", started);
     assertTrue("process stop callback not received", stopped);
+    assertFalse(process.isRunning());
     assertEquals(0, process.getExitCode().intValue());
 
     assertStringInOutput("test-classes", getFinalOutput());
   }
 
   @Test
+  public void testExitCodes() throws Throwable {
+
+    initProcess(commandFactory.exitFalse());
+    process.start();
+    //in-thread wait
+    process.run();
+
+    //here stopped
+
+    assertFalse(process.isRunning());
+    int exitCode = process.getExitCode();
+    assertTrue(exitCode != 0);
+    int corrected = process.getExitCodeSignCorrected();
+
+    assertEquals(1, corrected);
+  }
+
+  @Test
   public void testEcho() throws Throwable {
 
     String echoText = "hello, world";
@@ -95,7 +114,6 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
 
     String var = "TEST_RUN";
     String val = "TEST-RUN-ENV-VALUE";
-    String echoText = "${TEST_RUN}";
     initProcess(commandFactory.env());
     process.setEnv(var, val);
     process.start();
@@ -113,30 +131,10 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
    * @return the last output
    * @throws InterruptedException
    */
-  private List<String> getFinalOutput() throws InterruptedException {
+  private List<String> getFinalOutput() {
     return process.getRecentOutput();
   }
 
-  public void assertStringInOutput(String text, List<String> output) {
-    boolean found = false;
-    StringBuilder builder = new StringBuilder();
-    for (String s : output) {
-      builder.append(s).append('\n');
-      if (s.contains(text)) {
-        found = true;
-        break;
-      }
-    }
-
-    if (!found) {
-      String message =
-          "Text \"" + text + "\" not found in " + output.size() + " lines\n";
-      fail(message + builder.toString());
-    }
-
-  }
-
-
   private LongLivedProcess initProcess(List<String> commands) {
     process = new LongLivedProcess(name.getMethodName(), log, commands);
     process.setLifecycleCallback(this);
@@ -147,7 +145,6 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
    * Handler for callback events on the process
    */
 
-
   @Override
   public void onProcessStarted(LongLivedProcess process) {
     started = true;
@@ -157,7 +154,9 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
    * Handler for callback events on the process
    */
   @Override
-  public void onProcessExited(LongLivedProcess process, int exitCode) {
+  public void onProcessExited(LongLivedProcess process,
+      int exitCode,
+      int signCorrectedCode) {
     this.exitCode = exitCode;
     stopped = true;
   }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java
index e9c0271..9514f47 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java
@@ -54,7 +54,7 @@ public class TestWorkflowExecutorService extends WorkflowServiceTestBase {
   private static class ExecutorSvc extends AbstractWorkflowExecutorService {
     private ExecutorSvc() {
       super("ExecutorService",
-          ServiceThreadFactory.newSingleThreadExecutor("test", true));
+          ServiceThreadFactory.singleThreadExecutor("test", true));
     }
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
index ab57644..6c0cdc4 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
@@ -28,6 +28,8 @@ import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
+
 public abstract class WorkflowServiceTestBase extends Assert {
   private static final Logger
       log = LoggerFactory.getLogger(WorkflowServiceTestBase.class);
@@ -40,7 +42,7 @@ public abstract class WorkflowServiceTestBase extends Assert {
 
   @Rule
   public TestName name = new TestName();
-  
+
   @Before
   public void nameThread() {
     Thread.currentThread().setName("JUnit");
@@ -103,4 +105,23 @@ public abstract class WorkflowServiceTestBase extends Assert {
       result = parameter;
     }
   }
+
+  public void assertStringInOutput(String text, List<String> output) {
+    assertTrue("Empty output list", !output.isEmpty());
+    boolean found = false;
+    StringBuilder builder = new StringBuilder();
+    for (String s : output) {
+      builder.append(s).append('\n');
+      if (s.contains(text)) {
+        found = true;
+        break;
+      }
+    }
+
+    if (!found) {
+      String message =
+          "Text \"" + text + "\" not found in " + output.size() + " lines\n";
+      fail(message + builder.toString());
+    }
+  }
 }