You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/05/26 16:23:52 UTC

[1/4] beam git commit: Roll-forward Include Additional PTransform inputs in Transform Nodes

Repository: beam
Updated Branches:
  refs/heads/master 7568f0298 -> 6bb204f34


Roll-forward Include Additional PTransform inputs in Transform Nodes

Add the values of PTransform.getAdditionalInputs to the inputs of a
TransformHierarchy node.

Fork the Node constructor to reduce nullability

This slightly simplifies the constructor implementation(s).


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2e2ae9cf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2e2ae9cf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2e2ae9cf

Branch: refs/heads/master
Commit: 2e2ae9cfa581a73864695d15102acadc2750a57a
Parents: 8eb09aa
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 23 09:43:45 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 26 07:50:37 2017 -0700

----------------------------------------------------------------------
 .../apex/translation/TranslationContext.java    |   4 +-
 .../core/construction/TransformInputs.java      |  50 ++++++
 .../core/construction/TransformInputsTest.java  | 166 +++++++++++++++++++
 .../beam/runners/direct/DirectGraphVisitor.java |  15 +-
 .../runners/direct/ParDoEvaluatorFactory.java   |   9 +-
 ...littableProcessElementsEvaluatorFactory.java |   2 +
 .../direct/StatefulParDoEvaluatorFactory.java   |   1 +
 .../beam/runners/direct/WatermarkManager.java   |  17 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   6 +-
 .../flink/FlinkBatchTranslationContext.java     |   3 +-
 .../flink/FlinkStreamingTranslationContext.java |   3 +-
 .../dataflow/DataflowPipelineTranslator.java    |   5 +-
 .../spark/translation/EvaluationContext.java    |   4 +-
 .../beam/sdk/runners/TransformHierarchy.java    |  28 +++-
 14 files changed, 284 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2e2ae9cf/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index aff3863..94d13e1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -34,6 +34,7 @@ import org.apache.beam.runners.apex.translation.utils.ApexStateInternals;
 import org.apache.beam.runners.apex.translation.utils.ApexStateInternals.ApexStateBackend;
 import org.apache.beam.runners.apex.translation.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translation.utils.CoderAdapterStreamCodec;
