You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2017/05/04 17:17:40 UTC
[1/2] beam git commit: Remove POutput#recordAsOutput
Repository: beam
Updated Branches:
refs/heads/master 6a95e5eec -> e3a0e26e2
Remove POutput#recordAsOutput
Add PValueBase#defaultName, for composite POutputs to use when they are
being finalized.
Expand POutput#finishSpecifyingOutput to take the name of the transform
that is being finished.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ab8f92fc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ab8f92fc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ab8f92fc
Branch: refs/heads/master
Commit: ab8f92fcc272e7eca70720fb91df584769376ecc
Parents: 6a95e5e
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 2 17:53:57 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 4 10:16:19 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/runners/TransformHierarchy.java | 8 ++--
.../org/apache/beam/sdk/values/PCollection.java | 4 +-
.../apache/beam/sdk/values/PCollectionList.java | 14 +++----
.../beam/sdk/values/PCollectionTuple.java | 17 ++++----
.../java/org/apache/beam/sdk/values/PDone.java | 12 ++----
.../org/apache/beam/sdk/values/POutput.java | 25 ++---------
.../org/apache/beam/sdk/values/PValueBase.java | 44 ++++++--------------
.../sdk/runners/TransformHierarchyTest.java | 3 +-
.../apache/beam/sdk/transforms/ParDoTest.java | 4 +-
.../beam/sdk/io/gcp/bigquery/WriteResult.java | 21 +---------
10 files changed, 47 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index 9236194..703aeb6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -171,14 +171,14 @@ public class TransformHierarchy {
for (PValue value : output.expand().values()) {
if (!producers.containsKey(value)) {
producers.put(value, current);
+ value.finishSpecifyingOutput(
+ current.getFullName(), unexpandedInputs.get(current), current.transform);
}
- value.finishSpecifyingOutput(unexpandedInputs.get(current), current.transform);
producerInput.put(value, unexpandedInputs.get(current));
}
- output.finishSpecifyingOutput(unexpandedInputs.get(current), current.transform);
+ output.finishSpecifyingOutput(
+ current.getFullName(), unexpandedInputs.get(current), current.transform);
current.setOutput(output);
- // TODO: Replace with a "generateDefaultNames" method.
- output.recordAsOutput(current.toAppliedPTransform());
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index 20e5d68..1622322 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -83,9 +83,9 @@ public class PCollection<T> extends PValueBase implements PValue {
@Override
public void finishSpecifyingOutput(
- PInput input, PTransform<?, ?> transform) {
+ String transformName, PInput input, PTransform<?, ?> transform) {
this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry());
- super.finishSpecifyingOutput(input, transform);
+ super.finishSpecifyingOutput(transformName, input, transform);
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
index 7b45deb..48c3649 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionList.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Partition;
@@ -230,22 +229,21 @@ public class PCollectionList<T> implements PInput, POutput {
}
@Override
- public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {
+ public void finishSpecifyingOutput(
+ String transformName, PInput input, PTransform<?, ?> transform) {
+ // All component PCollections will have already been finished.
int i = 0;
for (TaggedPValue tpv : pcollections) {
@SuppressWarnings("unchecked")
PCollection<T> pc = (PCollection<T>) tpv.getValue();
- pc.recordAsOutput(transform, "out" + i);
+ if (pc.getName().equals(PValueBase.defaultName(transformName))) {
+ pc.setName(String.format("%s.%s%s", transformName, "out", i));
+ }
i++;
}
}
@Override
- public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) {
- // All component PCollections will have already been finished.
- }
-
- @Override
public boolean equals(Object other) {
if (!(other instanceof PCollectionList)) {
return false;
http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
index ce67e94..5027df6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionTuple.java
@@ -23,7 +23,6 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -236,23 +235,23 @@ public class PCollectionTuple implements PInput, POutput {
}
@Override
- public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {
+ public void finishSpecifyingOutput(
+ String transformName, PInput input, PTransform<?, ?> transform) {
+ // All component PCollections will already have been finished. Update their names if
+ // appropriate.
int i = 0;
for (Map.Entry<TupleTag<?>, PCollection<?>> entry
- : pcollectionMap.entrySet()) {
+ : pcollectionMap.entrySet()) {
TupleTag<?> tag = entry.getKey();
PCollection<?> pc = entry.getValue();
- pc.recordAsOutput(transform, tag.getOutName(i));
+ if (pc.getName().equals(PValueBase.defaultName(transformName))) {
+ pc.setName(String.format("%s.%s", transformName, tag.getOutName(i)));
+ }
i++;
}
}
@Override
- public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) {
- // All component PCollections will already have been finished
- }
-
- @Override
public boolean equals(Object other) {
if (!(other instanceof PCollectionTuple)) {
return false;
http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
index 5c9800d..92473b9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PDone.java
@@ -21,7 +21,6 @@ import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.WriteFiles;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
/**
@@ -56,13 +55,8 @@ public class PDone implements POutput {
return Collections.emptyMap();
}
- /** Does nothing; there are no concrete outputs to record. */
+ /** Does nothing; there is nothing to finish specifying. */
@Override
- public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {}
-
- /**
- * Does nothing; there is nothing to finish specifying.
- */
- @Override
- public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { }
+ public void finishSpecifyingOutput(
+ String transformName, PInput input, PTransform<?, ?> transform) {}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
index bb01beb..c6d15e3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/POutput.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.values;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
/**
@@ -48,31 +47,15 @@ public interface POutput {
Map<TupleTag<?>, PValue> expand();
/**
- * Records that this {@code POutput} is an output of the given
- * {@code PTransform}.
- *
- * <p>For a compound {@code POutput}, it is advised to call
- * this method on each component {@code POutput}.
- *
- * <p>This is not intended to be invoked by user code, but
- * is automatically invoked as part of applying the
- * producing {@link PTransform}.
- */
- void recordAsOutput(AppliedPTransform<?, ?, ?> transform);
-
- /**
* As part of applying the producing {@link PTransform}, finalizes this output to make it ready
* for being used as an input and for running.
*
* <p>This includes ensuring that all {@link PCollection PCollections} have {@link
* org.apache.beam.sdk.coders.Coder Coders} specified or defaulted.
*
- * <p>Automatically invoked whenever this {@link POutput} is output, after {@link
- * PValue#finishSpecifyingOutput(PInput, PTransform)} has been called on each component {@link
- * PValue} returned by {@link #expand()}.
- *
- * @deprecated see BEAM-1199
+ * <p>Automatically invoked whenever this {@link POutput} is output, after
+ * {@link PValue#finishSpecifyingOutput(String, PInput, PTransform)} has been called on each
+ * component {@link PValue} returned by {@link #expand()}.
*/
- @Deprecated
- void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform);
+ void finishSpecifyingOutput(String transformName, PInput input, PTransform<?, ?> transform);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
index 7ab5808..4de0589 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
@@ -17,10 +17,11 @@
*/
package org.apache.beam.sdk.values;
+import static com.google.common.base.Preconditions.checkState;
+
import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.NameUtils;
@@ -65,10 +66,7 @@ public abstract class PValueBase implements PValue {
* already been finalized and may no longer be set.
*/
public PValueBase setName(String name) {
- if (finishedSpecifying) {
- throw new IllegalStateException(
- "cannot change the name of " + this + " once it's been used");
- }
+ checkState(!finishedSpecifying, "cannot change the name of %s once it's been used", this);
this.name = name;
return this;
}
@@ -104,26 +102,6 @@ public abstract class PValueBase implements PValue {
*/
private boolean finishedSpecifying = false;
- @Override
- public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {
- recordAsOutput(transform, "out");
- }
-
- /**
- * Records that this {@link PValueBase} is an output with the
- * given name of the given {@link AppliedPTransform} in the given
- * {@link Pipeline}.
- *
- * <p>To be invoked only by {@link POutput#recordAsOutput}
- * implementations. Not to be invoked directly by user code.
- */
- protected void recordAsOutput(AppliedPTransform<?, ?, ?> transform,
- String outName) {
- if (name == null) {
- name = transform.getFullName() + "." + outName;
- }
- }
-
/**
* Returns whether this {@link PValueBase} has been finalized, and
* its core properties, e.g., name, can no longer be changed.
@@ -165,11 +143,15 @@ public abstract class PValueBase implements PValue {
return pipeline;
}
- /**
- * Default behavior for {@link #finishSpecifyingOutput(PInput, PTransform)}} is
- * to do nothing. Override if your {@link PValue} requires
- * finalization.
- */
@Override
- public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { }
+ public void finishSpecifyingOutput(
+ String transformName, PInput input, PTransform<?, ?> transform) {
+ if (name == null) {
+ setName(defaultName(transformName));
+ }
+ }
+
+ static String defaultName(String transformName) {
+ return String.format("%s.%s", transformName, "out");
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
index 1b884e2..e495758 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformHierarchyTest.java
@@ -145,7 +145,8 @@ public class TransformHierarchyTest implements Serializable {
final PCollectionList<Long> appended =
pcList.and(
PCollection.<Long>createPrimitiveOutputInternal(
- pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED));
+ pipeline, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)
+ .setName("prim"));
hierarchy.pushNode(
"AddPc",
pcList,
http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 26904aa..d4475c9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -1244,7 +1244,9 @@ public class ParDoTest implements Serializable {
outputTuple.get(additionalOutputTag).apply(View.<TestDummy>asSingleton());
assertEquals(new TestDummyCoder(), outputTuple.get(additionalOutputTag).getCoder());
- outputTuple.get(additionalOutputTag).finishSpecifyingOutput(input, pardo); // Check for crashes
+ outputTuple
+ .get(additionalOutputTag)
+ .finishSpecifyingOutput("ParDo", input, pardo); // Check for crashes
assertEquals(new TestDummyCoder(),
outputTuple.get(additionalOutputTag).getCoder()); // Check for corruption
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ab8f92fc/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
index d137f05..bc18e8e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.gcp.bigquery;
import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
@@ -55,23 +54,7 @@ final class WriteResult implements POutput {
return pipeline;
}
- /**
- * Records that this {@link WriteResult} is an output with the given name of the given {@link
- * AppliedPTransform}.
- *
- * <p>By default, does nothing.
- *
- * <p>To be invoked only by {@link POutput#recordAsOutput} implementations. Not to be invoked
- * directly by user code.
- */
- @Override
- public void recordAsOutput(AppliedPTransform<?, ?, ?> transform) {}
-
- /**
- * Default behavior for {@link #finishSpecifyingOutput(PInput, PTransform)}} is
- * to do nothing. Override if your {@link PValue} requires
- * finalization.
- */
@Override
- public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) { }
+ public void finishSpecifyingOutput(
+ String transformName, PInput input, PTransform<?, ?> transform) {}
}
[2/2] beam git commit: [BEAM-1199] Remove POutput#recordAsOutput
Posted by lc...@apache.org.
[BEAM-1199] Remove POutput#recordAsOutput
This closes #2864
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e3a0e26e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e3a0e26e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e3a0e26e
Branch: refs/heads/master
Commit: e3a0e26e25d8409d61e16ea64bb580e02a2b036e
Parents: 6a95e5e ab8f92f
Author: Luke Cwik <lc...@google.com>
Authored: Thu May 4 10:17:31 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu May 4 10:17:31 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/runners/TransformHierarchy.java | 8 ++--
.../org/apache/beam/sdk/values/PCollection.java | 4 +-
.../apache/beam/sdk/values/PCollectionList.java | 14 +++----
.../beam/sdk/values/PCollectionTuple.java | 17 ++++----
.../java/org/apache/beam/sdk/values/PDone.java | 12 ++----
.../org/apache/beam/sdk/values/POutput.java | 25 ++---------
.../org/apache/beam/sdk/values/PValueBase.java | 44 ++++++--------------
.../sdk/runners/TransformHierarchyTest.java | 3 +-
.../apache/beam/sdk/transforms/ParDoTest.java | 4 +-
.../beam/sdk/io/gcp/bigquery/WriteResult.java | 21 +---------
10 files changed, 47 insertions(+), 105 deletions(-)
----------------------------------------------------------------------