You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/19 17:53:40 UTC
[1/2] beam git commit: This closes #2587
Repository: beam
Updated Branches:
refs/heads/master 714fdd292 -> 29e054a8d
This closes #2587
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/29e054a8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/29e054a8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/29e054a8
Branch: refs/heads/master
Commit: 29e054a8d7ffe6a061dbbe9a1885185b02f4e8ec
Parents: 714fdd2 418c304
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 19 10:53:30 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Apr 19 10:53:30 2017 -0700
----------------------------------------------------------------------
.../core/construction/UnconsumedReads.java | 72 +++++++++++++
.../core/construction/UnconsumedReadsTest.java | 105 +++++++++++++++++++
.../beam/runners/dataflow/DataflowRunner.java | 4 +
.../runners/dataflow/DataflowRunnerTest.java | 24 +++++
4 files changed, 205 insertions(+)
----------------------------------------------------------------------
[2/2] beam git commit: Ensure all Read outputs are consumed in
Dataflow
Posted by tg...@apache.org.
Ensure all Read outputs are consumed in Dataflow
Apply a no-op ParDo to the output of any Read transform that is not otherwise consumed.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/418c304d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/418c304d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/418c304d
Branch: refs/heads/master
Commit: 418c304dbff1ce8c176d08c890780ec97245aaae
Parents: 714fdd2
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 18 17:25:59 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Apr 19 10:53:30 2017 -0700
----------------------------------------------------------------------
.../core/construction/UnconsumedReads.java | 72 +++++++++++++
.../core/construction/UnconsumedReadsTest.java | 105 +++++++++++++++++++
.../beam/runners/dataflow/DataflowRunner.java | 4 +
.../runners/dataflow/DataflowRunnerTest.java | 24 +++++
4 files changed, 205 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/418c304d/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java
new file mode 100644
index 0000000..c191eeb
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+
+/**
+ * Utilities for ensuring that all {@link Read} {@link PTransform PTransforms} are consumed by some
+ * {@link PTransform}.
+ */
+public class UnconsumedReads {
+ public static void ensureAllReadsConsumed(Pipeline pipeline) {
+ final Set<PCollection<?>> unconsumed = new HashSet<>();
+ pipeline.traverseTopologically(
+ new PipelineVisitor.Defaults() {
+ @Override
+ public void visitPrimitiveTransform(Node node) {
+ unconsumed.removeAll(node.getInputs().values());
+ }
+
+ @Override
+ public void visitValue(PValue value, Node producer) {
+ if (producer.getTransform() instanceof Read.Bounded
+ || producer.getTransform() instanceof Read.Unbounded) {
+ unconsumed.add((PCollection<?>) value);
+ }
+ }
+ });
+ int i = 0;
+ for (PCollection<?> unconsumedPCollection : unconsumed) {
+ consume(unconsumedPCollection, i);
+ i++;
+ }
+ }
+
+ private static <T> void consume(PCollection<T> unconsumedPCollection, int uniq) {
+ // Multiple applications should never break due to stable unique names.
+ String uniqueName = "DropInputs" + (uniq == 0 ? "" : uniq);
+ unconsumedPCollection.apply(uniqueName, ParDo.of(new NoOpDoFn<T>()));
+ }
+
+ private static class NoOpDoFn<T> extends DoFn<T, T> {
+ @ProcessElement
+ public void doNothing(ProcessContext context) {}
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/418c304d/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnconsumedReadsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnconsumedReadsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnconsumedReadsTest.java
new file mode 100644
index 0000000..1966a93
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnconsumedReadsTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.junit.Assert.assertThat;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.beam.sdk.io.Read.Unbounded;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PValue;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link UnconsumedReads}.
+ */
+@RunWith(JUnit4.class)
+public class UnconsumedReadsTest {
+ @Rule public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+ @Test
+ public void matcherProducesUnconsumedValueBoundedRead() {
+ Bounded<Long> transform = Read.from(CountingSource.upTo(20L));
+ PCollection<Long> output = pipeline.apply(transform);
+ UnconsumedReads.ensureAllReadsConsumed(pipeline);
+ validateConsumed();
+ }
+
+ @Test
+ public void matcherProducesUnconsumedValueUnboundedRead() {
+ Unbounded<Long> transform = Read.from(CountingSource.unbounded());
+ PCollection<Long> output = pipeline.apply(transform);
+ UnconsumedReads.ensureAllReadsConsumed(pipeline);
+ validateConsumed();
+ }
+
+ @Test
+ public void doesNotConsumeAlreadyConsumedRead() {
+ Unbounded<Long> transform = Read.from(CountingSource.unbounded());
+ final PCollection<Long> output = pipeline.apply(transform);
+ final Flatten.PCollections<Long> consumer = Flatten.<Long>pCollections();
+ PCollectionList.of(output).apply(consumer);
+ UnconsumedReads.ensureAllReadsConsumed(pipeline);
+ pipeline.traverseTopologically(
+ new PipelineVisitor.Defaults() {
+ @Override
+ public void visitPrimitiveTransform(Node node) {
+ // The output should only be consumed by a single consumer
+ if (node.getInputs().values().contains(output)) {
+ assertThat(node.getTransform(), Matchers.<PTransform<?, ?>>is(consumer));
+ }
+ }
+ });
+ }
+
+ private void validateConsumed() {
+ final Set<PValue> consumedOutputs = new HashSet<PValue>();
+ final Set<PValue> allReadOutputs = new HashSet<PValue>();
+ pipeline.traverseTopologically(
+ new PipelineVisitor.Defaults() {
+ @Override
+ public void visitPrimitiveTransform(Node node) {
+ consumedOutputs.addAll(node.getInputs().values());
+ }
+
+ @Override
+ public void visitValue(PValue value, Node producer) {
+ if (producer.getTransform() instanceof Read.Bounded
+ || producer.getTransform() instanceof Read.Unbounded) {
+ allReadOutputs.add(value);
+ }
+ }
+ });
+ assertThat(consumedOutputs, Matchers.hasItems(allReadOutputs.toArray(new PValue[0])));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/418c304d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 4eec6b8..2912fa7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -65,6 +65,7 @@ import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
+import org.apache.beam.runners.core.construction.UnconsumedReads;
import org.apache.beam.runners.dataflow.BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
@@ -690,6 +691,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
@VisibleForTesting
void replaceTransforms(Pipeline pipeline) {
// Streaming if requested explicitly or if any source in the pipeline is unbounded.
boolean streaming = options.isStreaming() || containsUnboundedPCollection(pipeline);
+ // Ensure all outputs of all reads are consumed before potentially replacing any
+ // Read PTransforms, so no Read is left without a downstream consumer afterwards.
+ UnconsumedReads.ensureAllReadsConsumed(pipeline);
// Apply the streaming- or batch-specific overrides selected above.
pipeline.replaceAll(getOverrides(streaming));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/418c304d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 79a96e7..36704bc 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -57,6 +58,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@@ -65,11 +67,13 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.TextIO.Read;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
@@ -331,6 +335,26 @@ public class DataflowRunnerTest {
.apply(TextIO.Write.to(options.getOutput()).withoutValidation());
}
+ /**
+ * Tests that all reads are consumed by at least one {@link PTransform}.
+ */
+ @Test
+ public void testUnconsumedReads() throws IOException {
+ DataflowPipelineOptions dataflowOptions = buildPipelineOptions();
+ RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class);
+ Pipeline p = buildDataflowPipeline(dataflowOptions);
+ PCollection<String> unconsumed = p.apply(Read.from(options.getInput()).withoutValidation());
+ DataflowRunner.fromOptions(dataflowOptions).replaceTransforms(p);
+ final AtomicBoolean unconsumedSeenAsInput = new AtomicBoolean();
+ p.traverseTopologically(new PipelineVisitor.Defaults() {
+ @Override
+ public void visitPrimitiveTransform(Node node) {
+ unconsumedSeenAsInput.set(true);
+ }
+ });
+ assertThat(unconsumedSeenAsInput.get(), is(true));
+ }
+
@Test
public void testRunReturnDifferentRequestId() throws IOException {
DataflowPipelineOptions options = buildPipelineOptions();