You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@slider.apache.org by st...@apache.org on 2014/06/23 22:31:24 UTC
git commit: SLIDER-159 Jenkins testEcho failing
Repository: incubator-slider
Updated Branches:
refs/heads/develop bc0c67284 -> 537d87c25
SLIDER-159 Jenkins testEcho failing
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/537d87c2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/537d87c2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/537d87c2
Branch: refs/heads/develop
Commit: 537d87c25e20e3c11994dbfdcfcb89ba20f59859
Parents: bc0c672
Author: Steve Loughran <st...@apache.org>
Authored: Mon Jun 23 13:25:41 2014 -0700
Committer: Steve Loughran <st...@apache.org>
Committed: Mon Jun 23 13:25:41 2014 -0700
----------------------------------------------------------------------
.../services/workflow/ForkedProcessService.java | 25 ++++++++++++++++----
.../services/workflow/LongLivedProcess.java | 13 ++++++++++
.../TestWorkflowForkedProcessService.java | 3 ++-
3 files changed, 35 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/537d87c2/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
index b0c503d..7e73005 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/ForkedProcessService.java
@@ -269,15 +269,28 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
/**
* Get the recent output from the process, or [] if not defined
- * @param duration the duration, in ms, which we wait for recent output to become non-empty
+ *
+ * @param finalOutput flag to indicate "wait for the final output of the process"
+ * @param duration the duration, in ms,
+ * to wait for recent output to become non-empty
* @return a possibly empty list
*/
- public List<String> getRecentOutput(int duration) {
- if (process == null) return new LinkedList<String>();
+ public List<String> getRecentOutput(boolean finalOutput, int duration) {
+ if (process == null) {
+ return new LinkedList<String>();
+ }
long start = System.currentTimeMillis();
- while (process.isRecentOutputEmpty() && System.currentTimeMillis() - start <= duration) {
+ while (System.currentTimeMillis() - start <= duration) {
+ if (finalOutput && process.isFinalOutputProcessed()) {
+ //end of stream, break
+ break;
+ }
+ if (!process.isRecentOutputEmpty()) {
+ // there is some output
+ break;
+ }
try {
- Thread.sleep(20);
+ Thread.sleep(100);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
@@ -285,4 +298,6 @@ public class ForkedProcessService extends AbstractWorkflowExecutorService implem
}
return process.getRecentOutput();
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/537d87c2/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
index 7b9863f..d9ddecb 100644
--- a/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
+++ b/slider-core/src/main/java/org/apache/slider/server/services/workflow/LongLivedProcess.java
@@ -35,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Execute a long-lived process.
@@ -82,6 +83,7 @@ public class LongLivedProcess implements Runnable {
private final List<String> recentLines = new LinkedList<String>();
private int recentLineLimit = RECENT_LINE_LOG_LIMIT;
private LongLivedProcessLifecycleEvent lifecycleCallback;
+ private final AtomicBoolean finalOutputProcessed = new AtomicBoolean(false);
/**
@@ -347,6 +349,14 @@ public class LongLivedProcess implements Runnable {
}
/**
+ * Query to see if the final output has been processed
+ * @return true if the final output of the process has been processed
+ */
+ public boolean isFinalOutputProcessed() {
+ return finalOutputProcessed.get();
+ }
+
+ /**
* add the recent line to the list of recent lines; deleting
* an earlier on if the limit is reached.
*
@@ -499,6 +509,9 @@ public class LongLivedProcess implements Runnable {
LOG.warn("encountered {}", ignored, ignored);
//process connection has been torn down
} finally {
+ //mark output as done
+ finalOutputProcessed.set(true);
+ // close streams
IOUtils.closeStream(errReader);
IOUtils.closeStream(outReader);
}
http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/537d87c2/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowForkedProcessService.java
----------------------------------------------------------------------
diff --git a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowForkedProcessService.java b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowForkedProcessService.java
index d46f07c..6d08156 100644
--- a/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowForkedProcessService.java
+++ b/slider-core/src/test/java/org/apache/slider/server/services/workflow/TestWorkflowForkedProcessService.java
@@ -43,6 +43,7 @@ public class TestWorkflowForkedProcessService extends WorkflowServiceTestBase {
private static final Logger
processLog =
LoggerFactory.getLogger("org.apache.hadoop.services.workflow.Process");
+ public static final int RECENT_OUTPUT_SLEEP_DURATION = 4000;
private ForkedProcessService process;
@@ -118,7 +119,7 @@ public class TestWorkflowForkedProcessService extends WorkflowServiceTestBase {
* @return the last output
*/
private List<String> getFinalOutput() {
- return process.getRecentOutput(2000);
+ return process.getRecentOutput(true, RECENT_OUTPUT_SLEEP_DURATION);
}
private ForkedProcessService initProcess(List<String> commands) throws