You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/25 00:47:05 UTC

[2/4] beam git commit: Explicitly duplicate input PCollections in Dataflow

Explicitly duplicate input PCollections in Dataflow


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/54a7de13
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/54a7de13
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/54a7de13

Branch: refs/heads/master
Commit: 54a7de13dc36cf3894cb1d34f4e3869a550e801a
Parents: 0f91068
Author: Thomas Groh <tg...@google.com>
Authored: Fri Mar 24 15:14:27 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 24 17:46:53 2017 -0700

----------------------------------------------------------------------
 .../DeduplicatedFlattenFactory.java             | 109 ++++++++++++++++++
 .../core/construction/PTransformMatchers.java   |  26 +++++
 .../DeduplicatedFlattenFactoryTest.java         | 112 +++++++++++++++++++
 .../construction/PTransformMatchersTest.java    |  67 +++++++++++
 .../beam/runners/dataflow/DataflowRunner.java   |   5 +-
 5 files changed, 318 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/54a7de13/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java
new file mode 100644
index 0000000..093385e
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.Flatten.PCollections;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+
+/**
+ * A {@link PTransformOverrideFactory} that replaces a {@link Flatten.PCollections} with a flatten
+ * in which no {@link PCollection} appears in the input {@link PCollectionList} more than once.
+ *
+ * <p>Each {@link PCollection} that appears {@code n > 1} times in the original input is first
+ * passed through a {@link ParDo} that outputs every element {@code n} times, and that single
+ * multiplied {@link PCollection} is flattened in place of the repeated inputs. The flattened
+ * output therefore contains the same multiset of elements as the original flatten would.
+ */
+public class DeduplicatedFlattenFactory<T>
+    implements PTransformOverrideFactory<
+        PCollectionList<T>, PCollection<T>, Flatten.PCollections<T>> {
+
+  /** Returns a new {@link DeduplicatedFlattenFactory}. */
+  public static <T> DeduplicatedFlattenFactory<T> create() {
+    return new DeduplicatedFlattenFactory<>();
+  }
+
+  private DeduplicatedFlattenFactory() {}
+
+  /**
+   * Returns a transform that flattens its input after collapsing every repeated {@link
+   * PCollection} into a single multiplied {@link PCollection}. The supplied {@code transform} is
+   * not used.
+   */
+  @Override
+  public PTransform<PCollectionList<T>, PCollection<T>> getReplacementTransform(
+      PCollections<T> transform) {
+    return new PTransform<PCollectionList<T>, PCollection<T>>() {
+      @Override
+      public PCollection<T> expand(PCollectionList<T> input) {
+        // Count how many times each distinct PCollection appears in the input list.
+        Map<PCollection<T>, Integer> instances = new HashMap<>();
+        for (PCollection<T> pCollection : input.getAll()) {
+          Integer existing = instances.get(pCollection);
+          instances.put(pCollection, existing == null ? 1 : existing + 1);
+        }
+        // Walk the inputs in their original order rather than in HashMap iteration order, so the
+        // expansion follows the input list and does not depend on PCollection hash codes. Each
+        // entry is removed once handled, so later occurrences of that PCollection are skipped.
+        PCollectionList<T> output = PCollectionList.empty(input.getPipeline());
+        for (PCollection<T> pCollection : input.getAll()) {
+          Integer numInstances = instances.remove(pCollection);
+          if (numInstances == null) {
+            continue;
+          }
+          if (numInstances == 1) {
+            output = output.and(pCollection);
+          } else {
+            String duplicationName = String.format("Multiply %s", pCollection.getName());
+            PCollection<T> duplicated =
+                pCollection.apply(duplicationName, ParDo.of(new DuplicateFn<T>(numInstances)));
+            output = output.and(duplicated);
+          }
+        }
+        return output.apply(Flatten.<T>pCollections());
+      }
+    };
+  }
+
+  /**
+   * Rebuilds the {@link PCollectionList} input of the original flatten from its expanded inputs,
+   * preserving their order, including any repeated {@link PCollection}.
+   */
+  @Override
+  public PCollectionList<T> getInput(
+      List<TaggedPValue> inputs, Pipeline p) {
+    PCollectionList<T> pCollections = PCollectionList.empty(p);
+    for (TaggedPValue input : inputs) {
+      // Assumes every input is a PCollection<T>, as inputs of a Flatten.PCollections<T> should be.
+      @SuppressWarnings("unchecked")
+      PCollection<T> pcollection = (PCollection<T>) input.getValue();
+      pCollections = pCollections.and(pcollection);
+    }
+    return pCollections;
+  }
+
+  /** Maps the output of the original flatten to the output of the replacement flatten. */
+  @Override
+  public Map<PValue, ReplacementOutput> mapOutputs(
+      List<TaggedPValue> outputs, PCollection<T> newOutput) {
+    return ReplacementOutputs.singleton(outputs, newOutput);
+  }
+
+  /** A {@link DoFn} that outputs each input element {@code numTimes} times. */
+  private static class DuplicateFn<T> extends DoFn<T, T> {
+    private final int numTimes;
+
+    private DuplicateFn(int numTimes) {
+      this.numTimes = numTimes;
+    }
+
+    @ProcessElement
+    public void emitCopies(ProcessContext context) {
+      for (int i = 0; i < numTimes; i++) {
+        context.output(context.element());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/54a7de13/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index efcc455..38cf76f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.core.construction;
 
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.io.Write;
@@ -30,6 +32,8 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.ProcessElementMethod;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
 
 /**
  * A {@link PTransformMatcher} that matches {@link PTransform PTransforms} based on the class of the
@@ -178,6 +182,28 @@ public class PTransformMatchers {
     };
   }
 
+  /**
+   * A {@link PTransformMatcher} that matches an application of {@link Flatten.PCollections} in
+   * which the same input {@link PCollection} is consumed more than once.
+   */
+  public static PTransformMatcher flattenWithDuplicateInputs() {
+    return new PTransformMatcher() {
+      @Override
+      public boolean matches(AppliedPTransform<?, ?, ?> application) {
+        if (!(application.getTransform() instanceof Flatten.PCollections)) {
+          return false;
+        }
+        Set<PValue> seenInputs = new HashSet<>();
+        for (TaggedPValue taggedInput : application.getInputs()) {
+          // Set.add returns false when this value was already seen earlier in the inputs.
+          if (!seenInputs.add(taggedInput.getValue())) {
+            return true;
+          }
+        }
+        return false;
+      }
+    };
+  }
+
   public static PTransformMatcher writeWithRunnerDeterminedSharding() {
     return new PTransformMatcher() {
       @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/54a7de13/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java
new file mode 100644
index 0000000..a251f5a
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Iterables;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.testing.NeedsRunner;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PValue;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link DeduplicatedFlattenFactory}.
+ */
+@RunWith(JUnit4.class)
+public class DeduplicatedFlattenFactoryTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+
+  private PCollection<String> first = pipeline.apply("FirstCreate", Create.of("one"));
+  private PCollection<String> second = pipeline.apply("SecondCreate", Create.of("two"));
+  private DeduplicatedFlattenFactory<String> factory = DeduplicatedFlattenFactory.create();
+
+  /**
+   * The replacement must feed its inner {@link Flatten.PCollections} each distinct input once:
+   * {@code first} (three occurrences) and {@code second} (one occurrence) become two inputs.
+   */
+  @Test
+  public void duplicatesInsertsMultipliers() {
+    PTransform<PCollectionList<String>, PCollection<String>> replacement =
+        factory.getReplacementTransform(Flatten.<String>pCollections());
+    final PCollectionList<String> inputList =
+        PCollectionList.of(first).and(second).and(first).and(first);
+    inputList.apply(replacement);
+    // Records whether a Flatten node was visited at all, so the test cannot pass vacuously.
+    final boolean[] sawFlatten = {false};
+    pipeline.traverseTopologically(
+        new Defaults() {
+          @Override
+          public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+            if (node.getTransform() instanceof Flatten.PCollections) {
+              sawFlatten[0] = true;
+              assertThat(node.getInputs(), not(equalTo(inputList.expand())));
+              assertThat(node.getInputs(), Matchers.hasSize(2));
+            }
+          }
+        });
+    assertThat(sawFlatten[0], equalTo(true));
+  }
+
+  /** Running the replacement yields every element as many times as its input was listed. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testOverride() {
+    PTransform<PCollectionList<String>, PCollection<String>> replacement =
+        factory.getReplacementTransform(Flatten.<String>pCollections());
+    final PCollectionList<String> inputList =
+        PCollectionList.of(first).and(second).and(first).and(first);
+    PCollection<String> flattened = inputList.apply(replacement);
+
+    PAssert.that(flattened).containsInAnyOrder("one", "two", "one", "one");
+    pipeline.run();
+  }
+
+  /** {@code getInput} rebuilds the original list, duplicates and order included. */
+  @Test
+  public void inputReconstruction() {
+    final PCollectionList<String> inputList =
+        PCollectionList.of(first).and(second).and(first).and(first);
+
+    assertThat(factory.getInput(inputList.expand(), pipeline), equalTo(inputList));
+  }
+
+  /** {@code mapOutputs} maps the original flatten output onto the replacement output. */
+  @Test
+  public void outputMapping() {
+    final PCollectionList<String> inputList =
+        PCollectionList.of(first).and(second).and(first).and(first);
+    PCollection<String> original =
+        inputList.apply(Flatten.<String>pCollections());
+    PCollection<String> replacement =
+        inputList.apply(factory.getReplacementTransform(Flatten.<String>pCollections()));
+
+    assertThat(
+        factory.mapOutputs(original.expand(), replacement),
+        Matchers.<PValue, ReplacementOutput>hasEntry(
+            replacement,
+            ReplacementOutput.of(
+                Iterables.getOnlyElement(original.expand()),
+                Iterables.getOnlyElement(replacement.expand()))));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/54a7de13/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 491c14f..0fead17 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -24,6 +24,7 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
 import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
 import java.io.Serializable;
 import java.util.Collections;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -386,6 +387,72 @@ public class PTransformMatchersTest implements Serializable {
   }
 
   @Test
+  public void flattenWithDuplicateInputsWithoutDuplicates() {
+    // The single input PCollection appears only once, so the matcher must not match.
+    AppliedPTransform<?, ?, ?> application =
+        AppliedPTransform
+            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+                "Flatten",
+                Collections.singletonList(
+                    TaggedPValue.of(
+                        new TupleTag<Object>(),
+                        PCollection.createPrimitiveOutputInternal(
+                            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
+                Collections.singletonList(
+                    TaggedPValue.of(
+                        new TupleTag<Object>(),
+                        PCollection.createPrimitiveOutputInternal(
+                            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
+                Flatten.pCollections(),
+                p);
+
+    assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(false));
+  }
+
+  @Test
+  public void flattenWithDuplicateInputsWithDuplicates() {
+    // The same PCollection is supplied twice under different tags, so the matcher must match.
+    PCollection<Object> duplicate = PCollection.createPrimitiveOutputInternal(p,
+        WindowingStrategy.globalDefault(),
+        IsBounded.BOUNDED);
+    AppliedPTransform<?, ?, ?> application =
+        AppliedPTransform
+            .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of(
+                "Flatten",
+                ImmutableList.of(
+                    TaggedPValue.of(new TupleTag<Object>(), duplicate),
+                    TaggedPValue.of(new TupleTag<Object>(), duplicate)),
+                Collections.singletonList(
+                    TaggedPValue.of(
+                        new TupleTag<Object>(),
+                        PCollection.createPrimitiveOutputInternal(
+                            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
+                Flatten.pCollections(),
+                p);
+
+    assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(true));
+  }
+
+  @Test
+  public void flattenWithDuplicateInputsNonFlatten() {
+    // An input-less application of Flatten.Iterables is not something a real pipeline can
+    // construct; it is used here only to show that transforms other than Flatten.PCollections
+    // are not matched.
+    AppliedPTransform<?, ?, ?> application =
+        AppliedPTransform
+            .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>of(
+                "EmptyFlatten",
+                Collections.<TaggedPValue>emptyList(),
+                Collections.singletonList(
+                    TaggedPValue.of(
+                        new TupleTag<Object>(),
+                        PCollection.createPrimitiveOutputInternal(
+                            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))),
+                Flatten.iterables(),
+                p);
+
+    assertThat(PTransformMatchers.flattenWithDuplicateInputs().matches(application), is(false));
+  }
+
+  @Test
   public void writeWithRunnerDeterminedSharding() {
     Write<Integer> write =
         Write.to(

http://git-wip-us.apache.org/repos/asf/beam/blob/54a7de13/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index e2ab5c2..24e23da 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -58,6 +58,7 @@ import java.util.Random;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import org.apache.beam.runners.core.construction.DeduplicatedFlattenFactory;
 import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
@@ -294,7 +295,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
         ImmutableMap.builder();
     // Create is implemented in terms of a Read, so it must precede the override to Read in
     // streaming
-    ptoverrides.put(PTransformMatchers.emptyFlatten(), EmptyFlattenAsCreateFactory.instance());
+    ptoverrides
+        .put(PTransformMatchers.flattenWithDuplicateInputs(), DeduplicatedFlattenFactory.create())
+        .put(PTransformMatchers.emptyFlatten(), EmptyFlattenAsCreateFactory.instance());
     if (streaming) {
       // In streaming mode must use either the custom Pubsub unbounded source/sink or
       // defer to Windmill's built-in implementation.