You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/03/23 23:08:26 UTC
[1/2] incubator-beam git commit: Schedule roots less aggressively
Repository: incubator-beam
Updated Branches:
refs/heads/master 619db54db -> 9247ad78d
Schedule roots less aggressively
The excess scheduling of known-empty bundles can consume excessive
resources.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/35df1668
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/35df1668
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/35df1668
Branch: refs/heads/master
Commit: 35df16686940b7dae7bcfb2a42a104bc8ea79f11
Parents: 619db54
Author: Thomas Groh <tg...@google.com>
Authored: Mon Mar 21 17:00:35 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Mar 23 14:56:09 2016 -0700
----------------------------------------------------------------------
.../ExecutorServiceParallelExecutor.java | 76 +++++++++----
.../inprocess/InMemoryWatermarkManager.java | 16 ---
.../inprocess/InProcessEvaluationContext.java | 46 ++++++--
.../InProcessEvaluationContextTest.java | 110 ++++++++++++++++++-
4 files changed, 200 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35df1668/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
index c72a115..68a1b8c 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
@@ -304,8 +304,8 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
visibleUpdates.offer(VisibleExecutorUpdate.fromThrowable(update.getException().get()));
}
}
- fireTimers();
- mightNeedMoreWork();
+ boolean timersFired = fireTimers();
+ addWorkIfNecessary(timersFired);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Monitor died due to being interrupted");
@@ -326,8 +326,12 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
}
}
- private void fireTimers() throws Exception {
+ /**
+ * Fires any available timers. Returns true if at least one timer was fired.
+ */
+ private boolean fireTimers() throws Exception {
try {
+ boolean firedTimers = false;
for (Map.Entry<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> transformTimers :
evaluationContext.extractFiredTimers().entrySet()) {
AppliedPTransform<?, ?, ?> transform = transformTimers.getKey();
@@ -346,9 +350,11 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
.add(WindowedValue.valueInEmptyWindows(work))
.commit(Instant.now());
scheduleConsumption(transform, bundle, new TimerCompletionCallback(delivery));
+ firedTimers = true;
}
}
}
+ return firedTimers;
} catch (Exception e) {
LOG.error("Internal Error while delivering timers", e);
throw e;
@@ -367,26 +373,58 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
return false;
}
- private void mightNeedMoreWork() {
- synchronized (scheduledExecutors) {
- for (TransformExecutor<?> executor : scheduledExecutors.keySet()) {
- Thread thread = executor.getThread();
- if (thread != null) {
- switch (thread.getState()) {
- case BLOCKED:
- case WAITING:
- case TERMINATED:
- case TIMED_WAITING:
- break;
- default:
- return;
- }
- }
+ /**
+ * If all active {@link TransformExecutor TransformExecutors} are in a blocked state,
+ * add more work from root nodes that may have additional work. This ensures that if a pipeline
+ * has elements available from the root nodes it will add those elements when necessary.
+ */
+ private void addWorkIfNecessary(boolean firedTimers) {
+ // If any timers have fired, they will add more work; we don't need to add more
+ if (firedTimers) {
+ return;
+ }
+ for (TransformExecutor<?> executor : scheduledExecutors.keySet()) {
+ if (!isExecutorBlocked(executor)) {
+ // We have at least one executor that can proceed without adding additional work
+ return;
}
}
// All current TransformExecutors are blocked; add more work from the roots.
for (AppliedPTransform<?, ?, ?> root : rootNodes) {
- scheduleConsumption(root, null, defaultCompletionCallback);
+ if (!evaluationContext.isDone(root)) {
+ scheduleConsumption(root, null, defaultCompletionCallback);
+ }
+ }
+ }
+
+ /**
+ * Return true if the provided executor cannot make progress unless more work is added.
+ *
+ * <p>May return false even if all executor threads are currently blocked or cleaning up, as
+ * these can cause more work to be scheduled. If this does not occur, after these calls
+ * terminate, future calls will return true if all executors are waiting.
+ */
+ private boolean isExecutorBlocked(TransformExecutor<?> executor) {
+ Thread thread = executor.getThread();
+ if (thread == null) {
+ return false;
+ }
+ switch (thread.getState()) {
+ case TERMINATED:
+ throw new IllegalStateException(String.format(
+ "Unexpectedly encountered a Terminated TransformExecutor %s", executor));
+ case WAITING:
+ case TIMED_WAITING:
+ // The thread is waiting for some external input. Adding more work may cause the thread
+ // to stop waiting (e.g. the thread is waiting on an unbounded side input)
+ return true;
+ case BLOCKED:
+ // The executor is blocked on acquisition of a java monitor. This usually means it is
+ // making a call to the EvaluationContext, but not a model-blocking call - and will
+ // eventually complete, at which point we may reevaluate.
+ default:
+ // NEW and RUNNABLE threads can make progress
+ return false;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35df1668/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
index 094526d..a9a62a6 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InMemoryWatermarkManager.java
@@ -862,22 +862,6 @@ public class InMemoryWatermarkManager {
}
/**
- * Returns true if, for any {@link TransformWatermarks} returned by
- * {@link #getWatermarks(AppliedPTransform)}, the output watermark will be equal to
- * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
- */
- public boolean allWatermarksAtPositiveInfinity() {
- for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry :
- transformToWatermarks.entrySet()) {
- Instant endOfTime = THE_END_OF_TIME.get();
- if (watermarksEntry.getValue().getOutputWatermark().isBefore(endOfTime)) {
- return false;
- }
- }
- return true;
- }
-
- /**
* A (key, Instant) pair that holds the watermark. Holds are per-key, but the watermark is global,
* and as such the watermark manager must track holds and the release of holds on a per-key basis.
*
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35df1668/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
index 2908fba..4aeb0d3 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContext.java
@@ -357,27 +357,49 @@ class InProcessEvaluationContext {
}
/**
- * Returns true if all steps are done.
+ * Returns true if the step will not produce additional output.
+ *
+ * <p>If the provided transform produces only {@link IsBounded#BOUNDED}
+ * {@link PCollection PCollections}, returns true if the watermark is at
+ * {@link BoundedWindow#TIMESTAMP_MAX_VALUE positive infinity}.
+ *
+ * <p>If the provided transform produces any {@link IsBounded#UNBOUNDED}
+ * {@link PCollection PCollections}, returns true only if the watermark is at positive infinity
+ * and {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()} is true.
*/
- public boolean isDone() {
- if (!options.isShutdownUnboundedProducersWithMaxWatermark() && containsUnboundedPCollection()) {
+ public boolean isDone(AppliedPTransform<?, ?, ?> transform) {
+ // if the PTransform's watermark isn't at the max value, it isn't done
+ if (watermarkManager
+ .getWatermarks(transform)
+ .getOutputWatermark()
+ .isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
return false;
}
- if (!watermarkManager.allWatermarksAtPositiveInfinity()) {
- return false;
+ // If the PTransform has any unbounded outputs, and unbounded producers should not be shut down,
+ // the PTransform may produce additional output. It is not done.
+ for (PValue output : transform.getOutput().expand()) {
+ if (output instanceof PCollection) {
+ IsBounded bounded = ((PCollection<?>) output).isBounded();
+ if (bounded.equals(IsBounded.UNBOUNDED)
+ && !options.isShutdownUnboundedProducersWithMaxWatermark()) {
+ return false;
+ }
+ }
}
+ // The PTransform's watermark was at positive infinity and all of its outputs are known to be
+ // done. It is done.
return true;
}
- private boolean containsUnboundedPCollection() {
+ /**
+ * Returns true if all steps are done.
+ */
+ public boolean isDone() {
for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
- for (PValue value : transform.getInput().expand()) {
- if (value instanceof PCollection
- && ((PCollection<?>) value).isBounded().equals(IsBounded.UNBOUNDED)) {
- return true;
- }
+ if (!isDone(transform)) {
+ return false;
}
}
- return false;
+ return true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35df1668/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
index 1490960..1a0f505 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
@@ -24,12 +24,14 @@ import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
+import com.google.cloud.dataflow.sdk.io.CountingInput;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.FiredTimers;
import com.google.cloud.dataflow.sdk.runners.inprocess.InMemoryWatermarkManager.TimerUpdate;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.PCollectionViewWriter;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
import com.google.cloud.dataflow.sdk.transforms.Create;
@@ -58,6 +60,7 @@ import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
@@ -80,25 +83,33 @@ import java.util.concurrent.TimeUnit;
public class InProcessEvaluationContextTest {
private TestPipeline p;
private InProcessEvaluationContext context;
+
private PCollection<Integer> created;
private PCollection<KV<String, Integer>> downstream;
private PCollectionView<Iterable<Integer>> view;
+ private PCollection<Long> unbounded;
+
@Before
public void setup() {
InProcessPipelineRunner runner =
InProcessPipelineRunner.fromOptions(PipelineOptionsFactory.create());
+
p = TestPipeline.create();
+
created = p.apply(Create.of(1, 2, 3));
downstream = created.apply(WithKeys.<String, Integer>of("foo"));
view = created.apply(View.<Integer>asIterable());
+ unbounded = p.apply(CountingInput.unbounded());
Collection<AppliedPTransform<?, ?, ?>> rootTransforms =
- ImmutableList.<AppliedPTransform<?, ?, ?>>of(created.getProducingTransformInternal());
+ ImmutableList.<AppliedPTransform<?, ?, ?>>of(
+ created.getProducingTransformInternal(), unbounded.getProducingTransformInternal());
Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers = new HashMap<>();
valueToConsumers.put(
created,
ImmutableList.<AppliedPTransform<?, ?, ?>>of(
downstream.getProducingTransformInternal(), view.getProducingTransformInternal()));
+ valueToConsumers.put(unbounded, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
valueToConsumers.put(downstream, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
valueToConsumers.put(view, ImmutableList.<AppliedPTransform<?, ?, ?>>of());
@@ -106,6 +117,7 @@ public class InProcessEvaluationContextTest {
stepNames.put(created.getProducingTransformInternal(), "s1");
stepNames.put(downstream.getProducingTransformInternal(), "s2");
stepNames.put(view.getProducingTransformInternal(), "s3");
+ stepNames.put(unbounded.getProducingTransformInternal(), "s4");
Collection<PCollectionView<?>> views = ImmutableList.<PCollectionView<?>>of(view);
context = InProcessEvaluationContext.create(
@@ -421,6 +433,102 @@ public class InProcessEvaluationContextTest {
assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo"));
}
+ @Test
+ public void isDoneWithUnboundedPCollectionAndShutdown() {
+ context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
+ assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));
+
+ context.handleResult(
+ null,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+ assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(true));
+ }
+
+ @Test
+ public void isDoneWithUnboundedPCollectionAndNotShutdown() {
+ context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
+ assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));
+
+ context.handleResult(
+ null,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+ assertThat(context.isDone(unbounded.getProducingTransformInternal()), is(false));
+ }
+
+ @Test
+ public void isDoneWithOnlyBoundedPCollections() {
+ context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
+ assertThat(context.isDone(created.getProducingTransformInternal()), is(false));
+
+ context.handleResult(
+ null,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
+ assertThat(context.isDone(created.getProducingTransformInternal()), is(true));
+ }
+
+ @Test
+ public void isDoneWithPartiallyDone() {
+ context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(true);
+ assertThat(context.isDone(), is(false));
+
+ UncommittedBundle<Integer> rootBundle = context.createRootBundle(created);
+ rootBundle.add(WindowedValue.valueInGlobalWindow(1));
+ Iterable<? extends CommittedBundle<?>> handleResult =
+ context.handleResult(
+ null,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(created.getProducingTransformInternal())
+ .addOutput(rootBundle)
+ .build());
+ @SuppressWarnings("unchecked")
+ CommittedBundle<Integer> committedBundle =
+ (CommittedBundle<Integer>) Iterables.getOnlyElement(handleResult);
+ context.handleResult(
+ null,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+ context.handleResult(
+ committedBundle,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
+ assertThat(context.isDone(), is(false));
+
+ context.handleResult(
+ committedBundle,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
+ assertThat(context.isDone(), is(true));
+ }
+
+ @Test
+ public void isDoneWithUnboundedAndNotShutdown() {
+ context.getPipelineOptions().setShutdownUnboundedProducersWithMaxWatermark(false);
+ assertThat(context.isDone(), is(false));
+
+ context.handleResult(
+ null,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(created.getProducingTransformInternal()).build());
+ context.handleResult(
+ null,
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
+ context.handleResult(
+ context.createRootBundle(created).commit(Instant.now()),
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
+ assertThat(context.isDone(), is(false));
+
+ context.handleResult(
+ context.createRootBundle(created).commit(Instant.now()),
+ ImmutableList.<TimerData>of(),
+ StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
+ assertThat(context.isDone(), is(false));
+ }
+
private static class TestBoundedWindow extends BoundedWindow {
private final Instant ts;
[2/2] incubator-beam git commit: This closes #64
Posted by bc...@apache.org.
This closes #64
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9247ad78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9247ad78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9247ad78
Branch: refs/heads/master
Commit: 9247ad78dc1b63605b52c34f67cf292ac8dedb98
Parents: 619db54 35df166
Author: bchambers <bc...@google.com>
Authored: Wed Mar 23 14:56:32 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Mar 23 14:56:32 2016 -0700
----------------------------------------------------------------------
.../ExecutorServiceParallelExecutor.java | 76 +++++++++----
.../inprocess/InMemoryWatermarkManager.java | 16 ---
.../inprocess/InProcessEvaluationContext.java | 46 ++++++--
.../InProcessEvaluationContextTest.java | 110 ++++++++++++++++++-
4 files changed, 200 insertions(+), 48 deletions(-)
----------------------------------------------------------------------