+import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -93,7 +94,8 @@ class TranslationContext {
   }
 
   public <InputT extends PValue> InputT getInput() {
-    return (InputT) Iterables.getOnlyElement(getCurrentTransform().getInputs().values());
+    return (InputT)
+        Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform()));
   }
 
   public Map<TupleTag<?>, PValue> getOutputs() {

http://git-wip-us.apache.org/repos/asf/beam/blob/2e2ae9cf/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java
new file mode 100644
index 0000000..2baf93a
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformInputs.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.ImmutableList;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/** Utilities for extracting subsets of inputs from an {@link AppliedPTransform}. */
+public class TransformInputs {
+  /**
+   * Gets all inputs of the {@link AppliedPTransform} that are not returned by {@link
+   * PTransform#getAdditionalInputs()}.
+   */
+  public static Collection<PValue> nonAdditionalInputs(AppliedPTransform<?, ?, ?> application) {
+    ImmutableList.Builder<PValue> mainInputs = ImmutableList.builder();
+    PTransform<?, ?> transform = application.getTransform();
+    for (Map.Entry<TupleTag<?>, PValue> input : application.getInputs().entrySet()) {
+      if (!transform.getAdditionalInputs().containsKey(input.getKey())) {
+        mainInputs.add(input.getValue());
+      }
+    }
+    checkArgument(
+        !mainInputs.build().isEmpty() || application.getInputs().isEmpty(),
+        "Expected at least one main input if any inputs exist");
+    return mainInputs.build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2e2ae9cf/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java
new file mode 100644
index 0000000..f5b2c11
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformInputsTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.junit.Assert.assertThat;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link TransformInputs}. */
+@RunWith(JUnit4.class)
+public class TransformInputsTest {
+  @Rule public TestPipeline pipeline = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void nonAdditionalInputsWithNoInputSucceeds() {
+    AppliedPTransform<PInput, POutput, TestTransform> transform =
+        AppliedPTransform.of(
+            "input-free",
+            Collections.<TupleTag<?>, PValue>emptyMap(),
+            Collections.<TupleTag<?>, PValue>emptyMap(),
+            new TestTransform(),
+            pipeline);
+
+    assertThat(TransformInputs.nonAdditionalInputs(transform), Matchers.<PValue>empty());
+  }
+
+  @Test
+  public void nonAdditionalInputsWithOneMainInputSucceeds() {
+    PCollection<Long> input = pipeline.apply(GenerateSequence.from(1L));
+    AppliedPTransform<PInput, POutput, TestTransform> transform =
+        AppliedPTransform.of(
+            "input-single",
+            Collections.<TupleTag<?>, PValue>singletonMap(new TupleTag<Long>() {}, input),
+            Collections.<TupleTag<?>, PValue>emptyMap(),
+            new TestTransform(),
+            pipeline);
+
+    assertThat(
+        TransformInputs.nonAdditionalInputs(transform), Matchers.<PValue>containsInAnyOrder(input));
+  }
+
+  @Test
+  public void nonAdditionalInputsWithMultipleNonAdditionalInputsSucceeds() {
+    Map<TupleTag<?>, PValue> allInputs = new HashMap<>();
+    PCollection<Integer> mainInts = pipeline.apply("MainInput", Create.of(12, 3));
+    allInputs.put(new TupleTag<Integer>() {}, mainInts);
+    PCollection<Void> voids = pipeline.apply("VoidInput", Create.empty(VoidCoder.of()));
+    allInputs.put(new TupleTag<Void>() {}, voids);
+    AppliedPTransform<PInput, POutput, TestTransform> transform =
+        AppliedPTransform.of(
+            "additional-free",
+            allInputs,
+            Collections.<TupleTag<?>, PValue>emptyMap(),
+            new TestTransform(),
+            pipeline);
+
+    assertThat(
+        TransformInputs.nonAdditionalInputs(transform),
+        Matchers.<PValue>containsInAnyOrder(voids, mainInts));
+  }
+
+  @Test
+  public void nonAdditionalInputsWithAdditionalInputsSucceeds() {
+    Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
+    additionalInputs.put(new TupleTag<String>() {}, pipeline.apply(Create.of("1, 2", "3")));
+    additionalInputs.put(new TupleTag<Long>() {}, pipeline.apply(GenerateSequence.from(3L)));
+
+    Map<TupleTag<?>, PValue> allInputs = new HashMap<>();
+    PCollection<Integer> mainInts = pipeline.apply("MainInput", Create.of(12, 3));
+    allInputs.put(new TupleTag<Integer>() {}, mainInts);
+    PCollection<Void> voids = pipeline.apply("VoidInput", Create.empty(VoidCoder.of()));
+    allInputs.put(
+        new TupleTag<Void>() {}, voids);
+    allInputs.putAll(additionalInputs);
+
+    AppliedPTransform<PInput, POutput, TestTransform> transform =
+        AppliedPTransform.of(
+            "additional",
+            allInputs,
+            Collections.<TupleTag<?>, PValue>emptyMap(),
+            new TestTransform(additionalInputs),
+            pipeline);
+
+    assertThat(
+        TransformInputs.nonAdditionalInputs(transform),
+        Matchers.<PValue>containsInAnyOrder(mainInts, voids));
+  }
+
+  @Test
+  public void nonAdditionalInputsWithOnlyAdditionalInputsThrows() {
+    Map<TupleTag<?>, PValue> additionalInputs = new HashMap<>();
+    additionalInputs.put(new TupleTag<String>() {}, pipeline.apply(Create.of("1, 2", "3")));
+    additionalInputs.put(new TupleTag<Long>() {}, pipeline.apply(GenerateSequence.from(3L)));
+
+    AppliedPTransform<PInput, POutput, TestTransform> transform =
+        AppliedPTransform.of(
+            "additional-only",
+            additionalInputs,
+            Collections.<TupleTag<?>, PValue>emptyMap(),
+            new TestTransform(additionalInputs),
+            pipeline);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("at least one");
+    TransformInputs.nonAdditionalInputs(transform);
+  }
+
+  private static class TestTransform extends PTransform<PInput, POutput> {
+    private final Map<TupleTag<?>, PValue> additionalInputs;
+
+    private TestTransform() {
+      this(Collections.<TupleTag<?>, PValue>emptyMap());
+    }
+
+    private TestTransform(Map<TupleTag<?>, PValue> additionalInputs) {
+      this.additionalInputs = additionalInputs;
+    }
+
+    @Override
+    public POutput expand(PInput input) {
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+      return additionalInputs;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/2e2ae9cf/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index 01204e3..ed4282b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -21,10 +21,12 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.runners.AppliedPTransform;
@@ -34,6 +36,8 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the
@@ -41,6 +45,7 @@ import org.apache.beam.sdk.values.PValue;
  * input after the upstream transform has produced and committed output.
  */
 class DirectGraphVisitor extends PipelineVisitor.Defaults {
+  private static final Logger LOG = LoggerFactory.getLogger(DirectGraphVisitor.class);
 
   private Map<POutput, AppliedPTransform<?, ?, ?>> producers = new HashMap<>();
 
@@ -83,7 +88,15 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults {
     if (node.getInputs().isEmpty()) {
       rootTransforms.add(appliedTransform);
     } else {
-      for (PValue value : node.getInputs().values()) {
+      Collection<PValue> mainInputs =
+          TransformInputs.nonAdditionalInputs(node.toAppliedPTransform(getPipeline()));
+      if (!mainInputs.containsAll(node.getInputs().values())) {
+        LOG.debug(
+            "Inputs reduced to {} from {} by removing additional inputs",
+            mainInputs,
+            node.getInputs().values());
+      }
+      for (PValue value : mainInputs) {
         primitiveConsumers.put(value, appliedTransform);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/2e2ae9cf/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 8aa75cf..516f798 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.direct;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Iterables;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -79,6 +78,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
         (TransformEvaluator<T>)
             createEvaluator(
                 (AppliedPTransform) application,
+                (PCollection<InputT>) inputBundle.getPCollection(),
                 inputBundle.getKey(),
                 doFn,
                 transform.getSideInputs(),
@@ -102,6 +102,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
   @SuppressWarnings({"unchecked", "rawtypes"})
   DoFnLifecycleManagerRemovingTransformEvaluator<InputT> createEvaluator(
       AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
+      PCollection<InputT> mainInput,
       StructuralKey<?> inputBundleKey,
       DoFn<InputT, OutputT> doFn,
       List<PCollectionView<?>> sideInputs,
@@ -120,6 +121,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
         createParDoEvaluator(
             application,
             inputBundleKey,
+            mainInput,
             sideInputs,
             mainOutputTag,
             additionalOutputTags,
@@ -132,6 +134,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
   ParDoEvaluator<InputT> createParDoEvaluator(
       AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
       StructuralKey<?> key,
+      PCollection<InputT> mainInput,
       List<PCollectionView<?>> sideInputs,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags,
@@ -144,8 +147,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
           evaluationContext,
           stepContext,
           application,
-          ((PCollection<InputT>) Iterables.getOnlyElement(application.getInputs().values()))
-              .getWindowingStrategy(),
+          mainInput.getWindowingStrategy(),
           fn,
           key,
           sideInputs,
@@ -173,5 +175,4 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
     }
     return pcs;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/2e2ae9cf/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index b85f481c..eccc83a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -116,6 +116,8 @@ class SplittableProcessElementsEvaluatorFactory<
             delegateFactory.createParDoEvaluator(
                 application,
                 inputBundle.getKey(),
+                (PCollection<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>)
+                    inputBundle.getPCollection(),
                 transform.getSideInputs(),
                 transform.getMainOutputTag(),
                 transform.getAdditionalOutputTags().getAll(),

http://git-wip-us.apache.org/repos/asf/beam/blob/2e2ae9cf/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 506c84c..3619d05 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -117,6 +117,7 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
     DoFnLifecycleManagerRemovingTransformEvaluator<KV<K, InputT>> delegateEvaluator =
         delegateFactory.createEvaluator(
             (AppliedPTransform) application,
+            (PCollection) inputBundle.getPCollection(),
             inputBundle.getKey(),
             doFn,
             application.getTransform().getUnderlyingParDo().getSideInputs(),

http://git-wip-us.apache.org/repos/asf/beam/blob/2e2ae9cf/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
index 4f1b831..b15b52e 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java
@@ -823,10 +823,11 @@ class WatermarkManager {
       inputWmsBuilder.add(THE_END_OF_TIME);
     }
     for (PValue pvalue : inputs.values()) {
-      Watermark producerOutputWatermark =
-          getTransformWatermark(graph.getProducer(pvalue))
-              .synchronizedProcessingOutputWatermark;
-      inputWmsBuilder.add(producerOutputWatermark);
+      if (graph.getPrimitiveConsumers(pvalue).contains(transform)) {
+        Watermark producerOutputWatermark =
+            getTransformWatermark(graph.getProducer(pvalue)).synchronizedProcessingOutputWatermark;
+        inputWmsBuilder.add(producerOutputWatermark);
+      }
     }
     return inputWmsBuilder.build();
   }
@@ -838,9 +839,11 @@ class WatermarkManager {
       inputWatermarksBuilder.add(THE_END_OF_TIME);
     }
     for (PValue pvalue : inputs.values()) {
-      Watermark producerOutputWatermark =
-          getTransformWatermark(graph.getProducer(pvalue)).outputWatermark;
-      inputWatermarksBuilder.add(producerOutputWatermark);
+      if (graph.getPrimitiveConsumers(pvalue).contains(transform)) {
+        Watermark producerOutputWatermark =
+            getTransformWatermark(graph.getProducer(pvalue)).outputWatermark;
+        inputWatermarksBuilder.add(producerOutputWatermark);
+      }
     }
     List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build();
     return inputCollectionWatermarks;

http://git-wip-us.apache.org/repos/asf/beam/blob/2e2ae9cf/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
index 09a21ac..8b86bbe 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java
@@ -98,7 +98,7 @@ public class ParDoEvaluatorTest {
     when(evaluationContext.createBundle(output)).thenReturn(outputBundle);
 
     ParDoEvaluator<Integer> evaluator =
-        createEvaluator(singletonView, fn, output);
+        createEvaluator(singletonView, fn, inputPc, output);
 
     IntervalWindow nonGlobalWindow = new IntervalWindow(new Instant(0), new Instant(10_000L));
     WindowedValue<Integer> first = WindowedValue.valueInGlobalWindow(3);
@@ -132,6 +132,7 @@ public class ParDoEvaluatorTest {
   private ParDoEvaluator<Integer> createEvaluator(
       PCollectionView<Integer> singletonView,
       RecorderFn fn,
+      PCollection<Integer> input,
       PCollection<Integer> output) {
     when(
             evaluationContext.createSideInputReader(
@@ -156,8 +157,7 @@ public class ParDoEvaluatorTest {
         evaluationContext,
         stepContext,
         transform,
-        ((PCollection<?>) Iterables.getOnlyElement(transform.getInputs().values()))
-            .getWindowingStrategy(),
+        input.getWindowingStrategy(),
         fn,
         null /* key */,
         ImmutableList.<PCollectionView<?>>of(singletonView),

http://git-wip-us.apache.org/repos/asf/beam/blob/2e2ae9cf/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
index 0439119..6e70198 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.flink;
 import com.google.common.collect.Iterables;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -143,7 +144,7 @@ class FlinkBatchTranslationContext {
 
   @SuppressWarnings("unchecked")
   <T extends PValue> T getInput(PTransform<T, ?> transform) {
-    return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
+    return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
   }
 
   Map<TupleTag<?>, PValue> getOutputs(PTransform<?, ?> transform) {

http://git-wip-us.apache.org/repos/asf/beam/blob/2e2ae9cf/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
index ea5f6b3..74a5fb9 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import com.google.common.collect.Iterables;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -113,7 +114,7 @@ class FlinkStreamingTranslationContext {
 
   @SuppressWarnings("unchecked")
   public <T extends PValue> T getInput(PTransform<T, ?> transform) {
-    return (T) Iterables.getOnlyElement(currentTransform.getInputs().values());
+    return (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(currentTransform));
   }
 
   public <T extends PInput> Map<TupleTag<?>, PValue> getInputs(PTransform<T, ?> transform) {

http://git-wip-us.apache.org/repos/asf/beam/blob/2e2ae9cf/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index af93ef5..fccd018 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -56,6 +56,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly;
 import org.apache.beam.runners.dataflow.DataflowRunner.CombineGroupedValues;
@@ -395,7 +396,9 @@ public class DataflowPipelineTranslator {
 
     @Override
     public <InputT extends PValue> InputT getInput(PTransform<InputT, ?> transform) {
-      return (InputT) Iterables.getOnlyElement(getInputs(transform).values());
+      return (InputT)
+          Iterables.getOnlyElement(
+              TransformInputs.nonAdditionalInputs(getCurrentTransform(transform)));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/2e2ae9cf/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index 8102926..0c6c4d1 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -26,6 +26,7 @@ import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
+import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.sdk.Pipeline;
@@ -103,7 +104,8 @@ public class EvaluationContext {
 
   public <T extends PValue> T getInput(PTransform<T, ?> transform) {
     @SuppressWarnings("unchecked")
-    T input = (T) Iterables.getOnlyElement(getInputs(transform).values());
+    T input =
+        (T) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(getCurrentTransform()));
     return input;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/2e2ae9cf/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 630d24c..bebc306 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
@@ -68,7 +67,7 @@ public class TransformHierarchy {
     producers = new HashMap<>();
     producerInput = new HashMap<>();
     unexpandedInputs = new HashMap<>();
-    root = new Node(null, null, "", null);
+    root = new Node();
     current = root;
   }
 
@@ -255,25 +254,36 @@ public class TransformHierarchy {
     boolean finishedSpecifying = false;
 
     /**
+     * Creates the root-level node. The root level node has a null enclosing node, a null transform,
+     * an empty map of inputs, and a name equal to the empty string.
+     */
+    private Node() {
+      this.enclosingNode = null;
+      this.transform = null;
+      this.fullName = "";
+      this.inputs = Collections.emptyMap();
+    }
+
+    /**
      * Creates a new Node with the given parent and transform.
      *
-     * <p>EnclosingNode and transform may both be null for a root-level node, which holds all other
-     * nodes.
-     *
      * @param enclosingNode the composite node containing this node
      * @param transform the PTransform tracked by this node
      * @param fullName the fully qualified name of the transform
      * @param input the unexpanded input to the transform
      */
     private Node(
-        @Nullable Node enclosingNode,
-        @Nullable PTransform<?, ?> transform,
+        Node enclosingNode,
+        PTransform<?, ?> transform,
         String fullName,
-        @Nullable PInput input) {
+        PInput input) {
       this.enclosingNode = enclosingNode;
       this.transform = transform;
       this.fullName = fullName;
-      this.inputs = input == null ? Collections.<TupleTag<?>, PValue>emptyMap() : input.expand();
+      ImmutableMap.Builder<TupleTag<?>, PValue> inputs = ImmutableMap.builder();
+      inputs.putAll(input.expand());
+      inputs.putAll(transform.getAdditionalInputs());
+      this.inputs = inputs.build();
     }
 
     /**


[3/4] beam git commit: Visit a Transform Hierarchy in Topological Order

Posted by tg...@apache.org.
Visit a Transform Hierarchy in Topological Order

Always ensure that the producer of a value is visited before that value
is visited for the first time. Visit a composite before visiting any of
its child nodes.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd1dfdf3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd1dfdf3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd1dfdf3

Branch: refs/heads/master
Commit: bd1dfdf3c8e145a99bcacebd0c64dcf6580f3ffe
Parents: 7568f02
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 23 13:29:51 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 26 07:50:37 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/spark/SparkRunner.java  | 13 +++
 .../spark/translation/BoundedDataset.java       |  6 ++
 .../spark/translation/TransformTranslator.java  |  1 +
 .../spark/translation/StorageLevelTest.java     |  4 +-
 .../beam/sdk/runners/TransformHierarchy.java    | 46 ++++++++++-
 .../sdk/runners/TransformHierarchyTest.java     | 86 ++++++++++++++++++++
 6 files changed, 150 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 9e2426e..1be5e13 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -21,8 +21,10 @@ package org.apache.beam.runners.spark;
 import com.google.common.collect.Iterables;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -49,6 +51,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PCollection;
@@ -336,6 +339,8 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
     protected final EvaluationContext ctxt;
     protected final SparkPipelineTranslator translator;
 
+    private final Set<Node> shouldIgnoreChildren = new HashSet<>();
+
     public Evaluator(SparkPipelineTranslator translator, EvaluationContext ctxt) {
       this.translator = translator;
       this.ctxt = ctxt;
@@ -351,6 +356,7 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
           LOG.info("Entering directly-translatable composite transform: '{}'", node.getFullName());
           LOG.debug("Composite transform class: '{}'", transformClass);
           doVisitTransform(node);
+          shouldIgnoreChildren.add(node);
           return CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
         }
       }
@@ -392,6 +398,13 @@ public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
 
     @Override
     public void visitPrimitiveTransform(TransformHierarchy.Node node) {
+      Node parent = node.getEnclosingNode();
+      while (!parent.isRootNode()) {
+        if (shouldIgnoreChildren.contains(parent)) {
+          return;
+        }
+        parent = parent.getEnclosingNode();
+      }
       doVisitTransform(node);
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
index 652c753..a746634 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.spark.translation;
 
 import com.google.common.base.Function;
+import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
 import java.util.List;
 import javax.annotation.Nullable;
@@ -97,8 +98,13 @@ public class BoundedDataset<T> implements Dataset {
     return windowedValues;
   }
 
+  int timesCached = 0;
   @Override
   public void cache(String storageLevel) {
+    System.out.printf(
+        "Persisting Dataset %s for RDD %s (id %s) at level %s. %s times before%n",
+        this, getRDD(), getRDD().toDebugString(), storageLevel, timesCached++);
+    System.out.println(Joiner.on("\n\t").join(new Throwable().getStackTrace()));
     // populate the rdd if needed
     getRDD().persist(StorageLevel.fromString(storageLevel));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 742ea83..6ca12c9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -407,6 +407,7 @@ public final class TransformTranslator {
         JavaRDD<WindowedValue<T>> input = new SourceRDD.Bounded<>(
             jsc.sc(), transform.getSource(), runtimeContext, stepName).toJavaRDD();
         // cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation.
+        System.out.println("Evaluating Bounded Read " + transform);
         context.putDataset(transform, new BoundedDataset<>(input.cache()));
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
index 8f2e681..8bd6dae 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/StorageLevelTest.java
@@ -58,12 +58,12 @@ public class StorageLevelTest {
 
   @Test
   public void test() throws Exception {
-    PCollection<String> pCollection = pipeline.apply(Create.of("foo"));
+    PCollection<String> pCollection = pipeline.apply("CreateFoo", Create.of("foo"));
 
     // by default, the Spark runner doesn't cache the RDD if it accessed only one time.
     // So, to "force" the caching of the RDD, we have to call the RDD at least two time.
     // That's why we are using Count fn on the PCollection.
-    pCollection.apply(Count.<String>globally());
+    pCollection.apply("CountAll", Count.<String>globally());
 
     PCollection<String> output = pCollection.apply(new StorageLevelPTransform());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 2f0e8ef..630d24c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -202,10 +202,12 @@ public class TransformHierarchy {
     return producers.get(produced);
   }
 
+  int traversed = 0;
   public Set<PValue> visit(PipelineVisitor visitor) {
     finishSpecifying();
     Set<PValue> visitedValues = new HashSet<>();
-    root.visit(visitor, visitedValues);
+    traversed++;
+    root.visit(visitor, visitedValues, new HashSet<Node>(), new HashSet<Node>());
     return visitedValues;
   }
 
@@ -462,7 +464,22 @@ public class TransformHierarchy {
      * <p>Provides an ordered visit of the input values, the primitive transform (or child nodes for
      * composite transforms), then the output values.
      */
-    private void visit(PipelineVisitor visitor, Set<PValue> visitedValues) {
+    private void visit(
+        PipelineVisitor visitor,
+        Set<PValue> visitedValues,
+        Set<Node> visitedNodes,
+        Set<Node> passedComposites) {
+      if (getEnclosingNode() != null && !visitedNodes.contains(getEnclosingNode())) {
+        getEnclosingNode().visit(visitor, visitedValues, visitedNodes, passedComposites);
+      }
+      if (!visitedNodes.add(this)) {
+        LOG.debug("Not revisiting previously visited node {}", this);
+        return;
+      } else if (childNodeOf(passedComposites)) {
+        LOG.debug("Not revisiting Node {} which is a child of a previously passed composite", this);
+        return;
+      }
+
       if (!finishedSpecifying) {
         finishSpecifying();
       }
@@ -470,22 +487,31 @@ public class TransformHierarchy {
       if (!isRootNode()) {
         // Visit inputs.
         for (PValue inputValue : inputs.values()) {
+          Node valueProducer = getProducer(inputValue);
+          if (!visitedNodes.contains(valueProducer)) {
+            valueProducer.visit(visitor, visitedValues, visitedNodes, passedComposites);
+          }
           if (visitedValues.add(inputValue)) {
-            visitor.visitValue(inputValue, getProducer(inputValue));
+            LOG.debug("Visiting input value {}", inputValue);
+            visitor.visitValue(inputValue, valueProducer);
           }
         }
       }
 
       if (isCompositeNode()) {
+        LOG.debug("Visiting composite node {}", this);
         PipelineVisitor.CompositeBehavior recurse = visitor.enterCompositeTransform(this);
 
         if (recurse.equals(CompositeBehavior.ENTER_TRANSFORM)) {
           for (Node child : parts) {
-            child.visit(visitor, visitedValues);
+            child.visit(visitor, visitedValues, visitedNodes, passedComposites);
           }
+        } else {
+          passedComposites.add(this);
         }
         visitor.leaveCompositeTransform(this);
       } else {
+        LOG.debug("Visiting primitive node {}", this);
         visitor.visitPrimitiveTransform(this);
       }
 
@@ -494,12 +520,24 @@ public class TransformHierarchy {
         // Visit outputs.
         for (PValue pValue : outputs.values()) {
           if (visitedValues.add(pValue)) {
+            LOG.debug("Visiting output value {}", pValue);
             visitor.visitValue(pValue, this);
           }
         }
       }
     }
 
+    private boolean childNodeOf(Set<Node> nodes) {
+      if (isRootNode()) {
+        return false;
+      }
+      Node parent = this.getEnclosingNode();
+      while (!parent.isRootNode() && !nodes.contains(parent)) {
+        parent = parent.getEnclosingNode();
+      }
+      return nodes.contains(parent);
+    }
+
     /**
      * Finish specifying a transform.
      *

http://git-wip-us.apache.org/repos/asf/beam/blob/bd1dfdf3/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 1197d1b..2fe2817 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.runners;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
@@ -32,6 +33,8 @@ import java.util.Map.Entry;
 import java.util.Set;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
@@ -492,4 +495,87 @@ public class TransformHierarchyTest implements Serializable {
     assertThat(visitedPrimitiveNodes, containsInAnyOrder(upstreamNode, replacementParNode));
     assertThat(visitedValues, Matchers.<PValue>containsInAnyOrder(upstream, output));
   }
+
+  @Test
+  public void visitIsTopologicallyOrdered() {
+    PCollection<String> one =
+        PCollection.<String>createPrimitiveOutputInternal(
+                pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
+            .setCoder(StringUtf8Coder.of());
+    final PCollection<Integer> two =
+        PCollection.<Integer>createPrimitiveOutputInternal(
+                pipeline, WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED)
+            .setCoder(VarIntCoder.of());
+    final PDone done = PDone.in(pipeline);
+    final TupleTag<String> oneTag = new TupleTag<String>() {};
+    final TupleTag<Integer> twoTag = new TupleTag<Integer>() {};
+    final PCollectionTuple oneAndTwo = PCollectionTuple.of(oneTag, one).and(twoTag, two);
+
+    hierarchy.pushNode("consumes_both", one, new PTransform<PCollection<String>, PDone>() {
+      @Override
+      public PDone expand(PCollection<String> input) {
+        return done;
+      }
+
+      @Override
+      public Map<TupleTag<?>, PValue> getAdditionalInputs() {
+        return Collections.<TupleTag<?>, PValue>singletonMap(twoTag, two);
+      }
+    });
+    hierarchy.setOutput(done);
+    hierarchy.popNode();
+
+    final PTransform<PBegin, PCollectionTuple> producer =
+        new PTransform<PBegin, PCollectionTuple>() {
+          @Override
+          public PCollectionTuple expand(PBegin input) {
+            return oneAndTwo;
+          }
+        };
+    hierarchy.pushNode(
+        "encloses_producer",
+        PBegin.in(pipeline),
+        new PTransform<PBegin, PCollectionTuple>() {
+          @Override
+          public PCollectionTuple expand(PBegin input) {
+            return input.apply(producer);
+          }
+        });
+    hierarchy.pushNode(
+        "creates_one_and_two",
+        PBegin.in(pipeline), producer);
+    hierarchy.setOutput(oneAndTwo);
+    hierarchy.popNode();
+    hierarchy.setOutput(oneAndTwo);
+    hierarchy.popNode();
+
+    hierarchy.visit(new PipelineVisitor.Defaults() {
+      private final Set<Node> visitedNodes = new HashSet<>();
+      private final Set<PValue> visitedValues = new HashSet<>();
+      @Override
+      public CompositeBehavior enterCompositeTransform(Node node) {
+        for (PValue input : node.getInputs().values()) {
+          assertThat(visitedValues, hasItem(input));
+        }
+        visitedNodes.add(node);
+        return CompositeBehavior.ENTER_TRANSFORM;
+      }
+
+      @Override
+      public void visitPrimitiveTransform(Node node) {
+        assertThat(visitedNodes, hasItem(node.getEnclosingNode()));
+        for (PValue input : node.getInputs().values()) {
+          assertThat(visitedValues, hasItem(input));
+        }
+        visitedNodes.add(node);
+      }
+
+      @Override
+      public void visitValue(PValue value, Node producer) {
+        assertThat(visitedNodes, hasItem(producer));
+        assertThat(visitedValues, not(hasItem(value)));
+        visitedValues.add(value);
+      }
+    });
+  }
 }


[2/4] beam git commit: Update Apex Overrides

Posted by tg...@apache.org.
Update Apex Overrides

Only override CreatePCollectionView transforms


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8eb09aad
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8eb09aad
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8eb09aad

Branch: refs/heads/master
Commit: 8eb09aad9c975f787ba8afac83394cc8b56eb94f
Parents: bd1dfdf
Author: Thomas Groh <tg...@google.com>
Authored: Thu May 25 10:41:56 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 26 07:50:37 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    | 119 ++++---------------
 1 file changed, 21 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8eb09aad/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index f91d8e5..c595b3f 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -57,7 +57,6 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverride;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -65,7 +64,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.ParDo.MultiOutput;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.View.AsIterable;
-import org.apache.beam.sdk.transforms.View.AsSingleton;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -111,16 +110,12 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
                 new PrimitiveCreate.Factory()))
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(View.AsSingleton.class),
-                new StreamingViewAsSingleton.Factory()))
-        .add(
-            PTransformOverride.of(
-                PTransformMatchers.classEqualTo(View.AsIterable.class),
+                PTransformMatchers.createViewWithViewFn(PCollectionViews.IterableViewFn.class),
                 new StreamingViewAsIterable.Factory()))
         .add(
             PTransformOverride.of(
-                PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
-                new StreamingCombineGloballyAsSingletonView.Factory()))
+                PTransformMatchers.createViewWithViewFn(PCollectionViews.SingletonViewFn.class),
+                new StreamingWrapSingletonInList.Factory()))
         .add(
             PTransformOverride.of(
                 PTransformMatchers.splittableParDoMulti(),
@@ -245,117 +240,45 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
   }
 
-  private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
-      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> {
+  private static class StreamingWrapSingletonInList<T>
+      extends PTransform<PCollection<T>, PCollectionView<T>> {
     private static final long serialVersionUID = 1L;
-    Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+    CreatePCollectionView<T, T> transform;
 
     /**
      * Builds an instance of this class from the overridden transform.
      */
-    private StreamingCombineGloballyAsSingletonView(
-        Combine.GloballyAsSingletonView<InputT, OutputT> transform) {
+    private StreamingWrapSingletonInList(
+        CreatePCollectionView<T, T> transform) {
       this.transform = transform;
     }
 
     @Override
-    public PCollectionView<OutputT> expand(PCollection<InputT> input) {
-      PCollection<OutputT> combined = input
-          .apply(Combine.globally(transform.getCombineFn())
-              .withoutDefaults().withFanout(transform.getFanout()));
-
-      PCollectionView<OutputT> view = PCollectionViews.singletonView(combined,
-          combined.getWindowingStrategy(), transform.getInsertDefault(),
-          transform.getInsertDefault() ? transform.getCombineFn().defaultValue() : null,
-              combined.getCoder());
-      return combined.apply(ParDo.of(new WrapAsList<OutputT>()))
-          .apply(CreateApexPCollectionView.<OutputT, OutputT> of(view));
+    public PCollectionView<T> expand(PCollection<T> input) {
+      return input
+          .apply(ParDo.of(new WrapAsList<T>()))
+          .apply(CreateApexPCollectionView.<T, T>of(transform.getView()));
     }
 
     @Override
     protected String getKindString() {
-      return "StreamingCombineGloballyAsSingletonView";
+      return "StreamingWrapSingletonInList";
     }
 
-    static class Factory<InputT, OutputT>
+    static class Factory<T>
         extends SingleInputOutputOverrideFactory<
-            PCollection<InputT>, PCollectionView<OutputT>,
-            Combine.GloballyAsSingletonView<InputT, OutputT>> {
+            PCollection<T>, PCollectionView<T>,
+            CreatePCollectionView<T, T>> {
       @Override
-      public PTransformReplacement<PCollection<InputT>, PCollectionView<OutputT>>
+      public PTransformReplacement<PCollection<T>, PCollectionView<T>>
           getReplacementTransform(
               AppliedPTransform<
-                      PCollection<InputT>, PCollectionView<OutputT>,
-                      GloballyAsSingletonView<InputT, OutputT>>
+                      PCollection<T>, PCollectionView<T>,
+                      CreatePCollectionView<T, T>>
                   transform) {
         return PTransformReplacement.of(
             PTransformReplacements.getSingletonMainInput(transform),
-            new StreamingCombineGloballyAsSingletonView<>(transform.getTransform()));
-      }
-    }
-  }
-
-  private static class StreamingViewAsSingleton<T>
-      extends PTransform<PCollection<T>, PCollectionView<T>> {
-    private static final long serialVersionUID = 1L;
-
-    private View.AsSingleton<T> transform;
-
-    public StreamingViewAsSingleton(View.AsSingleton<T> transform) {
-      this.transform = transform;
-    }
-
-    @Override
-    public PCollectionView<T> expand(PCollection<T> input) {
-      Combine.Globally<T, T> combine = Combine
-          .globally(new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
-      if (!transform.hasDefaultValue()) {
-        combine = combine.withoutDefaults();
-      }
-      return input.apply(combine.asSingletonView());
-    }
-
-    @Override
-    protected String getKindString() {
-      return "StreamingViewAsSingleton";
-    }
-
-    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> {
-      private boolean hasDefaultValue;
-      private T defaultValue;
-
-      SingletonCombine(boolean hasDefaultValue, T defaultValue) {
-        this.hasDefaultValue = hasDefaultValue;
-        this.defaultValue = defaultValue;
-      }
-
-      @Override
-      public T apply(T left, T right) {
-        throw new IllegalArgumentException("PCollection with more than one element "
-            + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
-            + "combine the PCollection into a single value");
-      }
-
-      @Override
-      public T identity() {
-        if (hasDefaultValue) {
-          return defaultValue;
-        } else {
-          throw new IllegalArgumentException("Empty PCollection accessed as a singleton view. "
-              + "Consider setting withDefault to provide a default value");
-        }
-      }
-    }
-
-    static class Factory<T>
-        extends SingleInputOutputOverrideFactory<
-            PCollection<T>, PCollectionView<T>, View.AsSingleton<T>> {
-      @Override
-      public PTransformReplacement<PCollection<T>, PCollectionView<T>> getReplacementTransform(
-          AppliedPTransform<PCollection<T>, PCollectionView<T>, AsSingleton<T>> transform) {
-        return PTransformReplacement.of(
-            PTransformReplacements.getSingletonMainInput(transform),
-            new StreamingViewAsSingleton<>(transform.getTransform()));
+            new StreamingWrapSingletonInList<>(transform.getTransform()));
       }
     }
   }


[4/4] beam git commit: This closes #3211

Posted by tg...@apache.org.
This closes #3211


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6bb204f3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6bb204f3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6bb204f3

Branch: refs/heads/master
Commit: 6bb204f34629d7ece10a28a509fa9194df73d21f
Parents: 7568f02 2e2ae9c
Author: Thomas Groh <tg...@google.com>
Authored: Fri May 26 09:23:31 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri May 26 09:23:31 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    | 119 +++----------
 .../apex/translation/TranslationContext.java    |   4 +-
 .../core/construction/TransformInputs.java      |  50 ++++++
 .../core/construction/TransformInputsTest.java  | 166 +++++++++++++++++++
 .../beam/runners/direct/DirectGraphVisitor.java |  15 +-
 .../runners/direct/ParDoEvaluatorFactory.java   |   9 +-
 ...littableProcessElementsEvaluatorFactory.java |   2 +
 .../direct/StatefulParDoEvaluatorFactory.java   |   1 +
 .../beam/runners/direct/WatermarkManager.java   |  17 +-
 .../beam/runners/direct/ParDoEvaluatorTest.java |   6 +-
 .../flink/FlinkBatchTranslationContext.java     |   3 +-
 .../flink/FlinkStreamingTranslationContext.java |   3 +-
 .../dataflow/DataflowPipelineTranslator.java    |   5 +-
 .../apache/beam/runners/spark/SparkRunner.java  |  13 ++
 .../spark/translation/BoundedDataset.java       |   6 +
 .../spark/translation/EvaluationContext.java    |   4 +-
 .../spark/translation/TransformTranslator.java  |   1 +
 .../spark/translation/StorageLevelTest.java     |   4 +-
 .../beam/sdk/runners/TransformHierarchy.java    |  74 +++++++--
 .../sdk/runners/TransformHierarchyTest.java     |  86 ++++++++++
 20 files changed, 455 insertions(+), 133 deletions(-)
----------------------------------------------------------------------