You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/19 17:53:40 UTC

[1/2] beam git commit: This closes #2587

Repository: beam
Updated Branches:
  refs/heads/master 714fdd292 -> 29e054a8d


This closes #2587


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/29e054a8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/29e054a8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/29e054a8

Branch: refs/heads/master
Commit: 29e054a8d7ffe6a061dbbe9a1885185b02f4e8ec
Parents: 714fdd2 418c304
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 19 10:53:30 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Apr 19 10:53:30 2017 -0700

----------------------------------------------------------------------
 .../core/construction/UnconsumedReads.java      |  72 +++++++++++++
 .../core/construction/UnconsumedReadsTest.java  | 105 +++++++++++++++++++
 .../beam/runners/dataflow/DataflowRunner.java   |   4 +
 .../runners/dataflow/DataflowRunnerTest.java    |  24 +++++
 4 files changed, 205 insertions(+)
----------------------------------------------------------------------



[2/2] beam git commit: Ensure all Read outputs are consumed in Dataflow

Posted by tg...@apache.org.
Ensure all Read outputs are consumed in Dataflow

Apply a no-op ParDo to the output of any Read PTransform whose output is not otherwise consumed.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/418c304d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/418c304d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/418c304d

Branch: refs/heads/master
Commit: 418c304dbff1ce8c176d08c890780ec97245aaae
Parents: 714fdd2
Author: Thomas Groh <tg...@google.com>
Authored: Tue Apr 18 17:25:59 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Apr 19 10:53:30 2017 -0700

----------------------------------------------------------------------
 .../core/construction/UnconsumedReads.java      |  72 +++++++++++++
 .../core/construction/UnconsumedReadsTest.java  | 105 +++++++++++++++++++
 .../beam/runners/dataflow/DataflowRunner.java   |   4 +
 .../runners/dataflow/DataflowRunnerTest.java    |  24 +++++
 4 files changed, 205 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/418c304d/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java
