You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/04 03:58:28 UTC
[1/2] beam git commit: Inline TypedPValue
Repository: beam
Updated Branches:
refs/heads/master 34d25f406 -> ff6bb3530
Inline TypedPValue
This has exactly one implementation, and this is not expected to change.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ef27abdc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ef27abdc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ef27abdc
Branch: refs/heads/master
Commit: ef27abdca3010ba12fe0208925535762fde16d7c
Parents: 17f0843
Author: Thomas Groh <tg...@google.com>
Authored: Wed May 3 13:42:49 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 3 18:18:57 2017 -0700
----------------------------------------------------------------------
.../core/construction/ForwardingPTransform.java | 6 +-
.../beam/runners/core/SplittableParDo.java | 3 +-
.../direct/ParDoMultiOverrideFactory.java | 3 +-
.../dataflow/DataflowPipelineTranslator.java | 5 +-
.../apache/beam/sdk/transforms/PTransform.java | 6 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 3 +-
.../org/apache/beam/sdk/values/PCollection.java | 145 ++++++++++++-
.../org/apache/beam/sdk/values/PValueBase.java | 2 +-
.../org/apache/beam/sdk/values/TypedPValue.java | 208 -------------------
9 files changed, 150 insertions(+), 231 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
index 2f427ad..ca25ba7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
@@ -22,9 +22,9 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.TypedPValue;
/**
* A base class for implementing {@link PTransform} overrides, which behave identically to the
@@ -51,8 +51,8 @@ public abstract class ForwardingPTransform<InputT extends PInput, OutputT extend
}
@Override
- public <T> Coder<T> getDefaultOutputCoder(InputT input, @SuppressWarnings("unused")
- TypedPValue<T> output) throws CannotProvideCoderException {
+ public <T> Coder<T> getDefaultOutputCoder(InputT input, PCollection<T> output)
+ throws CannotProvideCoderException {
return delegate().getDefaultOutputCoder(input, output);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 94f5f35..ed065a6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -55,7 +55,6 @@ import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.TypedPValue;
import org.joda.time.Instant;
/**
@@ -273,7 +272,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
public <T> Coder<T> getDefaultOutputCoder(
PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
input,
- TypedPValue<T> output)
+ PCollection<T> output)
throws CannotProvideCoderException {
// Similar logic to ParDo.MultiOutput.getDefaultOutputCoder.
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 322c995..b10d669 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -51,7 +51,6 @@ import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.TypedPValue;
/**
* A {@link PTransformOverrideFactory} that provides overrides for applications of a {@link ParDo}
@@ -184,7 +183,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
@Override
public <T> Coder<T> getDefaultOutputCoder(
- PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>> input, TypedPValue<T> output)
+ PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>> input, PCollection<T> output)
throws CannotProvideCoderException {
return underlyingParDo.getDefaultOutputCoder(originalInput, output);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 05edd28..69b4ecd 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -97,7 +97,6 @@ import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypedPValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -599,8 +598,8 @@ public class DataflowPipelineTranslator {
@Override
public long addOutput(PValue value) {
Coder<?> coder;
- if (value instanceof TypedPValue) {
- coder = ((TypedPValue<?>) value).getCoder();
+ if (value instanceof PCollection) {
+ coder = ((PCollection<?>) value).getCoder();
if (value instanceof PCollection) {
// Wrap the PCollection element Coder inside a WindowedValueCoder.
coder = WindowedValue.getFullCoder(
http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 4f651f2..15abd98 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -29,11 +29,11 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypedPValue;
/**
* A {@code PTransform<InputT, OutputT>} is an operation that takes an
@@ -122,7 +122,7 @@ import org.apache.beam.sdk.values.TypedPValue;
* not known at run-time (e.g., due to Java's "erasure" of generic
* types) or there was no default Coder registered, then the Coder
* should be specified manually by calling
- * {@link org.apache.beam.sdk.values.TypedPValue#setCoder}
+ * {@link PCollection#setCoder}
* on the output PCollection. The Coder of every output
* PCollection must be determined one way or another
* before that output is used as an input to another PTransform, or
@@ -306,7 +306,7 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
* @throws CannotProvideCoderException if none can be inferred.
*/
public <T> Coder<T> getDefaultOutputCoder(
- InputT input, @SuppressWarnings("unused") TypedPValue<T> output)
+ InputT input, @SuppressWarnings("unused") PCollection<T> output)
throws CannotProvideCoderException {
@SuppressWarnings("unchecked")
Coder<T> defaultOutputCoder = (Coder<T>) getDefaultOutputCoder(input);
http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 6137a7b..73d78c9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -53,7 +53,6 @@ import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.sdk.values.TypedPValue;
/**
* {@link ParDo} is the core element-wise transform in Apache Beam, invoking a user-specified
@@ -763,7 +762,7 @@ public class ParDo {
@Override
public <T> Coder<T> getDefaultOutputCoder(
- PCollection<? extends InputT> input, TypedPValue<T> output)
+ PCollection<? extends InputT> input, PCollection<T> output)
throws CannotProvideCoderException {
@SuppressWarnings("unchecked")
Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();
http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index 67520ce..034f0de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -17,14 +17,22 @@
*/
package org.apache.beam.sdk.values;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -62,7 +70,119 @@ import org.apache.beam.sdk.util.WindowingStrategy;
*
* @param <T> the type of the elements of this {@link PCollection}
*/
-public class PCollection<T> extends TypedPValue<T> {
+public class PCollection<T> extends PValueBase implements PValue {
+
+ /**
+ * The {@link Coder} used by this {@link PCollection} to encode and decode the values stored in
+ * it, or null if not specified nor inferred yet.
+ */
+ private CoderOrFailure<T> coderOrFailure =
+ new CoderOrFailure<>(null, "No Coder was specified, and Coder Inference did not occur");
+ private TypeDescriptor<T> typeDescriptor;
+
+ @Override
+ public void finishSpecifyingOutput(
+ PInput input, PTransform<?, ?> transform) {
+ this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry());
+ super.finishSpecifyingOutput(input, transform);
+ }
+
+ /**
+ * After building, finalizes this {@link PValue} to make it ready for
+ * running. Automatically invoked whenever the {@link PValue} is "used"
+ * (e.g., when apply() is called on it) and when the Pipeline is
+ * run (useful if this is a {@link PValue} with no consumers).
+ */
+ @Override
+ public void finishSpecifying(PInput input, PTransform<?, ?> transform) {
+ if (isFinishedSpecifying()) {
+ return;
+ }
+ this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry());
+ // Ensure that this PCollection has a coder, inferring one if none was set; if no coder can
+ // be determined, getCoder() throws an exception.
+ getCoder();
+ super.finishSpecifying(input, transform);
+ }
+
+ /**
+ * Returns a {@link TypeDescriptor TypeDescriptor&lt;T&gt;} with some reflective information
+ * about {@code T}, if possible. May return {@code null} if no information
+ * is available. Subclasses may override this to enable better
+ * {@code Coder} inference.
+ */
+ public TypeDescriptor<T> getTypeDescriptor() {
+ return typeDescriptor;
+ }
+
+ /**
+ * Returns the coder already set on this {@link PCollection} if there is one; otherwise tries
+ * the default output coder of the producing {@link PTransform}, then the {@link CoderRegistry}
+ * default for the known {@link TypeDescriptor}. If none of these yields a coder, the result
+ * carries a failure message describing the root causes instead of a coder.
+ */
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private CoderOrFailure<T> inferCoderOrFail(
+ PInput input, PTransform<?, ?> transform, CoderRegistry registry) {
+ // First option for a coder: use the Coder set on this PValue.
+ if (coderOrFailure.coder != null) {
+ return coderOrFailure;
+ }
+
+ // Second option for a coder: use the default Coder from the producing PTransform.
+ CannotProvideCoderException inputCoderException;
+ try {
+ return new CoderOrFailure<>(
+ ((PTransform) transform).getDefaultOutputCoder(input, this), null);
+ } catch (CannotProvideCoderException exc) {
+ inputCoderException = exc;
+ }
+
+ // Third option for a coder: Look in the coder registry.
+ TypeDescriptor<T> token = getTypeDescriptor();
+ CannotProvideCoderException inferFromTokenException = null;
+ if (token != null) {
+ try {
+ return new CoderOrFailure<>(registry.getDefaultCoder(token), null);
+ } catch (CannotProvideCoderException exc) {
+ inferFromTokenException = exc;
+ // Attempt to detect when the token came from a TupleTag used for a ParDo output,
+ // and provide a better error message if so. Unfortunately, this information is not
+ // directly available from the TypeDescriptor, so infer based on the type of the PTransform
+ // and the error message itself.
+ if (transform instanceof ParDo.MultiOutput
+ && exc.getReason() == ReasonCode.TYPE_ERASURE) {
+ inferFromTokenException = new CannotProvideCoderException(exc.getMessage()
+ + " If this error occurs for an output of the producing ParDo, verify that the "
+ + "TupleTag for this output is constructed with proper type information (see "
+ + "TupleTag Javadoc) or explicitly set the Coder to use if this is not possible.");
+ }
+ }
+ }
+
+ // Build up the error message and list of causes.
+ StringBuilder messageBuilder = new StringBuilder()
+ .append("Unable to return a default Coder for ").append(this)
+ .append(". Correct one of the following root causes:");
+
+ // Not an exception, but tell the user that .setCoder() has not been called.
+ messageBuilder.append("\n No Coder has been manually specified; ")
+ .append(" you may do so using .setCoder().");
+
+ if (inferFromTokenException != null) {
+ messageBuilder
+ .append("\n Inferring a Coder from the CoderRegistry failed: ")
+ .append(inferFromTokenException.getMessage());
+ }
+
+ if (inputCoderException != null) {
+ messageBuilder
+ .append("\n Using the default output Coder from the producing PTransform failed: ")
+ .append(inputCoderException.getMessage());
+ }
+
+ // Build and return the failure; getCoder() reports it later via checkState.
+ return new CoderOrFailure<>(null, messageBuilder.toString());
+ }
/**
* The enumeration of cases for whether a {@link PCollection} is bounded.
@@ -126,9 +246,9 @@ public class PCollection<T> extends TypedPValue<T> {
* @throws IllegalStateException if the {@link Coder} hasn't been set, and
* couldn't be inferred.
*/
- @Override
public Coder<T> getCoder() {
- return super.getCoder();
+ checkState(coderOrFailure.coder != null, coderOrFailure.failure);
+ return coderOrFailure.coder;
}
/**
@@ -139,9 +259,11 @@ public class PCollection<T> extends TypedPValue<T> {
* been finalized and may no longer be set.
* Once {@link #apply} has been called, this will be the case.
*/
- @Override
public PCollection<T> setCoder(Coder<T> coder) {
- super.setCoder(coder);
+ checkState(
+ !isFinishedSpecifying(), "cannot change the Coder of %s once it's been used", this);
+ checkArgument(coder != null, "Cannot setCoder(null)");
+ this.coderOrFailure = new CoderOrFailure<>(coder, null);
return this;
}
@@ -202,9 +324,8 @@ public class PCollection<T> extends TypedPValue<T> {
* {@link PCollectionTuple}, {@link PCollectionList}, or {@code PTransform<?, PCollection<T>>},
* etc., to provide more detailed reflective information.
*/
- @Override
public PCollection<T> setTypeDescriptor(TypeDescriptor<T> typeDescriptor) {
- super.setTypeDescriptor(typeDescriptor);
+ this.typeDescriptor = typeDescriptor;
return this;
}
@@ -241,4 +362,14 @@ public class PCollection<T> extends TypedPValue<T> {
.setWindowingStrategyInternal(windowingStrategy)
.setIsBoundedInternal(isBounded);
}
+
+ private static class CoderOrFailure<T> {
+ @Nullable private final Coder<T> coder;
+ @Nullable private final String failure;
+
+ public CoderOrFailure(@Nullable Coder<T> coder, @Nullable String failure) {
+ this.coder = coder;
+ this.failure = failure;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
index 9f151ec..7ab5808 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
@@ -130,7 +130,7 @@ public abstract class PValueBase implements PValue {
*
* <p>For internal use only.
*/
- public boolean isFinishedSpecifyingInternal() {
+ boolean isFinishedSpecifying() {
return finishedSpecifying;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
deleted file mode 100644
index f473776..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.values;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-
-/**
- * A {@link TypedPValue TypedPValue<T>} is the abstract base class of things that
- * store some number of values of type {@code T}.
- *
- * <p>Because we know the type {@code T}, this is the layer of the inheritance hierarchy where
- * we store a coder for objects of type {@code T}.
- *
- * @param <T> the type of the values stored in this {@link TypedPValue}
- */
-public abstract class TypedPValue<T> extends PValueBase implements PValue {
-
- /**
- * Returns the {@link Coder} used by this {@link TypedPValue} to encode and decode
- * the values stored in it.
- *
- * @throws IllegalStateException if the {@link Coder} hasn't been set, and
- * couldn't be inferred.
- */
- public Coder<T> getCoder() {
- checkState(coderOrFailure.coder != null, coderOrFailure.failure);
- return coderOrFailure.coder;
- }
-
- /**
- * Sets the {@link Coder} used by this {@link TypedPValue} to encode and decode the
- * values stored in it. Returns {@code this}.
- *
- * @throws IllegalStateException if this {@link TypedPValue} has already
- * been finalized and is no longer settable, e.g., by having
- * {@code apply()} called on it
- */
- public TypedPValue<T> setCoder(Coder<T> coder) {
- checkState(
- !isFinishedSpecifyingInternal(), "cannot change the Coder of %s once it's been used", this);
- checkArgument(coder != null, "Cannot setCoder(null)");
- this.coderOrFailure = new CoderOrFailure<>(coder, null);
- return this;
- }
-
- @Override
- public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) {
- this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry());
- }
-
- /**
- * After building, finalizes this {@link PValue} to make it ready for
- * running. Automatically invoked whenever the {@link PValue} is "used"
- * (e.g., when apply() is called on it) and when the Pipeline is
- * run (useful if this is a {@link PValue} with no consumers).
- */
- @Override
- public void finishSpecifying(PInput input, PTransform<?, ?> transform) {
- if (isFinishedSpecifyingInternal()) {
- return;
- }
- this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry());
- // Ensure that this TypedPValue has a coder by inferring the coder if none exists; If not,
- // this will throw an exception.
- getCoder();
- super.finishSpecifying(input, transform);
- }
-
- /////////////////////////////////////////////////////////////////////////////
- // Internal details below here.
-
- /**
- * The {@link Coder} used by this {@link TypedPValue} to encode and decode the values stored in
- * it, or null if not specified nor inferred yet.
- */
- private CoderOrFailure<T> coderOrFailure =
- new CoderOrFailure<>(null, "No Coder was specified, and Coder Inference did not occur");
-
- protected TypedPValue(Pipeline p) {
- super(p);
- }
-
- private TypeDescriptor<T> typeDescriptor;
-
- /**
- * Returns a {@link TypeDescriptor TypeDescriptor<T>} with some reflective information
- * about {@code T}, if possible. May return {@code null} if no information
- * is available. Subclasses may override this to enable better
- * {@code Coder} inference.
- */
- public TypeDescriptor<T> getTypeDescriptor() {
- return typeDescriptor;
- }
-
- /**
- * Sets the {@link TypeDescriptor TypeDescriptor<T>} associated with this class. Better
- * reflective type information will lead to better {@link Coder}
- * inference.
- */
- public TypedPValue<T> setTypeDescriptor(TypeDescriptor<T> typeDescriptor) {
- this.typeDescriptor = typeDescriptor;
- return this;
- }
-
- /**
- * If the coder is not explicitly set, this sets the coder for this {@link TypedPValue} to the
- * best coder that can be inferred based upon the known {@link TypeDescriptor}. By default, this
- * is null, but can and should be improved by subclasses.
- */
- @SuppressWarnings({"unchecked", "rawtypes"})
- private CoderOrFailure<T> inferCoderOrFail(
- PInput input, PTransform<?, ?> transform, CoderRegistry registry) {
- // First option for a coder: use the Coder set on this PValue.
- if (coderOrFailure.coder != null) {
- return coderOrFailure;
- }
-
- // Second option for a coder: use the default Coder from the producing PTransform.
- CannotProvideCoderException inputCoderException;
- try {
- return new CoderOrFailure<>(
- ((PTransform) transform).getDefaultOutputCoder(input, this), null);
- } catch (CannotProvideCoderException exc) {
- inputCoderException = exc;
- }
-
- // Third option for a coder: Look in the coder registry.
- TypeDescriptor<T> token = getTypeDescriptor();
- CannotProvideCoderException inferFromTokenException = null;
- if (token != null) {
- try {
- return new CoderOrFailure<>(registry.getDefaultCoder(token), null);
- } catch (CannotProvideCoderException exc) {
- inferFromTokenException = exc;
- // Attempt to detect when the token came from a TupleTag used for a ParDo output,
- // and provide a better error message if so. Unfortunately, this information is not
- // directly available from the TypeDescriptor, so infer based on the type of the PTransform
- // and the error message itself.
- if (transform instanceof ParDo.MultiOutput
- && exc.getReason() == ReasonCode.TYPE_ERASURE) {
- inferFromTokenException = new CannotProvideCoderException(exc.getMessage()
- + " If this error occurs for an output of the producing ParDo, verify that the "
- + "TupleTag for this output is constructed with proper type information (see "
- + "TupleTag Javadoc) or explicitly set the Coder to use if this is not possible.");
- }
- }
- }
-
- // Build up the error message and list of causes.
- StringBuilder messageBuilder = new StringBuilder()
- .append("Unable to return a default Coder for ").append(this)
- .append(". Correct one of the following root causes:");
-
- // No exception, but give the user a message about .setCoder() has not been called.
- messageBuilder.append("\n No Coder has been manually specified; ")
- .append(" you may do so using .setCoder().");
-
- if (inferFromTokenException != null) {
- messageBuilder
- .append("\n Inferring a Coder from the CoderRegistry failed: ")
- .append(inferFromTokenException.getMessage());
- }
-
- if (inputCoderException != null) {
- messageBuilder
- .append("\n Using the default output Coder from the producing PTransform failed: ")
- .append(inputCoderException.getMessage());
- }
-
- // Build and throw the exception.
- return new CoderOrFailure<>(null, messageBuilder.toString());
- }
-
- private static class CoderOrFailure<T> {
- @Nullable private final Coder<T> coder;
- @Nullable private final String failure;
-
- public CoderOrFailure(@Nullable Coder<T> coder, @Nullable String failure) {
- this.coder = coder;
- this.failure = failure;
- }
- }
-}
[2/2] beam git commit: This closes #2867: Inline TypedPValue
Posted by ke...@apache.org.
This closes #2867: Inline TypedPValue
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ff6bb353
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ff6bb353
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ff6bb353
Branch: refs/heads/master
Commit: ff6bb353002610169a8079d1164118606bebb21e
Parents: 34d25f4 ef27abd
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 3 20:58:13 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 3 20:58:13 2017 -0700
----------------------------------------------------------------------
.../core/construction/ForwardingPTransform.java | 6 +-
.../beam/runners/core/SplittableParDo.java | 3 +-
.../direct/ParDoMultiOverrideFactory.java | 3 +-
.../dataflow/DataflowPipelineTranslator.java | 5 +-
.../apache/beam/sdk/transforms/PTransform.java | 6 +-
.../org/apache/beam/sdk/transforms/ParDo.java | 3 +-
.../org/apache/beam/sdk/values/PCollection.java | 145 ++++++++++++-
.../org/apache/beam/sdk/values/PValueBase.java | 2 +-
.../org/apache/beam/sdk/values/TypedPValue.java | 208 -------------------
9 files changed, 150 insertions(+), 231 deletions(-)
----------------------------------------------------------------------