You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/06/14 17:21:13 UTC
[1/2] beam git commit: This closes #3353
Repository: beam
Updated Branches:
refs/heads/master eebff9089 -> dd9abc397
This closes #3353
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dd9abc39
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dd9abc39
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dd9abc39
Branch: refs/heads/master
Commit: dd9abc397c67b47f95414bfd5debb08b7b6a6eaf
Parents: eebff90 8a850af
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jun 14 10:21:02 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Jun 14 10:21:02 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/CommittedResult.java | 12 ++++-----
.../beam/runners/direct/EvaluationContext.java | 26 +++++++++++++++-----
.../direct/ExecutorServiceParallelExecutor.java | 9 ++++---
.../beam/runners/direct/WatermarkManager.java | 4 +--
.../runners/direct/CommittedResultTest.java | 17 +++++++------
.../runners/direct/TransformExecutorTest.java | 11 +++++++--
.../runners/direct/WatermarkManagerTest.java | 15 ++++++++---
7 files changed, 62 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Do not produce Unprocessed Inputs if all
inputs were Processed
Posted by tg...@apache.org.
Do not produce Unprocessed Inputs if all inputs were Processed
This stops the WatermarkManager "Pending Bundles" from growing without
bound.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8a850af3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8a850af3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8a850af3
Branch: refs/heads/master
Commit: 8a850af3304a48618739dc23e286800dc0c4641a
Parents: eebff90
Author: Thomas Groh <tg...@google.com>
Authored: Tue Jun 13 12:50:58 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Jun 14 10:21:02 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/CommittedResult.java | 12 ++++-----
.../beam/runners/direct/EvaluationContext.java | 26 +++++++++++++++-----
.../direct/ExecutorServiceParallelExecutor.java | 9 ++++---
.../beam/runners/direct/WatermarkManager.java | 4 +--
.../runners/direct/CommittedResultTest.java | 17 +++++++------
.../runners/direct/TransformExecutorTest.java | 11 +++++++--
.../runners/direct/WatermarkManagerTest.java | 15 ++++++++---
7 files changed, 62 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
index 8c45449..70e3ac3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
@@ -19,8 +19,8 @@
package org.apache.beam.runners.direct;
import com.google.auto.value.AutoValue;
+import com.google.common.base.Optional;
import java.util.Set;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
@@ -36,12 +36,10 @@ abstract class CommittedResult {
/**
* Returns the {@link CommittedBundle} that contains the input elements that could not be
- * processed by the evaluation.
- *
- * <p>{@code null} if the input bundle was null.
+ * processed by the evaluation. The returned optional is present if there were any unprocessed
+ * input elements, and absent otherwise.
*/
- @Nullable
- public abstract CommittedBundle<?> getUnprocessedInputs();
+ public abstract Optional<? extends CommittedBundle<?>> getUnprocessedInputs();
/**
* Returns the outputs produced by the transform.
@@ -59,7 +57,7 @@ abstract class CommittedResult {
public static CommittedResult create(
TransformResult<?> original,
- CommittedBundle<?> unprocessedElements,
+ Optional<? extends CommittedBundle<?>> unprocessedElements,
Iterable<? extends CommittedBundle<?>> outputs,
Set<OutputType> producedOutputs) {
return new AutoValue_CommittedResult(original.getTransform(),
http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index e215070..d192785 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
@@ -158,12 +159,9 @@ class EvaluationContext {
} else {
outputTypes.add(OutputType.BUNDLE);
}
- CommittedResult committedResult = CommittedResult.create(result,
- completedBundle == null
- ? null
- : completedBundle.withElements((Iterable) result.getUnprocessedElements()),
- committedBundles,
- outputTypes);
+ CommittedResult committedResult =
+ CommittedResult.create(
+ result, getUnprocessedInput(completedBundle, result), committedBundles, outputTypes);
// Update state internals
CopyOnAccessInMemoryStateInternals theirState = result.getState();
if (theirState != null) {
@@ -187,6 +185,22 @@ class EvaluationContext {
return committedResult;
}
+ /**
+ * Returns an {@link Optional} containing a bundle of the elements of the {@code
+ * completedBundle} that the evaluation did not process, built with {@code withElements}.
+ * Returns an absent {@link Optional} if {@code completedBundle} is null or if {@code result}
+ * reports no unprocessed elements.
+ */
+ private Optional<? extends CommittedBundle<?>> getUnprocessedInput(
+ @Nullable CommittedBundle<?> completedBundle, TransformResult<?> result) {
+ if (completedBundle == null || Iterables.isEmpty(result.getUnprocessedElements())) {
+ return Optional.absent();
+ }
+ CommittedBundle<?> residual =
+ completedBundle.withElements((Iterable) result.getUnprocessedElements());
+ return Optional.of(residual);
+ }
+
private Iterable<? extends CommittedBundle<?>> commitBundles(
Iterable<? extends UncommittedBundle<?>> bundles) {
ImmutableList.Builder<CommittedBundle<?>> completed = ImmutableList.builder();
http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 6fe8ebd..2f4d1f6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -357,15 +357,16 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
ExecutorUpdate.fromBundle(
outputBundle, graph.getPerElementConsumers(outputBundle.getPCollection())));
}
- CommittedBundle<?> unprocessedInputs = committedResult.getUnprocessedInputs();
- if (unprocessedInputs != null && !Iterables.isEmpty(unprocessedInputs.getElements())) {
+ Optional<? extends CommittedBundle<?>> unprocessedInputs =
+ committedResult.getUnprocessedInputs();
+ if (unprocessedInputs.isPresent()) {
if (inputBundle.getPCollection() == null) {
// TODO: Split this logic out of an if statement
- pendingRootBundles.get(result.getTransform()).offer(unprocessedInputs);
+ pendingRootBundles.get(result.getTransform()).offer(unprocessedInputs.get());
} else {
allUpdates.offer(
ExecutorUpdate.fromBundle(
- unprocessedInputs,
+ unprocessedInputs.get(),
Collections.<AppliedPTransform<?, ?, ?>>singleton(
committedResult.getTransform())));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 80a3504..599b74f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -994,9 +994,9 @@ class WatermarkManager {
}
TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform());
- if (input != null) {
+ if (result.getUnprocessedInputs().isPresent()) {
// Add the unprocessed inputs
- completedTransform.addPending(result.getUnprocessedInputs());
+ completedTransform.addPending(result.getUnprocessedInputs().get());
}
completedTransform.updateTimers(timerUpdate);
if (input != null) {
http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
index cf19dc2..8b95b34 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/CommittedResultTest.java
@@ -18,9 +18,9 @@
package org.apache.beam.runners.direct;
-import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import java.io.Serializable;
import java.util.Collections;
@@ -72,7 +72,7 @@ public class CommittedResultTest implements Serializable {
CommittedResult result =
CommittedResult.create(
StepTransformResult.withoutHold(transform).build(),
- bundleFactory.createBundle(created).commit(Instant.now()),
+ Optional.<CommittedBundle<?>>absent(),
Collections.<CommittedBundle<?>>emptyList(),
EnumSet.noneOf(OutputType.class));
@@ -88,11 +88,11 @@ public class CommittedResultTest implements Serializable {
CommittedResult result =
CommittedResult.create(
StepTransformResult.withoutHold(transform).build(),
- bundle,
+ Optional.of(bundle),
Collections.<CommittedBundle<?>>emptyList(),
EnumSet.noneOf(OutputType.class));
- assertThat(result.getUnprocessedInputs(),
+ assertThat(result.getUnprocessedInputs().get(),
Matchers.<CommittedBundle<?>>equalTo(bundle));
}
@@ -101,11 +101,14 @@ public class CommittedResultTest implements Serializable {
CommittedResult result =
CommittedResult.create(
StepTransformResult.withoutHold(transform).build(),
- null,
+ Optional.<CommittedBundle<?>>absent(),
Collections.<CommittedBundle<?>>emptyList(),
EnumSet.noneOf(OutputType.class));
- assertThat(result.getUnprocessedInputs(), nullValue());
+ assertThat(
+ result.getUnprocessedInputs(),
+ Matchers.<Optional<? extends CommittedBundle<?>>>equalTo(
+ Optional.<CommittedBundle<?>>absent()));
}
@Test
@@ -120,7 +123,7 @@ public class CommittedResultTest implements Serializable {
CommittedResult result =
CommittedResult.create(
StepTransformResult.withoutHold(transform).build(),
- bundleFactory.createBundle(created).commit(Instant.now()),
+ Optional.<CommittedBundle<?>>absent(),
outputs,
EnumSet.of(OutputType.BUNDLE, OutputType.PCOLLECTION_VIEW));
http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 3dd4028..b7f5a7c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -25,6 +25,8 @@ import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.when;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.ArrayList;
import java.util.Collection;
@@ -415,8 +417,13 @@ public class TransformExecutorTest {
? Collections.emptyList()
: result.getUnprocessedElements();
- CommittedBundle<?> unprocessedBundle =
- inputBundle == null ? null : inputBundle.withElements(unprocessedElements);
+ Optional<? extends CommittedBundle<?>> unprocessedBundle;
+ if (inputBundle == null || Iterables.isEmpty(unprocessedElements)) {
+ unprocessedBundle = Optional.absent();
+ } else {
+ unprocessedBundle =
+ Optional.<CommittedBundle<?>>of(inputBundle.withElements(unprocessedElements));
+ }
return CommittedResult.create(
result,
unprocessedBundle,
http://git-wip-us.apache.org/repos/asf/beam/blob/8a850af3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
index e0b5251..e3f6215 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WatermarkManagerTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
+import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
@@ -318,7 +319,7 @@ public class WatermarkManagerTest implements Serializable {
TimerUpdate.empty(),
CommittedResult.create(
StepTransformResult.withoutHold(graph.getProducer(created)).build(),
- root.withElements(Collections.<WindowedValue<Void>>emptyList()),
+ Optional.<CommittedBundle<?>>absent(),
Collections.singleton(createBundle),
EnumSet.allOf(OutputType.class)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -332,7 +333,7 @@ public class WatermarkManagerTest implements Serializable {
TimerUpdate.empty(),
CommittedResult.create(
StepTransformResult.withoutHold(theFlatten).build(),
- createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
+ Optional.<CommittedBundle<?>>absent(),
Collections.<CommittedBundle<?>>emptyList(),
EnumSet.allOf(OutputType.class)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -345,7 +346,7 @@ public class WatermarkManagerTest implements Serializable {
TimerUpdate.empty(),
CommittedResult.create(
StepTransformResult.withoutHold(theFlatten).build(),
- createBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
+ Optional.<CommittedBundle<?>>absent(),
Collections.<CommittedBundle<?>>emptyList(),
EnumSet.allOf(OutputType.class)),
BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1501,9 +1502,15 @@ public class WatermarkManagerTest implements Serializable {
AppliedPTransform<?, ?, ?> transform,
@Nullable CommittedBundle<?> unprocessedBundle,
Iterable<? extends CommittedBundle<?>> bundles) {
+ Optional<? extends CommittedBundle<?>> unprocessedElements;
+ if (unprocessedBundle == null || Iterables.isEmpty(unprocessedBundle.getElements())) {
+ unprocessedElements = Optional.absent();
+ } else {
+ unprocessedElements = Optional.of(unprocessedBundle);
+ }
return CommittedResult.create(
StepTransformResult.withoutHold(transform).build(),
- unprocessedBundle,
+ unprocessedElements,
bundles,
Iterables.isEmpty(bundles)
? EnumSet.noneOf(OutputType.class)