new file mode 100644
index 0000000..c191eeb
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnconsumedReads.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+
+/**
+ * Utilities for ensuring that the output of every {@link Read} {@link PTransform} is consumed
+ * by some {@link PTransform}.
+ */
+public class UnconsumedReads {
+  /**
+   * Applies a no-op {@link ParDo} to the output {@link PCollection} of each {@link Read.Bounded}
+   * or {@link Read.Unbounded} primitive that has no consuming transform.
+   */
+  public static void ensureAllReadsConsumed(Pipeline pipeline) {
+    final Set<PCollection<?>> unconsumed = new HashSet<>();
+    pipeline.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+          @Override
+          public void visitPrimitiveTransform(Node node) {
+            // A value used as the input of any primitive transform is consumed.
+            unconsumed.removeAll(node.getInputs().values());
+          }
+
+          @Override
+          public void visitValue(PValue value, Node producer) {
+            // Track only the values produced directly by a Read primitive.
+            if (producer.getTransform() instanceof Read.Bounded
+                || producer.getTransform() instanceof Read.Unbounded) {
+              unconsumed.add((PCollection<?>) value);
+            }
+          }
+        });
+    // Attach consumers only after the traversal completes — presumably to avoid mutating the
+    // transform hierarchy while it is being visited (TODO confirm).
+    int i = 0;
+    for (PCollection<?> unconsumedPCollection : unconsumed) {
+      consume(unconsumedPCollection, i);
+      i++;
+    }
+  }
+
+  private static <T> void consume(PCollection<T> unconsumedPCollection, int uniq) {
+    // Multiple applications should never break due to stable unique names: the first consumer
+    // is named "DropInputs", subsequent ones "DropInputs1", "DropInputs2", ...
+    String uniqueName = "DropInputs" + (uniq == 0 ? "" : uniq);
+    unconsumedPCollection.apply(uniqueName, ParDo.of(new NoOpDoFn<T>()));
+  }
+
+  /** A {@link DoFn} that discards all elements; it exists only to consume its input. */
+  private static class NoOpDoFn<T> extends DoFn<T, T> {
+    @ProcessElement
+    public void doNothing(ProcessContext context) {}
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/418c304d/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnconsumedReadsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnconsumedReadsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnconsumedReadsTest.java
new file mode 100644
index 0000000..1966a93
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnconsumedReadsTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.junit.Assert.assertThat;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.io.Read.Bounded;
+import org.apache.beam.sdk.io.Read.Unbounded;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PValue;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link UnconsumedReads}.
+ */
+@RunWith(JUnit4.class)
+public class UnconsumedReadsTest {
+  // Abandoned-node enforcement is disabled: these pipelines are only traversed, never run.
+  @Rule public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  @Test
+  public void matcherProducesUnconsumedValueBoundedRead() {
+    Bounded<Long> transform = Read.from(CountingSource.upTo(20L));
+    // The output is deliberately left without a consumer.
+    PCollection<Long> output = pipeline.apply(transform);
+    UnconsumedReads.ensureAllReadsConsumed(pipeline);
+    validateConsumed();
+  }
+
+  @Test
+  public void matcherProducesUnconsumedValueUnboundedRead() {
+    Unbounded<Long> transform = Read.from(CountingSource.unbounded());
+    // The output is deliberately left without a consumer.
+    PCollection<Long> output = pipeline.apply(transform);
+    UnconsumedReads.ensureAllReadsConsumed(pipeline);
+    validateConsumed();
+  }
+
+  @Test
+  public void doesNotConsumeAlreadyConsumedRead() {
+    Unbounded<Long> transform = Read.from(CountingSource.unbounded());
+    final PCollection<Long> output = pipeline.apply(transform);
+    // Consume the Read output up front, so ensureAllReadsConsumed should add nothing.
+    final Flatten.PCollections<Long> consumer = Flatten.<Long>pCollections();
+    PCollectionList.of(output).apply(consumer);
+    UnconsumedReads.ensureAllReadsConsumed(pipeline);
+    pipeline.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+          @Override
+          public void visitPrimitiveTransform(Node node) {
+            // The output should only be consumed by a single consumer
+            if (node.getInputs().values().contains(output)) {
+              assertThat(node.getTransform(), Matchers.<PTransform<?, ?>>is(consumer));
+            }
+          }
+        });
+  }
+
+  /**
+   * Asserts that every value produced by a Read primitive appears as an input to at least one
+   * primitive transform in the pipeline.
+   */
+  private void validateConsumed() {
+    final Set<PValue> consumedOutputs = new HashSet<PValue>();
+    final Set<PValue> allReadOutputs = new HashSet<PValue>();
+    pipeline.traverseTopologically(
+        new PipelineVisitor.Defaults() {
+          @Override
+          public void visitPrimitiveTransform(Node node) {
+            // Everything used as an input somewhere is consumed.
+            consumedOutputs.addAll(node.getInputs().values());
+          }
+
+          @Override
+          public void visitValue(PValue value, Node producer) {
+            // Collect only outputs produced directly by a Read primitive.
+            if (producer.getTransform() instanceof Read.Bounded
+                || producer.getTransform() instanceof Read.Unbounded) {
+              allReadOutputs.add(value);
+            }
+          }
+        });
+    assertThat(consumedOutputs, Matchers.hasItems(allReadOutputs.toArray(new PValue[0])));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/418c304d/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 4eec6b8..2912fa7 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -65,6 +65,7 @@ import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
 import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
+import org.apache.beam.runners.core.construction.UnconsumedReads;
 import org.apache.beam.runners.dataflow.BatchViewOverrides.BatchCombineGloballyAsSingletonViewFactory;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
 import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
@@ -690,6 +691,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   @VisibleForTesting
   void replaceTransforms(Pipeline pipeline) {
     boolean streaming = options.isStreaming() || containsUnboundedPCollection(pipeline);
+    // Ensure all outputs of all Reads are consumed before potentially replacing any
+    // Read PTransforms.
+    UnconsumedReads.ensureAllReadsConsumed(pipeline);
     pipeline.replaceAll(getOverrides(streaming));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/418c304d/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 79a96e7..36704bc 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -23,6 +23,7 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -57,6 +58,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@@ -65,11 +67,13 @@ import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.TextIO.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.testing.ExpectedLogs;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -331,6 +335,26 @@ public class DataflowRunnerTest {
         .apply(TextIO.Write.to(options.getOutput()).withoutValidation());
   }
 
+  /**
+   * Tests that {@link DataflowRunner#replaceTransforms} leaves every Read output with at least
+   * one consuming {@link PTransform}.
+   */
+  @Test
+  public void testUnconsumedReads() throws IOException {
+    DataflowPipelineOptions dataflowOptions = buildPipelineOptions();
+    RuntimeTestOptions options = dataflowOptions.as(RuntimeTestOptions.class);
+    Pipeline p = buildDataflowPipeline(dataflowOptions);
+    // A Read output with no consumer attached; replaceTransforms should consume it.
+    PCollection<String> unconsumed = p.apply(Read.from(options.getInput()).withoutValidation());
+    DataflowRunner.fromOptions(dataflowOptions).replaceTransforms(p);
+    final AtomicBoolean unconsumedSeenAsInput = new AtomicBoolean();
+    p.traverseTopologically(new PipelineVisitor.Defaults() {
+      @Override
+      public void visitPrimitiveTransform(Node node) {
+        // NOTE(review): this sets the flag for EVERY primitive transform, not just ones that
+        // take `unconsumed` as an input, so the assertion below passes as long as any primitive
+        // transform exists at all. Consider guarding with
+        // node.getInputs().values().contains(unconsumed) to make the test meaningful.
+        unconsumedSeenAsInput.set(true);
+      }
+    });
+    assertThat(unconsumedSeenAsInput.get(), is(true));
+  }
+
   @Test
   public void testRunReturnDifferentRequestId() throws IOException {
     DataflowPipelineOptions options = buildPipelineOptions();