You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in the plain-text conversion.)
Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/12/16 02:13:00 UTC

[jira] [Commented] (BEAM-1138) Consider merging KeyedPValueTrackingVisitor with DirectGraphVisitor

    [ https://issues.apache.org/jira/browse/BEAM-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16293566#comment-16293566 ] 

ASF GitHub Bot commented on BEAM-1138:
--------------------------------------

jkff closed pull request #2951: [BEAM-1138] merging KeyedPValueTrackingVisitor with DirectGraphVisitor
URL: https://github.com/apache/beam/pull/2951
 
 
   

This pull request was merged from a forked repository.
Because GitHub hides the original diff of such a pull request once it is merged,
the diff is reproduced below for the sake of provenance:

Because this is a foreign pull request (one opened from a fork), the diff is
supplied below; GitHub would not display it otherwise:

diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
index 1ee8ceb9a7e..ed95f4d5a2a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java
@@ -20,20 +20,27 @@
 import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ListMultimap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+
+import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the
@@ -41,6 +48,11 @@
  * input after the upstream transform has produced and committed output.
  */
 class DirectGraphVisitor extends PipelineVisitor.Defaults {
+
+  private static final Set<Class<? extends PTransform>> PRODUCES_KEYED_OUTPUTS = ImmutableSet.of(
+      SplittableParDo.GBKIntoKeyedWorkItems.class, DirectGroupByKeyOnly.class,
+      DirectGroupAlsoByWindow.class);
+
   private Map<POutput, AppliedPTransform<?, ?, ?>> producers = new HashMap<>();
 
   private ListMultimap<PInput, AppliedPTransform<?, ?, ?>> primitiveConsumers =
@@ -49,6 +61,8 @@
   private Set<PCollectionView<?>> views = new HashSet<>();
   private Set<AppliedPTransform<?, ?, ?>> rootTransforms = new HashSet<>();
   private Map<AppliedPTransform<?, ?, ?>, String> stepNames = new HashMap<>();
+  private Set<PValue> keyedValues = new HashSet<>();
+
   private int numTransforms = 0;
   private boolean finalized = false;
 
@@ -72,6 +86,11 @@ public void leaveCompositeTransform(TransformHierarchy.Node node) {
         getClass().getSimpleName());
     if (node.isRootNode()) {
       finalized = true;
+    } else if (PRODUCES_KEYED_OUTPUTS.contains(node.getTransform().getClass())) {
+      Map<TupleTag<?>, PValue> outputs = node.getOutputs();
+      for (PValue output : outputs.values()) {
+        keyedValues.add(output);
+      }
     }
   }
 
@@ -97,6 +116,14 @@ public void visitValue(PValue value, TransformHierarchy.Node producer) {
     if (!producers.containsKey(value)) {
       producers.put(value, appliedTransform);
     }
+    boolean inputsAreKeyed = true;
+    for (PValue input : producer.getInputs().values()) {
+      inputsAreKeyed = inputsAreKeyed && keyedValues.contains(input);
+    }
+    if (PRODUCES_KEYED_OUTPUTS.contains(producer.getTransform().getClass())
+        || (isKeyPreserving(producer.getTransform()) && inputsAreKeyed)) {
+      keyedValues.add(value);
+    }
   }
 
   private AppliedPTransform<?, ?, ?> getAppliedTransform(TransformHierarchy.Node node) {
@@ -113,8 +140,38 @@ private String genStepName() {
    * Get the graph constructed by this {@link DirectGraphVisitor}, which provides
    * lookups for producers and consumers of {@link PValue PValues}.
    */
-  public DirectGraph getGraph() {
+  DirectGraph getGraph() {
     checkState(finalized, "Can't get a graph before the Pipeline has been completely traversed");
     return DirectGraph.create(producers, primitiveConsumers, views, rootTransforms, stepNames);
   }
+
+  /**
+   * Get all keyed {@link PValue PValues}. A {@link PValue} is keyed if it
+   * is the result of a {@link PTransform} that produces keyed outputs. A {@link PTransform} that
+   * produces keyed outputs is assumed to colocate output elements that share a key.
+   *
+   * <p>All {@link GroupByKey} transforms, or their runner-specific implementation primitive,
+   * produce keyed output.
+   * TODO: Handle Key-preserving transforms when appropriate and more aggressively make PTransforms
+   * unkeyed
+   */
+  Set<PValue> getKeyedPValues() {
+    checkState(finalized,
+        "can't call getKeyedPValues before a Pipeline has been completely traversed");
+    return keyedValues;
+  }
+
+  private static boolean isKeyPreserving(PTransform<?, ?> transform) {
+    // This is a hacky check for what is considered key-preserving to the direct runner.
+    // The most obvious alternative would be a package-private marker interface, but
+    // better to make this obviously hacky so it is less likely to proliferate. Meanwhile
+    // we intend to allow explicit expression of key-preserving DoFn in the model.
+    if (transform instanceof ParDo.MultiOutput) {
+      ParDo.MultiOutput<?, ?> parDo = (ParDo.MultiOutput<?, ?>) transform;
+      return parDo.getFn() instanceof ParDoMultiOverrideFactory.ToKeyedWorkItem;
+    } else {
+      return false;
+    }
+  }
+
 }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index b0ce5eb0284..fe64d482bed 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -159,7 +159,7 @@ public DirectPipelineResult run(Pipeline pipeline) {
     pipeline.traverseTopologically(graphVisitor);
 
     @SuppressWarnings("rawtypes")
-    KeyedPValueTrackingVisitor keyedPValueVisitor = KeyedPValueTrackingVisitor.create();
+    DirectGraphVisitor keyedPValueVisitor = new DirectGraphVisitor();
     pipeline.traverseTopologically(keyedPValueVisitor);
 
     DisplayDataValidator.validatePipeline(pipeline);
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
deleted file mode 100644
index f9b6eba215f..00000000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.collect.ImmutableSet;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.beam.runners.core.SplittableParDo;
-import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
-import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.runners.TransformHierarchy;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * A pipeline visitor that tracks all keyed {@link PValue PValues}. A {@link PValue} is keyed if it
- * is the result of a {@link PTransform} that produces keyed outputs. A {@link PTransform} that
- * produces keyed outputs is assumed to colocate output elements that share a key.
- *
- * <p>All {@link GroupByKey} transforms, or their runner-specific implementation primitive, produce
- * keyed output.
- */
-// TODO: Handle Key-preserving transforms when appropriate and more aggressively make PTransforms
-// unkeyed
-class KeyedPValueTrackingVisitor implements PipelineVisitor {
-
-  private static final Set<Class<? extends PTransform>> PRODUCES_KEYED_OUTPUTS =
-      ImmutableSet.of(
-          SplittableParDo.GBKIntoKeyedWorkItems.class,
-          DirectGroupByKeyOnly.class,
-          DirectGroupAlsoByWindow.class);
-
-  private final Set<PValue> keyedValues;
-  private boolean finalized;
-
-  public static KeyedPValueTrackingVisitor create() {
-    return new KeyedPValueTrackingVisitor();
-  }
-
-  private KeyedPValueTrackingVisitor() {
-    this.keyedValues = new HashSet<>();
-  }
-
-  @Override
-  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
-    checkState(
-        !finalized,
-        "Attempted to use a %s that has already been finalized on a pipeline (visiting node %s)",
-        KeyedPValueTrackingVisitor.class.getSimpleName(),
-        node);
-    return CompositeBehavior.ENTER_TRANSFORM;
-  }
-
-  @Override
-  public void leaveCompositeTransform(TransformHierarchy.Node node) {
-    checkState(
-        !finalized,
-        "Attempted to use a %s that has already been finalized on a pipeline (visiting node %s)",
-        KeyedPValueTrackingVisitor.class.getSimpleName(),
-        node);
-    if (node.isRootNode()) {
-      finalized = true;
-    } else if (PRODUCES_KEYED_OUTPUTS.contains(node.getTransform().getClass())) {
-      Map<TupleTag<?>, PValue> outputs = node.getOutputs();
-      for (PValue output : outputs.values()) {
-        keyedValues.add(output);
-      }
-    }
-  }
-
-  @Override
-  public void visitPrimitiveTransform(TransformHierarchy.Node node) {}
-
-  @Override
-  public void visitValue(PValue value, TransformHierarchy.Node producer) {
-    boolean inputsAreKeyed = true;
-    for (PValue input : producer.getInputs().values()) {
-      inputsAreKeyed = inputsAreKeyed && keyedValues.contains(input);
-    }
-    if (PRODUCES_KEYED_OUTPUTS.contains(producer.getTransform().getClass())
-        || (isKeyPreserving(producer.getTransform()) && inputsAreKeyed)) {
-      keyedValues.add(value);
-    }
-  }
-
-  public Set<PValue> getKeyedPValues() {
-    checkState(
-        finalized, "can't call getKeyedPValues before a Pipeline has been completely traversed");
-    return keyedValues;
-  }
-
-  private static boolean isKeyPreserving(PTransform<?, ?> transform) {
-    // This is a hacky check for what is considered key-preserving to the direct runner.
-    // The most obvious alternative would be a package-private marker interface, but
-    // better to make this obviously hacky so it is less likely to proliferate. Meanwhile
-    // we intend to allow explicit expression of key-preserving DoFn in the model.
-    if (transform instanceof ParDo.MultiOutput) {
-      ParDo.MultiOutput<?, ?> parDo = (ParDo.MultiOutput<?, ?>) transform;
-      return parDo.getFn() instanceof ParDoMultiOverrideFactory.ToKeyedWorkItem;
-    } else {
-      return false;
-    }
-  }
-}
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
index 7f46a0e4c9c..4c148063708 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectGraphVisitorTest.java
@@ -26,9 +26,7 @@
 import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.CountingSource;
 import org.apache.beam.sdk.io.GenerateSequence;
-import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -56,6 +54,7 @@
  */
 @RunWith(JUnit4.class)
 public class DirectGraphVisitorTest implements Serializable {
+
   @Rule public transient ExpectedException thrown = ExpectedException.none();
   @Rule public transient TestPipeline p = TestPipeline.create()
                                                       .enableAbandonedNodeEnforcement(false);
@@ -87,7 +86,7 @@ public void processElement(DoFn<String, String>.ProcessContext c)
   @Test
   public void getRootTransformsContainsRootTransforms() {
     PCollection<String> created = p.apply(Create.of("foo", "bar"));
-    PCollection<Long> counted = p.apply(Read.from(CountingSource.upTo(1234L)));
+    PCollection<Long> counted = p.apply(GenerateSequence.from(0).to(1234L));
     PCollection<Long> unCounted = p.apply(GenerateSequence.from(0));
     p.traverseTopologically(visitor);
     DirectGraph graph = visitor.getGraph();
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
index 72b1bbcba18..a9581b960cf 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java
@@ -101,8 +101,8 @@ public void setup() {
     view = created.apply(View.<Integer>asIterable());
     unbounded = p.apply(GenerateSequence.from(0));
 
-    KeyedPValueTrackingVisitor keyedPValueTrackingVisitor = KeyedPValueTrackingVisitor.create();
-    p.traverseTopologically(keyedPValueTrackingVisitor);
+    DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
+    p.traverseTopologically(graphVisitor);
 
     BundleFactory bundleFactory = ImmutableListBundleFactory.create();
     graph = DirectGraphs.getGraph(p);
@@ -112,7 +112,7 @@ public void setup() {
             NanosOffsetClock.create(),
             bundleFactory,
             graph,
-            keyedPValueTrackingVisitor.getKeyedPValues());
+            graphVisitor.getKeyedPValues());
 
     createdProducer = graph.getProducer(created);
     downstreamProducer = graph.getProducer(downstream);
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
index 641575742cb..52488e6b83c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitorTest.java
@@ -54,19 +54,21 @@
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-/** Tests for {@link KeyedPValueTrackingVisitor}. */
+/** Tests for {@link DirectGraphVisitor#getKeyedPValues()}. */
 @RunWith(JUnit4.class)
 public class KeyedPValueTrackingVisitorTest {
+
   @Rule public ExpectedException thrown = ExpectedException.none();
 
-  private KeyedPValueTrackingVisitor visitor;
+  private transient DirectGraphVisitor visitor;
+
   @Rule
   public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
   @Before
   public void setup() {
     p = TestPipeline.create();
-    visitor = KeyedPValueTrackingVisitor.create();
+    visitor = new DirectGraphVisitor();
   }
 
   @Test
@@ -185,8 +187,8 @@ public void traverseMultipleTimesThrows() {
 
     p.traverseTopologically(visitor);
     thrown.expect(IllegalStateException.class);
-    thrown.expectMessage("already been finalized");
-    thrown.expectMessage(KeyedPValueTrackingVisitor.class.getSimpleName());
+    thrown.expectMessage(DirectGraphVisitor.class.getSimpleName());
+    thrown.expectMessage("is finalized");
     p.traverseTopologically(visitor);
   }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Consider merging KeyedPValueTrackingVisitor with DirectGraphVisitor
> -------------------------------------------------------------------
>
>                 Key: BEAM-1138
>                 URL: https://issues.apache.org/jira/browse/BEAM-1138
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-direct
>            Reporter: Kenneth Knowles
>            Assignee: Borisa Zivkovic
>            Priority: Minor
>              Labels: newbie, starter
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)