You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/31 17:38:57 UTC
[1/2] beam git commit: This closes #3269
Repository: beam
Updated Branches:
refs/heads/master dc70383cb -> 19c33dfa6
This closes #3269
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/19c33dfa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/19c33dfa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/19c33dfa
Branch: refs/heads/master
Commit: 19c33dfa6b8a64a102192c7ff47acc27a0db548a
Parents: dc70383 6a78bd3
Author: Thomas Groh <tg...@google.com>
Authored: Wed May 31 10:38:45 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 31 10:38:45 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/direct/DirectGraph.java | 26 +++++++++++---------
.../beam/runners/direct/DirectGraphVisitor.java | 25 ++++++++++---------
.../beam/runners/direct/DirectRunner.java | 4 ++-
.../beam/runners/direct/EvaluationContext.java | 17 ++++++++++++-
.../beam/runners/direct/WatermarkManager.java | 19 +++++++++++---
.../runners/direct/DirectGraphVisitorTest.java | 3 +++
.../beam/runners/direct/DirectGraphs.java | 10 +++++++-
.../runners/direct/EvaluationContextTest.java | 6 ++++-
8 files changed, 80 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Reduce Prevalence of PValue in the DirectRunner
Posted by tg...@apache.org.
Reduce Prevalence of PValue in the DirectRunner
Use PCollection or PCollectionView explicitly, rather than the generic PValue, when tracking producers in the DirectRunner's graph.
Retrieve views from the WriteView transform rather than visiting the
view as an output PValue.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6a78bd3f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6a78bd3f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6a78bd3f
Branch: refs/heads/master
Commit: 6a78bd3f09c99f49c5f27d15b3791f200bf5d53d
Parents: dc70383
Author: Thomas Groh <tg...@google.com>
Authored: Wed May 31 09:38:02 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 31 10:38:45 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/direct/DirectGraph.java | 26 +++++++++++---------
.../beam/runners/direct/DirectGraphVisitor.java | 25 ++++++++++---------
.../beam/runners/direct/DirectRunner.java | 4 ++-
.../beam/runners/direct/EvaluationContext.java | 17 ++++++++++++-
.../beam/runners/direct/WatermarkManager.java | 19 +++++++++++---
.../runners/direct/DirectGraphVisitorTest.java | 3 +++
.../beam/runners/direct/DirectGraphs.java | 10 +++++++-
.../runners/direct/EvaluationContextTest.java | 6 ++++-
8 files changed, 80 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
index 83b214a..c2c0afa 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraph.java
@@ -24,9 +24,9 @@ import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
/**
@@ -34,39 +34,43 @@ import org.apache.beam.sdk.values.PValue;
* executed with the {@link DirectRunner}.
*/
class DirectGraph {
- private final Map<POutput, AppliedPTransform<?, ?, ?>> producers;
+ private final Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers;
+ private final Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters;
private final ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers;
- private final Set<PCollectionView<?>> views;
private final Set<AppliedPTransform<?, ?, ?>> rootTransforms;
private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
public static DirectGraph create(
- Map<POutput, AppliedPTransform<?, ?, ?>> producers,
+ Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers,
+ Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters,
ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers,
- Set<PCollectionView<?>> views,
Set<AppliedPTransform<?, ?, ?>> rootTransforms,
Map<AppliedPTransform<?, ?, ?>, String> stepNames) {
- return new DirectGraph(producers, primitiveConsumers, views, rootTransforms, stepNames);
+ return new DirectGraph(producers, viewWriters, primitiveConsumers, rootTransforms, stepNames);
}
private DirectGraph(
- Map<POutput, AppliedPTransform<?, ?, ?>> producers,
+ Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers,
+ Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters,
ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers,
- Set<PCollectionView<?>> views,
Set<AppliedPTransform<?, ?, ?>> rootTransforms,
Map<AppliedPTransform<?, ?, ?>, String> stepNames) {
this.producers = producers;
+ this.viewWriters = viewWriters;
this.primitiveConsumers = primitiveConsumers;
- this.views = views;
this.rootTransforms = rootTransforms;
this.stepNames = stepNames;
}
- AppliedPTransform<?, ?, ?> getProducer(PValue produced) {
+ AppliedPTransform<?, ?, ?> getProducer(PCollection<?> produced) {
return producers.get(produced);
}
+ AppliedPTransform<?, ?, ?> getWriter(PCollectionView<?> view) {
+ return viewWriters.get(view);
+ }
+
List<AppliedPTransform<?, ?, ?>> getPrimitiveConsumers(PValue consumed) {
return primitiveConsumers.get(consumed);
}
@@ -76,7 +80,7 @@ class DirectGraph {
}
Set<PCollectionView<?>> getViews() {
- return views;
+ return viewWriters.keySet();
}
String getStepName(AppliedPTransform<?, ?, ?> step) {
http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index 01204e3..d54de5d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -30,9 +30,9 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
/**
@@ -42,12 +42,12 @@ import org.apache.beam.sdk.values.PValue;
*/
class DirectGraphVisitor extends PipelineVisitor.Defaults {
- private Map<POutput, AppliedPTransform<?, ?, ?>> producers = new HashMap<>();
+ private Map<PCollection<?>, AppliedPTransform<?, ?, ?>> producers = new HashMap<>();
+ private Map<PCollectionView<?>, AppliedPTransform<?, ?, ?>> viewWriters = new HashMap<>();
private ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers =
ArrayListMultimap.create();
- private Set<PCollectionView<?>> views = new HashSet<>();
private Set<AppliedPTransform<?, ?, ?>> rootTransforms = new HashSet<>();
private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
private int numTransforms = 0;
@@ -86,17 +86,19 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
for (PValue value : node.getInputs().values()) {
primitiveConsumers.put(value, appliedTransform);
}
+ if (node.getTransform() instanceof ViewOverrideFactory.WriteView) {
+ viewWriters.put(
+ ((ViewOverrideFactory.WriteView<?, ?>) node.getTransform()).getView(),
+ node.toAppliedPTransform(getPipeline()));
+ }
}
}
@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {
AppliedPTransform<?, ?, ?> appliedTransform = getAppliedTransform(producer);
- if (value instanceof PCollectionView) {
- views.add((PCollectionView<?>) value);
- }
- if (!producers.containsKey(value)) {
- producers.put(value, appliedTransform);
+ if (value instanceof PCollection && !producers.containsKey(value)) {
+ producers.put((PCollection<?>) value, appliedTransform);
}
}
@@ -111,11 +113,12 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
}
/**
- * Get the graph constructed by this {@link DirectGraphVisitor}, which provides
- * lookups for producers and consumers of {@link PValue PValues}.
+ * Get the graph constructed by this {@link DirectGraphVisitor}, which provides lookups for
+ * producers and consumers of {@link PValue PValues}.
*/
public DirectGraph getGraph() {
checkState(finalized, "Can't get a graph before the Pipeline has been completely traversed");
- return DirectGraph.create(producers, primitiveConsumers, views, rootTransforms, stepNames);
+ return DirectGraph.create(
+ producers, viewWriters, primitiveConsumers, rootTransforms, stepNames);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 69dea8f..dbd1ec4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.direct;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -220,7 +221,8 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
* iteration order based on the order at which elements are added to it.
*/
@SuppressWarnings("rawtypes")
- private List<PTransformOverride> defaultTransformOverrides() {
+ @VisibleForTesting
+ List<PTransformOverride> defaultTransformOverrides() {
return ImmutableList.<PTransformOverride>builder()
.add(
PTransformOverride.of(
http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 88ce85a..e215070 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -276,7 +276,7 @@ class EvaluationContext {
* callback will be executed regardless of whether values have been produced.
*/
public void scheduleAfterOutputWouldBeProduced(
- PValue value,
+ PCollection<?> value,
BoundedWindow window,
WindowingStrategy<?, ?> windowingStrategy,
Runnable runnable) {
@@ -287,6 +287,21 @@ class EvaluationContext {
}
/**
+ * Schedule a callback to be executed after output would be produced for the given window if there
+ * had been input.
+ */
+ public void scheduleAfterOutputWouldBeProduced(
+ PCollectionView<?> view,
+ BoundedWindow window,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Runnable runnable) {
+ AppliedPTransform<?, ?, ?> producing = graph.getWriter(view);
+ callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable);
+
+ fireAvailableCallbacks(producing);
+ }
+
+ /**
* Schedule a callback to be executed after the given window is expired.
*
* <p>For example, upstream state associated with the window may be cleared.
http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 4f1b831..40ce163 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -60,6 +60,7 @@ import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;
@@ -790,6 +791,18 @@ class WatermarkManager {
}
}
+ private TransformWatermarks getValueWatermark(PValue pvalue) {
+ if (pvalue instanceof PCollection) {
+ return getTransformWatermark(graph.getProducer((PCollection<?>) pvalue));
+ } else if (pvalue instanceof PCollectionView<?>) {
+ return getTransformWatermark(graph.getWriter((PCollectionView<?>) pvalue));
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown type of %s %s", PValue.class.getSimpleName(), pvalue.getClass()));
+ }
+ }
+
private TransformWatermarks getTransformWatermark(AppliedPTransform<?, ?, ?> transform) {
TransformWatermarks wms = transformToWatermarks.get(transform);
if (wms == null) {
@@ -824,8 +837,7 @@ class WatermarkManager {
}
for (PValue pvalue : inputs.values()) {
Watermark producerOutputWatermark =
- getTransformWatermark(graph.getProducer(pvalue))
- .synchronizedProcessingOutputWatermark;
+ getValueWatermark(pvalue).synchronizedProcessingOutputWatermark;
inputWmsBuilder.add(producerOutputWatermark);
}
return inputWmsBuilder.build();
@@ -838,8 +850,7 @@ class WatermarkManager {
inputWatermarksBuilder.add(THE_END_OF_TIME);
}
for (PValue pvalue : inputs.values()) {
- Watermark producerOutputWatermark =
- getTransformWatermark(graph.getProducer(pvalue)).outputWatermark;
+ Watermark producerOutputWatermark = getValueWatermark(pvalue).outputWatermark;
inputWatermarksBuilder.add(producerOutputWatermark);
}
List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build();
http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index 7f46a0e..576edf3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -78,6 +78,9 @@ public class DirectGraphVisitorTest implements Serializable {
.apply(View.<String>asList());
PCollectionView<Object> singletonView =
p.apply("singletonCreate", Create.<Object>of(1, 2, 3)).apply(View.<Object>asSingleton());
+ p.replaceAll(
+ DirectRunner.fromOptions(TestPipeline.testingPipelineOptions())
+ .defaultTransformOverrides());
p.traverseTopologically(visitor);
assertThat(
visitor.getGraph().getViews(),
http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
index 2f048fa..43de091 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphs.java
@@ -19,6 +19,8 @@ package org.apache.beam.runners.direct;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
/** Test utilities for the {@link DirectRunner}. */
@@ -30,6 +32,12 @@ final class DirectGraphs {
}
public static AppliedPTransform<?, ?, ?> getProducer(PValue value) {
- return getGraph(value.getPipeline()).getProducer(value);
+ if (value instanceof PCollection) {
+ return getGraph(value.getPipeline()).getProducer((PCollection<?>) value);
+ } else if (value instanceof PCollectionView) {
+ return getGraph(value.getPipeline()).getWriter((PCollectionView<?>) value);
+ }
+ throw new IllegalArgumentException(
+ String.format("Unexpected type of %s %s", PValue.class.getSimpleName(), value.getClass()));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/6a78bd3f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 80b04f8..c0e43d6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -101,6 +101,10 @@ public class EvaluationContextTest {
view = created.apply(View.<Integer>asIterable());
unbounded = p.apply(GenerateSequence.from(0));
+ p.replaceAll(
+ DirectRunner.fromOptions(TestPipeline.testingPipelineOptions())
+ .defaultTransformOverrides());
+
KeyedPValueTrackingVisitor keyedPValueTrackingVisitor = KeyedPValueTrackingVisitor.create();
p.traverseTopologically(keyedPValueTrackingVisitor);
@@ -116,7 +120,7 @@ public class EvaluationContextTest {
createdProducer = graph.getProducer(created);
downstreamProducer = graph.getProducer(downstream);
- viewProducer = graph.getProducer(view);
+ viewProducer = graph.getWriter(view);
unboundedProducer = graph.getProducer(unbounded);
}