You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/04 17:17:40 UTC

[1/2] beam git commit: Remove POutput#recordAsOutput

Repository: beam
Updated Branches:
  refs/heads/master 6a95e5eec -> e3a0e26e2


Remove POutput#recordAsOutput

Add PValueBase#defaultName, for composite POutputs to use when they are
being finalized (to detect and rename components that still carry the default name).

Expand POutput#finishSpecifyingOutput to take the name of the transform
that is being finished.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ab8f92fc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ab8f92fc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ab8f92fc

Branch: refs/heads/master
Commit: ab8f92fcc272e7eca70720fb91df584769376ecc
Parents: 6a95e5e
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 2 17:53:57 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 4 10:16:19 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/runners/TransformHierarchy.java    |  8 ++--
 .../org/apache/beam/sdk/values/PCollection.java |  4 +-
 .../apache/beam/sdk/values/PCollectionList.java | 14 +++----
 .../beam/sdk/values/PCollectionTuple.java       | 17 ++++----
 .../java/org/apache/beam/sdk/values/PDone.java  | 12 ++----
 .../org/apache/beam/sdk/values/POutput.java     | 25 ++---------
 .../org/apache/beam/sdk/values/PValueBase.java  | 44 ++++++--------------
 .../sdk/runners/TransformHierarchyTest.java     |  3 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  4 +-
 .../beam/sdk/io/gcp/bigquery/WriteResult.java   | 21 +---------
 10 files changed, 47 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 9236194..703aeb6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -171,14 +171,14 @@ public class TransformHierarchy {
     for (PValue value : output.expand().values()) {
       if (!producers.containsKey(value)) {
         producers.put(value, current);
+        value.finishSpecifyingOutput(
+            current.getFullName(), unexpandedInputs.get(current), current.transform);
       }
-      value.finishSpecifyingOutput(unexpandedInputs.get(current), current.transform);
       producerInput.put(value, unexpandedInputs.get(current));
     }
-    output.finishSpecifyingOutput(unexpandedInputs.get(current), current.transform);
+    output.finishSpecifyingOutput(
+        current.getFullName(), unexpandedInputs.get(current), current.transform);
     current.setOutput(output);
-    // TODO: Replace with a "generateDefaultNames" method.
-    output.recordAsOutput(current.toAppliedPTransform());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index 20e5d68..1622322 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -83,9 +83,9 @@ public class PCollection<T> extends PValueBase implements PValue {
 
   @Override
   public void finishSpecifyingOutput(
-      PInput input, PTransform<?, ?> transform) {
+      String transformName, PInput input, PTransform<?, ?> transform) {
     this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry());
-    super.finishSpecifyingOutput(input, transform);
+    super.finishSpecifyingOutput(transformName, input, transform);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
index 7b45deb..48c3649 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.Partition;
@@ -230,22 +229,21 @@ public class PCollectionList<T> implements PInput, POutput {
   }
 
   @Override
-  public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {
+  public void finishSpecifyingOutput(
+      String transformName, PInput input, PTransform<?, ?> transform) {
+    // All component PCollections will have already been finished.
     int i = 0;
     for (TaggedPValue tpv : pcollections) {
       @SuppressWarnings("unchecked")
       PCollection<T> pc = (PCollection<T>) tpv.getValue();
-      pc.recordAsOutput(transform, "out" + i);
+      if (pc.getName().equals(PValueBase.defaultName(transformName))) {
+        pc.setName(String.format("%s.%s%s", transformName, "out", i));
+      }
       i++;
     }
   }
 
   @Override
-  public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) {
-    // All component PCollections will have already been finished.
-  }
-
-  @Override
   public boolean equals(Object other) {
     if (!(other instanceof PCollectionList)) {
       return false;

http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
index ce67e94..5027df6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
@@ -23,7 +23,6 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -236,23 +235,23 @@ public class PCollectionTuple implements PInput, POutput {
   }
 
   @Override
-  public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {
+  public void finishSpecifyingOutput(
+      String transformName, PInput input, PTransform<?, ?> transform) {
+    // All component PCollections will already have been finished. Update their names if
+    // appropriate.
     int i = 0;
     for (Map.Entry<TupleTag<?>, PCollection<?>> entry
-             : pcollectionMap.entrySet()) {
+        : pcollectionMap.entrySet()) {
       TupleTag<?> tag = entry.getKey();
       PCollection<?> pc = entry.getValue();
-      pc.recordAsOutput(transform, tag.getOutName(i));
+      if (pc.getName().equals(PValueBase.defaultName(transformName))) {
+        pc.setName(String.format("%s.%s", transformName, tag.getOutName(i)));
+      }
       i++;
     }
   }
 
   @Override
-  public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) {
-    // All component PCollections will already have been finished
-  }
-
-  @Override
   public boolean equals(Object other) {
     if (!(other instanceof PCollectionTuple)) {
       return false;

http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
index 5c9800d..92473b9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
@@ -21,7 +21,6 @@ import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.WriteFiles;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /**
@@ -56,13 +55,8 @@ public class PDone implements POutput {
     return Collections.emptyMap();
   }
 
-  /** Does nothing; there are no concrete outputs to record. */
+  /** Does nothing; there is nothing to finish specifying. */
   @Override
-  public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {}
-
-  /**
-   * Does nothing; there is nothing to finish specifying.
-   */
-  @Override
-  public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { }
+  public void finishSpecifyingOutput(
+      String transformName, PInput input, PTransform<?, ?> transform) {}
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
index bb01beb..c6d15e3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.values;
 
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 
 /**
@@ -48,31 +47,15 @@ public interface POutput {
   Map<TupleTag<?>, PValue> expand();
 
   /**
-   * Records that this {@code POutput} is an output of the given
-   * {@code PTransform}.
-   *
-   * <p>For a compound {@code POutput}, it is advised to call
-   * this method on each component {@code POutput}.
-   *
-   * <p>This is not intended to be invoked by user code, but
-   * is automatically invoked as part of applying the
-   * producing {@link PTransform}.
-   */
-  void recordAsOutput(AppliedPTransform<?, ?, ?> transform);
-
-  /**
    * As part of applying the producing {@link PTransform}, finalizes this output to make it ready
    * for being used as an input and for running.
    *
    * <p>This includes ensuring that all {@link PCollection PCollections} have {@link
    * org.apache.beam.sdk.coders.Coder Coders} specified or defaulted.
    *
-   * <p>Automatically invoked whenever this {@link POutput} is output, after {@link
-   * PValue#finishSpecifyingOutput(PInput, PTransform)} has been called on each component {@link
-   * PValue} returned by {@link #expand()}.
-   *
-   * @deprecated see BEAM-1199
+   * <p>Automatically invoked whenever this {@link POutput} is output, after
+   * {@link PValue#finishSpecifyingOutput(String, PInput, PTransform)} has been called on each
+   * component {@link PValue} returned by {@link #expand()}.
    */
-  @Deprecated
-  void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform);
+  void finishSpecifyingOutput(String transformName, PInput input, PTransform<?, ?> transform);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
index 7ab5808..4de0589 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
@@ -17,10 +17,11 @@
  */
 package org.apache.beam.sdk.values;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.NameUtils;
 
@@ -65,10 +66,7 @@ public abstract class PValueBase implements PValue {
    * already been finalized and may no longer be set.
    */
   public PValueBase setName(String name) {
-    if (finishedSpecifying) {
-      throw new IllegalStateException(
-          "cannot change the name of " + this + " once it's been used");
-    }
+    checkState(!finishedSpecifying, "cannot change the name of %s once it's been used", this);
     this.name = name;
     return this;
   }
@@ -104,26 +102,6 @@ public abstract class PValueBase implements PValue {
    */
   private boolean finishedSpecifying = false;
 
-  @Override
-  public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {
-    recordAsOutput(transform, "out");
-  }
-
-  /**
-   * Records that this {@link PValueBase} is an output with the
-   * given name of the given {@link AppliedPTransform} in the given
-   * {@link Pipeline}.
-   *
-   * <p>To be invoked only by {@link POutput#recordAsOutput}
-   * implementations.  Not to be invoked directly by user code.
-   */
-  protected void recordAsOutput(AppliedPTransform<?, ?, ?> transform,
-                                String outName) {
-    if (name == null) {
-      name = transform.getFullName() + "." + outName;
-    }
-  }
-
   /**
    * Returns whether this {@link PValueBase} has been finalized, and
    * its core properties, e.g., name, can no longer be changed.
@@ -165,11 +143,15 @@ public abstract class PValueBase implements PValue {
     return pipeline;
   }
 
-  /**
-   * Default behavior for {@link #finishSpecifyingOutput(PInput, PTransform)}} is
-   * to do nothing. Override if your {@link PValue} requires
-   * finalization.
-   */
   @Override
-  public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { }
+  public void finishSpecifyingOutput(
+      String transformName, PInput input, PTransform<?, ?> transform) {
+    if (name == null) {
+      setName(defaultName(transformName));
+    }
+  }
+
+  static String defaultName(String transformName) {
+    return String.format("%s.%s", transformName, "out");
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 1b884e2..e495758 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -145,7 +145,8 @@ public class TransformHierarchyTest implements Serializable {
     final PCollectionList<Long> appended =
         pcList.and(
             PCollection.<Long>createPrimitiveOutputInternal(
-                pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED));
+                    pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
+                .setName("prim"));
     hierarchy.pushNode(
         "AddPc",
         pcList,

http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 26904aa..d4475c9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -1244,7 +1244,9 @@ public class ParDoTest implements Serializable {
     outputTuple.get(additionalOutputTag).apply(View.<TestDummy>asSingleton());
 
     assertEquals(new TestDummyCoder(), outputTuple.get(additionalOutputTag).getCoder());
-    outputTuple.get(additionalOutputTag).finishSpecifyingOutput(input, pardo); // Check for crashes
+    outputTuple
+        .get(additionalOutputTag)
+        .finishSpecifyingOutput("ParDo", input, pardo); // Check for crashes
     assertEquals(new TestDummyCoder(),
         outputTuple.get(additionalOutputTag).getCoder()); // Check for corruption
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
index d137f05..bc18e8e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigquery;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
@@ -55,23 +54,7 @@ final class WriteResult implements POutput {
     return pipeline;
   }
 
-  /**
-   * Records that this {@link WriteResult} is an output with the given name of the given {@link
-   * AppliedPTransform}.
-   *
-   * <p>By default, does nothing.
-   *
-   * <p>To be invoked only by {@link POutput#recordAsOutput} implementations. Not to be invoked
-   * directly by user code.
-   */
-  @Override
-  public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {}
-
-  /**
-   * Default behavior for {@link #finishSpecifyingOutput(PInput, PTransform)}} is
-   * to do nothing. Override if your {@link PValue} requires
-   * finalization.
-   */
   @Override
-  public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { }
+  public void finishSpecifyingOutput(
+      String transformName, PInput input, PTransform<?, ?> transform) {}
 }


[2/2] beam git commit: [BEAM-1199] Remove POutput#recordAsOutput

Posted by lc...@apache.org.
[BEAM-1199] Remove POutput#recordAsOutput

This closes #2864


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e3a0e26e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e3a0e26e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e3a0e26e

Branch: refs/heads/master
Commit: e3a0e26e25d8409d61e16ea64bb580e02a2b036e
Parents: 6a95e5e ab8f92f
Author: Luke Cwik <lc...@google.com>
Authored: Thu May 4 10:17:31 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 4 10:17:31 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/runners/TransformHierarchy.java    |  8 ++--
 .../org/apache/beam/sdk/values/PCollection.java |  4 +-
 .../apache/beam/sdk/values/PCollectionList.java | 14 +++----
 .../beam/sdk/values/PCollectionTuple.java       | 17 ++++----
 .../java/org/apache/beam/sdk/values/PDone.java  | 12 ++----
 .../org/apache/beam/sdk/values/POutput.java     | 25 ++---------
 .../org/apache/beam/sdk/values/PValueBase.java  | 44 ++++++--------------
 .../sdk/runners/TransformHierarchyTest.java     |  3 +-
 .../apache/beam/sdk/transforms/ParDoTest.java   |  4 +-
 .../beam/sdk/io/gcp/bigquery/WriteResult.java   | 21 +---------
 10 files changed, 47 insertions(+), 105 deletions(-)
----------------------------------------------------------------------