You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/04/08 23:26:52 UTC
[2/3] incubator-beam git commit: Schedule all pending work before firing timers
Schedule all pending work before firing timers
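Pull all available work off of the ExecutorUpdate queue during each
execution of the MonitorRunnable, instead of at most one update per
execution, so that all pending work is scheduled before timers are fired.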
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ad58e26d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ad58e26d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ad58e26d
Branch: refs/heads/master
Commit: ad58e26db132268c676ca9289b73e1ddab72627a
Parents: 529bcdf
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 29 10:27:37 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Apr 8 14:20:20 2016 -0700
----------------------------------------------------------------------
.../sdk/runners/inprocess/ExecutorServiceParallelExecutor.java | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ad58e26d/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
index 9af6f97..4d45e8f 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
@@ -325,13 +325,15 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
Thread.currentThread().setName(runnableName);
try {
ExecutorUpdate update = allUpdates.poll();
- if (update != null) {
+ // pull all of the pending work off of the queue
+ while (update != null) {
LOG.debug("Executor Update: {}", update);
if (update.getBundle().isPresent()) {
scheduleConsumers(update.getBundle().get());
} else if (update.getException().isPresent()) {
visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
}
+ update = allUpdates.poll();
}
boolean timersFired = fireTimers();
addWorkIfNecessary(timersFired);