You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2014/06/02 20:11:55 UTC
[4/8] git commit: SLIDER-94 ... + tests for ForkedProcessService, change reporting order to avoid service termination before output streams of child process flushed

SLIDER-94 ... + tests for ForkedProcessService, change reporting order to avoid service termination before output streams of child process flushed

Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/4fb479d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/4fb479d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/4fb479d5
Branch: refs/heads/develop
Commit: 4fb479d5f0e5aa0f0f99b4d0c860eb0b604494f6
Parents: 0f1ab84
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jun 2 17:26:23 2014 +0100
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jun 2 17:26:23 2014 +0100
----------------------------------------------------------------------
.../AbstractWorkflowExecutorService.java | 11 ++
.../services/workflow/ClosingService.java | 4 +-
.../services/workflow/ForkedProcessService.java | 60 +++++---
.../services/workflow/LongLivedProcess.java | 78 ++++++++---
.../LongLivedProcessLifecycleEvent.java | 5 +-
.../workflow/ServiceTerminatingRunnable.java | 4 +-
.../services/workflow/ServiceThreadFactory.java | 4 +-
.../workflow/WorkflowEventNotifyingService.java | 2 +-
.../services/workflow/EndOfServiceWaiter.java | 56 ++++++++
.../workflow/ProcessCommandFactory.java | 12 ++
.../workflow/TestForkedProcessService.java | 136 +++++++++++++++++++
.../services/workflow/TestLongLivedProcess.java | 47 ++++---
.../workflow/TestWorkflowExecutorService.java | 2 +-
.../workflow/WorkflowServiceTestBase.java | 23 +++-
14 files changed, 373 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
index 29d04e1..17d3b50 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/AbstractWorkflowExecutorService.java
@@ -20,7 +20,9 @@ package org.apache.slider.server.services.workflow;
import org.apache.hadoop.service.AbstractService;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
/**
* A service that hosts an executor -in shutdown it is stopped.
@@ -57,6 +59,15 @@ public abstract class AbstractWorkflowExecutorService extends AbstractService {
}
/**
+ * Submit a callable
+ * @param callable callable
+ * @param <V> type of the final get
+ * @return a future to wait on
+ */
+ public <V> Future<V> submit(Callable<V> callable) {
+ return executor.submit(callable);
+ }
+ /**
* Stop the service: halt the executor.
* @throws Exception exception.
*/
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
index 6751347..8468a98 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ClosingService.java
@@ -18,7 +18,6 @@
package org.apache.slider.server.services.workflow;
-import com.google.common.base.Preconditions;
import org.apache.hadoop.service.AbstractService;
import java.io.Closeable;
@@ -26,6 +25,9 @@ import java.io.IOException;
/**
* Service that closes the closeable supplied during shutdown, if not null.
+ *
+ * As the Service interface itself extends Closeable, this service
+ * can be used to shut down other services if desired.
*/
public class ClosingService<C extends Closeable> extends AbstractService {
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
index a5c042a..3e9ce93 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
@@ -47,8 +47,6 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
private final AtomicBoolean processTerminated = new AtomicBoolean(false);
private boolean processStarted = false;
private LongLivedProcess process;
- private Map<String, String> environment;
- private List<String> commands;
private int executionTimeout = -1;
private int timeoutCode = 1;
@@ -65,6 +63,19 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
super(name);
}
+ /**
+ * Create an instance of the service, set up the process
+ * @param name a name
+ * @param commandList the command line (program and arguments) of the process to fork
+ * @param env environment variables to add to the process's environment
+ * @throws IOException IO problems
+ */
+ public ForkedProcessService(String name, Map<String, String> env,
+ List<String> commandList) throws IOException {
+ super(name);
+ build(env, commandList);
+ }
+
@Override //AbstractService
protected void serviceStart() throws Exception {
if (process == null) {
@@ -105,42 +116,41 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
List<String> commandList)
throws IOException {
assert process == null;
- this.commands = commandList;
- this.environment = env;
process = new LongLivedProcess(getName(), LOG, commandList);
process.setLifecycleCallback(this);
//set the env variable mapping
process.putEnvMap(env);
}
- @Override // ApplicationEventHandler
+ @Override // notification from executed process
public synchronized void onProcessStarted(LongLivedProcess process) {
LOG.debug("Process has started");
processStarted = true;
if (executionTimeout > 0) {
- setExecutor(ServiceThreadFactory.newSingleThreadExecutor(getName(), true));
+ setExecutor(ServiceThreadFactory.singleThreadExecutor(getName(), true));
execute(this);
}
}
- @Override // ApplicationEventHandler
- public void onProcessExited(LongLivedProcess process, int code) {
- synchronized (this) {
- completed(code);
- //note whether or not the service had already stopped
- LOG.debug("Process has exited with exit code {}", code);
- if (code != 0) {
- reportFailure(code, getName() + " failed with code " + code);
+ @Override // notification from executed process
+ public void onProcessExited(LongLivedProcess process,
+ int uncorrected,
+ int code) {
+ try {
+ synchronized (this) {
+ completed(code);
+ //note whether or not the service had already stopped
+ LOG.debug("Process has exited with exit code {}", code);
+ if (code != 0) {
+ reportFailure(code, getName() + " failed with code " + code);
+ }
}
- }
- //now stop itself
- if (!isInState(STATE.STOPPED)) {
+ } finally {
stop();
}
}
private void reportFailure(int code, String text) {
- this.exitCode.set(code);
//error
ServiceLaunchException execEx = new ServiceLaunchException(code, text);
LOG.debug("Noting failure", execEx);
@@ -184,7 +194,6 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
* @param code exit code
*/
protected void completed(int code) {
- exitCode.set(code);
processTerminated.set(true);
synchronized (processTerminated) {
processTerminated.notify();
@@ -199,9 +208,20 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
return processStarted;
}
+ /**
+ * Is a process running: between started and terminated
+ * @return true if the process is up.
+ */
+ public synchronized boolean isProcessRunning() {
+ return processStarted && !isProcessTerminated();
+ }
+
public int getExitCode() {
- return exitCode.get();
+ return process.getExitCode();
+ }
+ public int getExitCodeSignCorrected() {
+ return process.getExitCodeSignCorrected();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
index 9efbe9f..b46fcd0 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
@@ -42,14 +42,32 @@ import java.util.concurrent.TimeUnit;
* Hadoop's {@link org.apache.hadoop.util.Shell} class assumes it is executing
* a short lived application; this class allows for the process to run for the
* life of the Java process that forked it.
+ *
+ * Key Features
+ * <ol>
+ * <li>Output is streamed to the output logger provided.</li>
+ * <li>The most recent lines of output are saved to a linked list.</li>
+ * <li>A callback, {@link LongLivedProcessLifecycleEvent}, is raised on process start and exit.</li>
+ * </ol>
+ *
*/
public class LongLivedProcess implements Runnable {
- public static final int STREAM_READER_SLEEP_TIME = 200;
+ /**
+ * Limit on number of lines to retain in the "recent" line list:{@value}
+ */
public static final int RECENT_LINE_LOG_LIMIT = 64;
- public static final int LINE_LENGTH = 256;
+
+ /**
+ * Const defining the time in millis between polling for new text
+ */
+ private static final int STREAM_READER_SLEEP_TIME = 200;
+
+ /**
+ * limit on the length of a stream before it triggers an automatic newline
+ */
+ private static final int LINE_LENGTH = 256;
private final ProcessBuilder processBuilder;
private Process process;
- private Exception exception;
private Integer exitCode = null;
private final String name;
private final ExecutorService processExecutor;
@@ -58,7 +76,7 @@ public class LongLivedProcess implements Runnable {
private ProcessStreamReader processStreamReader;
//list of recent lines, recorded for extraction into reports
private final List<String> recentLines = new LinkedList<String>();
- private final int recentLineLimit = RECENT_LINE_LOG_LIMIT;
+ private int recentLineLimit = RECENT_LINE_LOG_LIMIT;
private LongLivedProcessLifecycleEvent lifecycleCallback;
@@ -89,15 +107,15 @@ public class LongLivedProcess implements Runnable {
processExecutor = Executors.newSingleThreadExecutor(factory);
logExecutor= Executors.newSingleThreadExecutor(factory);
processBuilder = new ProcessBuilder(commands);
- initBuilder();
- }
-
- private void initBuilder() {
processBuilder.redirectErrorStream(false);
}
- public ProcessBuilder getProcessBuilder() {
- return processBuilder;
+ /**
+ * Set the limit on recent lines to retain
+ * @param recentLineLimit size of rolling list of recent lines.
+ */
+ public void setRecentLineLimit(int recentLineLimit) {
+ this.recentLineLimit = recentLineLimit;
}
/**
@@ -151,13 +169,19 @@ public class LongLivedProcess implements Runnable {
}
/**
- * Get any exception raised by the process
- * @return an exception or null
+ * Get the process builder -this can be manipulated
+ * up to the start() operation. As there is no synchronization
+ * around it, it must only be used in the same thread setting up the command.
+ * @return the process builder
*/
- public Exception getException() {
- return exception;
+ public ProcessBuilder getProcessBuilder() {
+ return processBuilder;
}
+ /**
+ * Get the command list
+ * @return the commands
+ */
public List<String> getCommands() {
return processBuilder.command();
}
@@ -181,6 +205,22 @@ public class LongLivedProcess implements Runnable {
public Integer getExitCode() {
return exitCode;
}
+
+ /**
+ * Get the exit code sign corrected: null until the process has finished
+ * @return the exit code or null
+ */
+ public Integer getExitCodeSignCorrected() {
+ Integer result;
+ if (exitCode != null) {
+ result = (exitCode << 24) >> 24;
+ } else {
+ result = null;
+ }
+ return result;
+ }
+
+
/**
* Stop the process if it is running.
@@ -256,10 +296,6 @@ public class LongLivedProcess implements Runnable {
//tell the logger it has to finish too
finished = true;
- //now call the callback if it is set
- if (lifecycleCallback != null) {
- lifecycleCallback.onProcessExited(this, exitCode);
- }
// shut down the threads
logExecutor.shutdown();
try {
@@ -267,6 +303,12 @@ public class LongLivedProcess implements Runnable {
} catch (InterruptedException ignored) {
//ignored
}
+
+ //now call the callback if it is set
+ if (lifecycleCallback != null) {
+ lifecycleCallback.onProcessExited(this, exitCode,
+ getExitCodeSignCorrected());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
index af83ed0..86d20ff 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcessLifecycleEvent.java
@@ -33,6 +33,9 @@ public interface LongLivedProcessLifecycleEvent {
* Callback when a process has finished
* @param process the process invoking the callback
* @param exitCode exit code from the process
+ * @param signCorrectedCode the exit code sign-extended from its low 8 bits
*/
- void onProcessExited(LongLivedProcess process, int exitCode);
+ void onProcessExited(LongLivedProcess process,
+ int exitCode,
+ int signCorrectedCode);
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
index f4e95b9..8549971 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceTerminatingRunnable.java
@@ -31,8 +31,8 @@ public class ServiceTerminatingRunnable implements Runnable {
private Exception exception;
public ServiceTerminatingRunnable(Service owner, Runnable action) {
- Preconditions.checkArgument(owner!=null, "null owner");
- Preconditions.checkArgument(action!=null, "null action");
+ Preconditions.checkArgument(owner != null, "null owner");
+ Preconditions.checkArgument(action != null, "null action");
this.owner = owner;
this.action = action;
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
index 6518126..7d7110e 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ServiceThreadFactory.java
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* The thread counter starts at 1, increments atomically,
* and is supplied as the second argument in the format string.
*
- * A static method, {@link #newSingleThreadExecutor(String, boolean)},
+ * A static method, {@link #singleThreadExecutor(String, boolean)},
* exists to simplify the construction of an executor with a single well-named
* threads.
*
@@ -92,7 +92,7 @@ public class ServiceThreadFactory implements ThreadFactory {
* @param daemons flag to indicate the threads should be marked as daemons
* @return an executor
*/
- public static ExecutorService newSingleThreadExecutor(String name,
+ public static ExecutorService singleThreadExecutor(String name,
boolean daemons) {
return Executors.newSingleThreadExecutor(
new ServiceThreadFactory(name, daemons));
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
index cca279e..a86453a 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/WorkflowEventNotifyingService.java
@@ -73,7 +73,7 @@ public class WorkflowEventNotifyingService extends
@Override
protected void serviceStart() throws Exception {
LOG.debug("Notifying {} after a delay of {} millis", callback, delay);
- setExecutor(ServiceThreadFactory.newSingleThreadExecutor(getName(), true));
+ setExecutor(ServiceThreadFactory.singleThreadExecutor(getName(), true));
execute(command);
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/EndOfServiceWaiter.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/EndOfServiceWaiter.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/EndOfServiceWaiter.java
new file mode 100644
index 0000000..5e6df3b
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/EndOfServiceWaiter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.junit.Assert;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Wait for a service to stop
+ */
+public class EndOfServiceWaiter implements ServiceStateChangeListener {
+
+ private final AtomicBoolean finished = new AtomicBoolean(false);
+
+ public EndOfServiceWaiter(Service svc) {
+ svc.registerServiceListener(this);
+ }
+
+ public synchronized void waitForServiceToStop(long timeout) throws
+ InterruptedException {
+ if (!finished.get()) {
+ wait(timeout);
+ }
+ Assert.assertTrue("Service did not finish in time period",
+ finished.get());
+ }
+
+ @Override
+ public synchronized void stateChanged(Service service) {
+ if (service.isInState(Service.STATE.STOPPED)) {
+ finished.set(true);
+ notify();
+ }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java
index 8521d0d..45fdc86 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/ProcessCommandFactory.java
@@ -55,6 +55,7 @@ public class ProcessCommandFactory {
commands.add(text);
return commands;
}
+
/**
* print env variables
* @return commands
@@ -66,6 +67,17 @@ public class ProcessCommandFactory {
}
/**
+ * execute a command that returns with an error code that will
+ * be converted into a number
+ * @return commands
+ */
+ public List<String> exitFalse() {
+ List<String> commands = new ArrayList<String>(2);
+ commands.add("false");
+ return commands;
+ }
+
+ /**
* Create a process command factory for this OS
* @return
*/
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
new file mode 100644
index 0000000..a1d5450
--- /dev/null
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestForkedProcessService.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.slider.server.services.workflow;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.ServiceOperations;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test the forked process service by executing a command that works and a command
+ * that fails
+ */
+public class TestForkedProcessService extends WorkflowServiceTestBase {
+ private static final Logger
+ log = LoggerFactory.getLogger(TestForkedProcessService.class);
+
+ private static final Logger
+ processLog =
+ LoggerFactory.getLogger("org.apache.hadoop.services.workflow.Process");
+
+
+ private ForkedProcessService process;
+ private File testDir = new File("target");
+ private ProcessCommandFactory commandFactory;
+ private Map<String, String> env = new HashMap<String, String>();
+
+ @Before
+ public void setupProcesses() {
+ commandFactory = ProcessCommandFactory.createProcessCommandFactory();
+ }
+
+ @After
+ public void stopProcesses() {
+ ServiceOperations.stop(process);
+ }
+
+ @Test
+ public void testLs() throws Throwable {
+
+ initProcess(commandFactory.ls(testDir));
+ exec();
+ assertFalse(process.isProcessRunning());
+ assertEquals(0, process.getExitCode());
+
+ assertStringInOutput("test-classes", getFinalOutput());
+ }
+
+ @Test
+ public void testExitCodes() throws Throwable {
+
+ initProcess(commandFactory.exitFalse());
+ exec();
+ assertFalse(process.isProcessRunning());
+ int exitCode = process.getExitCode();
+ assertTrue(exitCode != 0);
+ int corrected = process.getExitCodeSignCorrected();
+ assertEquals(1, corrected);
+ }
+
+ @Test
+ public void testEcho() throws Throwable {
+
+ String echoText = "hello, world";
+ initProcess(commandFactory.echo(echoText));
+ exec();
+
+ assertEquals(0, process.getExitCode());
+ assertStringInOutput(echoText, getFinalOutput());
+
+ }
+
+ @Test
+ public void testSetenv() throws Throwable {
+
+ String var = "TEST_RUN";
+ String val = "TEST-RUN-ENV-VALUE";
+ env.put(var, val);
+ initProcess(commandFactory.env());
+ exec();
+
+ assertEquals(0, process.getExitCode());
+ assertStringInOutput(val, getFinalOutput());
+ }
+
+ /**
+ * Get the final output: the recent output lines retained by the process.
+ * @return the last output
+ * @throws InterruptedException
+ */
+ private List<String> getFinalOutput() {
+ return process.getRecentOutput();
+ }
+
+ private ForkedProcessService initProcess(List<String> commands) throws
+ IOException {
+ process = new ForkedProcessService(name.getMethodName(), env,
+ commands);
+ process.init(new Configuration());
+
+ return process;
+ }
+
+ public void exec() throws InterruptedException {
+ assertNotNull(process);
+ EndOfServiceWaiter waiter = new EndOfServiceWaiter(process);
+ process.start();
+ waiter.waitForServiceToStop(5000);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
index ab37e6a..c9172e2 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestLongLivedProcess.java
@@ -70,12 +70,31 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
//here stopped
assertTrue("process start callback not received", started);
assertTrue("process stop callback not received", stopped);
+ assertFalse(process.isRunning());
assertEquals(0, process.getExitCode().intValue());
assertStringInOutput("test-classes", getFinalOutput());
}
@Test
+ public void testExitCodes() throws Throwable {
+
+ initProcess(commandFactory.exitFalse());
+ process.start();
+ //in-thread wait
+ process.run();
+
+ //here stopped
+
+ assertFalse(process.isRunning());
+ int exitCode = process.getExitCode();
+ assertTrue(exitCode != 0);
+ int corrected = process.getExitCodeSignCorrected();
+
+ assertEquals(1, corrected);
+ }
+
+ @Test
public void testEcho() throws Throwable {
String echoText = "hello, world";
@@ -95,7 +114,6 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
String var = "TEST_RUN";
String val = "TEST-RUN-ENV-VALUE";
- String echoText = "${TEST_RUN}";
initProcess(commandFactory.env());
process.setEnv(var, val);
process.start();
@@ -113,30 +131,10 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
* @return the last output
* @throws InterruptedException
*/
- private List<String> getFinalOutput() throws InterruptedException {
+ private List<String> getFinalOutput() {
return process.getRecentOutput();
}
- public void assertStringInOutput(String text, List<String> output) {
- boolean found = false;
- StringBuilder builder = new StringBuilder();
- for (String s : output) {
- builder.append(s).append('\n');
- if (s.contains(text)) {
- found = true;
- break;
- }
- }
-
- if (!found) {
- String message =
- "Text \"" + text + "\" not found in " + output.size() + " lines\n";
- fail(message + builder.toString());
- }
-
- }
-
-
private LongLivedProcess initProcess(List<String> commands) {
process = new LongLivedProcess(name.getMethodName(), log, commands);
process.setLifecycleCallback(this);
@@ -147,7 +145,6 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
* Handler for callback events on the process
*/
-
@Override
public void onProcessStarted(LongLivedProcess process) {
started = true;
@@ -157,7 +154,9 @@ public class TestLongLivedProcess extends WorkflowServiceTestBase implements
* Handler for callback events on the process
*/
@Override
- public void onProcessExited(LongLivedProcess process, int exitCode) {
+ public void onProcessExited(LongLivedProcess process,
+ int exitCode,
+ int signCorrectedCode) {
this.exitCode = exitCode;
stopped = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java
index e9c0271..9514f47 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowExecutorService.java
@@ -54,7 +54,7 @@ public class TestWorkflowExecutorService extends WorkflowServiceTestBase {
private static class ExecutorSvc extends AbstractWorkflowExecutorService {
private ExecutorSvc() {
super("ExecutorService",
- ServiceThreadFactory.newSingleThreadExecutor("test", true));
+ ServiceThreadFactory.singleThreadExecutor("test", true));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/4fb479d5/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
index ab57644..6c0cdc4 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/WorkflowServiceTestBase.java
@@ -28,6 +28,8 @@ import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
public abstract class WorkflowServiceTestBase extends Assert {
private static final Logger
log = LoggerFactory.getLogger(WorkflowServiceTestBase.class);
@@ -40,7 +42,7 @@ public abstract class WorkflowServiceTestBase extends Assert {
@Rule
public TestName name = new TestName();
-
+
@Before
public void nameThread() {
Thread.currentThread().setName("JUnit");
@@ -103,4 +105,23 @@ public abstract class WorkflowServiceTestBase extends Assert {
result = parameter;
}
}
+
+ public void assertStringInOutput(String text, List<String> output) {
+ assertTrue("Empty output list", !output.isEmpty());
+ boolean found = false;
+ StringBuilder builder = new StringBuilder();
+ for (String s : output) {
+ builder.append(s).append('\n');
+ if (s.contains(text)) {
+ found = true;
+ break;
+ }
+ }
+
+ if (!found) {
+ String message =
+ "Text \"" + text + "\" not found in " + output.size() + " lines\n";
+ fail(message + builder.toString());
+ }
+ }
}