You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/04 03:58:28 UTC

[1/2] beam git commit: Inline TypedPValue

Repository: beam
Updated Branches:
  refs/heads/master 34d25f406 -> ff6bb3530


Inline TypedPValue

This has exactly one implementation, and this is not expected to change.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ef27abdc
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ef27abdc
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ef27abdc

Branch: refs/heads/master
Commit: ef27abdca3010ba12fe0208925535762fde16d7c
Parents: 17f0843
Author: Thomas Groh <tg...@google.com>
Authored: Wed May 3 13:42:49 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed May 3 18:18:57 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ForwardingPTransform.java |   6 +-
 .../beam/runners/core/SplittableParDo.java      |   3 +-
 .../direct/ParDoMultiOverrideFactory.java       |   3 +-
 .../dataflow/DataflowPipelineTranslator.java    |   5 +-
 .../apache/beam/sdk/transforms/PTransform.java  |   6 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |   3 +-
 .../org/apache/beam/sdk/values/PCollection.java | 145 ++++++++++++-
 .../org/apache/beam/sdk/values/PValueBase.java  |   2 +-
 .../org/apache/beam/sdk/values/TypedPValue.java | 208 -------------------
 9 files changed, 150 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
index 2f427ad..ca25ba7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ForwardingPTransform.java
@@ -22,9 +22,9 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.TypedPValue;
 
 /**
  * A base class for implementing {@link PTransform} overrides, which behave identically to the
@@ -51,8 +51,8 @@ public abstract class ForwardingPTransform<InputT extends PInput, OutputT extend
   }
 
   @Override
-  public <T> Coder<T> getDefaultOutputCoder(InputT input, @SuppressWarnings("unused")
-      TypedPValue<T> output) throws CannotProvideCoderException {
+  public <T> Coder<T> getDefaultOutputCoder(InputT input, PCollection<T> output)
+      throws CannotProvideCoderException {
     return delegate().getDefaultOutputCoder(input, output);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 94f5f35..ed065a6 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -55,7 +55,6 @@ import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.TypedPValue;
 import org.joda.time.Instant;
 
 /**
@@ -273,7 +272,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     public <T> Coder<T> getDefaultOutputCoder(
         PCollection<? extends KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>
             input,
-        TypedPValue<T> output)
+        PCollection<T> output)
         throws CannotProvideCoderException {
       // Similar logic to ParDo.MultiOutput.getDefaultOutputCoder.
       @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index 322c995..b10d669 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -51,7 +51,6 @@ import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.TypedPValue;
 
 /**
  * A {@link PTransformOverrideFactory} that provides overrides for applications of a {@link ParDo}
@@ -184,7 +183,7 @@ class ParDoMultiOverrideFactory<InputT, OutputT>
 
     @Override
     public <T> Coder<T> getDefaultOutputCoder(
-        PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>> input, TypedPValue<T> output)
+        PCollection<? extends KeyedWorkItem<K, KV<K, InputT>>> input, PCollection<T> output)
         throws CannotProvideCoderException {
       return underlyingParDo.getDefaultOutputCoder(originalInput, output);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 05edd28..69b4ecd 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -97,7 +97,6 @@ import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypedPValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -599,8 +598,8 @@ public class DataflowPipelineTranslator {
     @Override
     public long addOutput(PValue value) {
       Coder<?> coder;
-      if (value instanceof TypedPValue) {
-        coder = ((TypedPValue<?>) value).getCoder();
+      if (value instanceof PCollection) {
+        coder = ((PCollection<?>) value).getCoder();
         if (value instanceof PCollection) {
           // Wrap the PCollection element Coder inside a WindowedValueCoder.
           coder = WindowedValue.getFullCoder(

http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index 4f651f2..15abd98 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -29,11 +29,11 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.util.NameUtils;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TypedPValue;
 
 /**
  * A {@code PTransform<InputT, OutputT>} is an operation that takes an
@@ -122,7 +122,7 @@ import org.apache.beam.sdk.values.TypedPValue;
  * not known at run-time (e.g., due to Java's "erasure" of generic
  * types) or there was no default Coder registered, then the Coder
  * should be specified manually by calling
- * {@link org.apache.beam.sdk.values.TypedPValue#setCoder}
+ * {@link PCollection#setCoder}
  * on the output PCollection.  The Coder of every output
  * PCollection must be determined one way or another
  * before that output is used as an input to another PTransform, or
@@ -306,7 +306,7 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput>
    * @throws CannotProvideCoderException if none can be inferred.
    */
   public <T> Coder<T> getDefaultOutputCoder(
-      InputT input, @SuppressWarnings("unused") TypedPValue<T> output)
+      InputT input, @SuppressWarnings("unused") PCollection<T> output)
       throws CannotProvideCoderException {
     @SuppressWarnings("unchecked")
     Coder<T> defaultOutputCoder = (Coder<T>) getDefaultOutputCoder(input);

http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 6137a7b..73d78c9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -53,7 +53,6 @@ import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.sdk.values.TypedPValue;
 
 /**
  * {@link ParDo} is the core element-wise transform in Apache Beam, invoking a user-specified
@@ -763,7 +762,7 @@ public class ParDo {
 
     @Override
     public <T> Coder<T> getDefaultOutputCoder(
-        PCollection<? extends InputT> input, TypedPValue<T> output)
+        PCollection<? extends InputT> input, PCollection<T> output)
         throws CannotProvideCoderException {
       @SuppressWarnings("unchecked")
       Coder<InputT> inputCoder = ((PCollection<InputT>) input).getCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index 67520ce..034f0de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -17,14 +17,22 @@
  */
 package org.apache.beam.sdk.values;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -62,7 +70,119 @@ import org.apache.beam.sdk.util.WindowingStrategy;
  *
  * @param <T> the type of the elements of this {@link PCollection}
  */
-public class PCollection<T> extends TypedPValue<T> {
+public class PCollection<T> extends PValueBase implements PValue {
+
+  /**
+   * The {@link Coder} used by this {@link PCollection} to encode and decode the values stored in
+   * it, or null if not specified nor inferred yet.
+   */
+  private CoderOrFailure<T> coderOrFailure =
+      new CoderOrFailure<>(null, "No Coder was specified, and Coder Inference did not occur");
+  private TypeDescriptor<T> typeDescriptor;
+
+  @Override
+  public void finishSpecifyingOutput(
+      PInput input, PTransform<?, ?> transform) {
+    this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry());
+    super.finishSpecifyingOutput(input, transform);
+  }
+
+  /**
+   * After building, finalizes this {@link PValue} to make it ready for
+   * running.  Automatically invoked whenever the {@link PValue} is "used"
+   * (e.g., when apply() is called on it) and when the Pipeline is
+   * run (useful if this is a {@link PValue} with no consumers).
+   */
+  @Override
+  public void finishSpecifying(PInput input, PTransform<?, ?> transform) {
+    if (isFinishedSpecifying()) {
+      return;
+    }
+    this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry());
+    // Ensure that this TypedPValue has a coder by inferring the coder if none exists; If not,
+    // this will throw an exception.
+    getCoder();
+    super.finishSpecifying(input, transform);
+  }
+
+  /**
+   * Returns a {@link TypeDescriptor TypeDescriptor&lt;T&gt;} with some reflective information
+   * about {@code T}, if possible. May return {@code null} if no information
+   * is available. Subclasses may override this to enable better
+   * {@code Coder} inference.
+   */
+  public TypeDescriptor<T> getTypeDescriptor() {
+    return typeDescriptor;
+  }
+
+  /**
+   * If the coder is not explicitly set, this sets the coder for this {@link PCollection} to the
+   * best coder that can be inferred based upon the known {@link TypeDescriptor}. By default, this
+   * is null, but can and should be improved by subclasses.
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private CoderOrFailure<T> inferCoderOrFail(
+      PInput input, PTransform<?, ?> transform, CoderRegistry registry) {
+    // First option for a coder: use the Coder set on this PValue.
+    if (coderOrFailure.coder != null) {
+      return coderOrFailure;
+    }
+
+    // Second option for a coder: use the default Coder from the producing PTransform.
+    CannotProvideCoderException inputCoderException;
+    try {
+      return new CoderOrFailure<>(
+          ((PTransform) transform).getDefaultOutputCoder(input, this), null);
+    } catch (CannotProvideCoderException exc) {
+      inputCoderException = exc;
+    }
+
+    // Third option for a coder: Look in the coder registry.
+    TypeDescriptor<T> token = getTypeDescriptor();
+    CannotProvideCoderException inferFromTokenException = null;
+    if (token != null) {
+      try {
+        return new CoderOrFailure<>(registry.getDefaultCoder(token), null);
+      } catch (CannotProvideCoderException exc) {
+        inferFromTokenException = exc;
+        // Attempt to detect when the token came from a TupleTag used for a ParDo output,
+        // and provide a better error message if so. Unfortunately, this information is not
+        // directly available from the TypeDescriptor, so infer based on the type of the PTransform
+        // and the error message itself.
+        if (transform instanceof ParDo.MultiOutput
+            && exc.getReason() == ReasonCode.TYPE_ERASURE) {
+          inferFromTokenException = new CannotProvideCoderException(exc.getMessage()
+              + " If this error occurs for an output of the producing ParDo, verify that the "
+              + "TupleTag for this output is constructed with proper type information (see "
+              + "TupleTag Javadoc) or explicitly set the Coder to use if this is not possible.");
+        }
+      }
+    }
+
+    // Build up the error message and list of causes.
+    StringBuilder messageBuilder = new StringBuilder()
+        .append("Unable to return a default Coder for ").append(this)
+        .append(". Correct one of the following root causes:");
+
+    // No exception, but give the user a message about .setCoder() has not been called.
+    messageBuilder.append("\n  No Coder has been manually specified; ")
+        .append(" you may do so using .setCoder().");
+
+    if (inferFromTokenException != null) {
+      messageBuilder
+          .append("\n  Inferring a Coder from the CoderRegistry failed: ")
+          .append(inferFromTokenException.getMessage());
+    }
+
+    if (inputCoderException != null) {
+      messageBuilder
+          .append("\n  Using the default output Coder from the producing PTransform failed: ")
+          .append(inputCoderException.getMessage());
+    }
+
+    // Build and throw the exception.
+    return new CoderOrFailure<>(null, messageBuilder.toString());
+  }
 
   /**
    * The enumeration of cases for whether a {@link PCollection} is bounded.
@@ -126,9 +246,9 @@ public class PCollection<T> extends TypedPValue<T> {
    * @throws IllegalStateException if the {@link Coder} hasn't been set, and
    * couldn't be inferred.
    */
-  @Override
   public Coder<T> getCoder() {
-    return super.getCoder();
+    checkState(coderOrFailure.coder != null, coderOrFailure.failure);
+    return coderOrFailure.coder;
   }
 
   /**
@@ -139,9 +259,11 @@ public class PCollection<T> extends TypedPValue<T> {
    * been finalized and may no longer be set.
    * Once {@link #apply} has been called, this will be the case.
    */
-  @Override
   public PCollection<T> setCoder(Coder<T> coder) {
-    super.setCoder(coder);
+    checkState(
+        !isFinishedSpecifying(), "cannot change the Coder of %s once it's been used", this);
+    checkArgument(coder != null, "Cannot setCoder(null)");
+    this.coderOrFailure = new CoderOrFailure<>(coder, null);
     return this;
   }
 
@@ -202,9 +324,8 @@ public class PCollection<T> extends TypedPValue<T> {
    * {@link PCollectionTuple}, {@link PCollectionList}, or {@code PTransform<?, PCollection<T>>},
    * etc., to provide more detailed reflective information.
    */
-  @Override
   public PCollection<T> setTypeDescriptor(TypeDescriptor<T> typeDescriptor) {
-    super.setTypeDescriptor(typeDescriptor);
+    this.typeDescriptor = typeDescriptor;
     return this;
   }
 
@@ -241,4 +362,14 @@ public class PCollection<T> extends TypedPValue<T> {
         .setWindowingStrategyInternal(windowingStrategy)
         .setIsBoundedInternal(isBounded);
   }
+
+  private static class CoderOrFailure<T> {
+    @Nullable private final Coder<T> coder;
+    @Nullable private final String failure;
+
+    public CoderOrFailure(@Nullable Coder<T> coder, @Nullable String failure) {
+      this.coder = coder;
+      this.failure = failure;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
index 9f151ec..7ab5808 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
@@ -130,7 +130,7 @@ public abstract class PValueBase implements PValue {
    *
    * <p>For internal use only.
    */
-  public boolean isFinishedSpecifyingInternal() {
+  boolean isFinishedSpecifying() {
     return finishedSpecifying;
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/ef27abdc/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
deleted file mode 100644
index f473776..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypedPValue.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.values;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-
-/**
- * A {@link TypedPValue TypedPValue&lt;T&gt;} is the abstract base class of things that
- * store some number of values of type {@code T}.
- *
- * <p>Because we know the type {@code T}, this is the layer of the inheritance hierarchy where
- * we store a coder for objects of type {@code T}.
- *
- * @param <T> the type of the values stored in this {@link TypedPValue}
- */
-public abstract class TypedPValue<T> extends PValueBase implements PValue {
-
-  /**
-   * Returns the {@link Coder} used by this {@link TypedPValue} to encode and decode
-   * the values stored in it.
-   *
-   * @throws IllegalStateException if the {@link Coder} hasn't been set, and
-   * couldn't be inferred.
-   */
-  public Coder<T> getCoder() {
-    checkState(coderOrFailure.coder != null, coderOrFailure.failure);
-    return coderOrFailure.coder;
-  }
-
-  /**
-   * Sets the {@link Coder} used by this {@link TypedPValue} to encode and decode the
-   * values stored in it. Returns {@code this}.
-   *
-   * @throws IllegalStateException if this {@link TypedPValue} has already
-   * been finalized and is no longer settable, e.g., by having
-   * {@code apply()} called on it
-   */
-  public TypedPValue<T> setCoder(Coder<T> coder) {
-    checkState(
-        !isFinishedSpecifyingInternal(), "cannot change the Coder of %s once it's been used", this);
-    checkArgument(coder != null, "Cannot setCoder(null)");
-    this.coderOrFailure = new CoderOrFailure<>(coder, null);
-    return this;
-  }
-
-  @Override
-  public void finishSpecifyingOutput(PInput input, PTransform<?, ?> transform) {
-    this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry());
-  }
-
-  /**
-   * After building, finalizes this {@link PValue} to make it ready for
-   * running.  Automatically invoked whenever the {@link PValue} is "used"
-   * (e.g., when apply() is called on it) and when the Pipeline is
-   * run (useful if this is a {@link PValue} with no consumers).
-   */
-  @Override
-  public void finishSpecifying(PInput input, PTransform<?, ?> transform) {
-    if (isFinishedSpecifyingInternal()) {
-      return;
-    }
-    this.coderOrFailure = inferCoderOrFail(input, transform, getPipeline().getCoderRegistry());
-    // Ensure that this TypedPValue has a coder by inferring the coder if none exists; If not,
-    // this will throw an exception.
-    getCoder();
-    super.finishSpecifying(input, transform);
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // Internal details below here.
-
-  /**
-   * The {@link Coder} used by this {@link TypedPValue} to encode and decode the values stored in
-   * it, or null if not specified nor inferred yet.
-   */
-  private CoderOrFailure<T> coderOrFailure =
-      new CoderOrFailure<>(null, "No Coder was specified, and Coder Inference did not occur");
-
-  protected TypedPValue(Pipeline p) {
-    super(p);
-  }
-
-  private TypeDescriptor<T> typeDescriptor;
-
-  /**
-   * Returns a {@link TypeDescriptor TypeDescriptor&lt;T&gt;} with some reflective information
-   * about {@code T}, if possible. May return {@code null} if no information
-   * is available. Subclasses may override this to enable better
-   * {@code Coder} inference.
-   */
-  public TypeDescriptor<T> getTypeDescriptor() {
-    return typeDescriptor;
-  }
-
-  /**
-   * Sets the {@link TypeDescriptor TypeDescriptor&lt;T&gt;} associated with this class. Better
-   * reflective type information will lead to better {@link Coder}
-   * inference.
-   */
-  public TypedPValue<T> setTypeDescriptor(TypeDescriptor<T> typeDescriptor) {
-    this.typeDescriptor = typeDescriptor;
-    return this;
-  }
-
-  /**
-   * If the coder is not explicitly set, this sets the coder for this {@link TypedPValue} to the
-   * best coder that can be inferred based upon the known {@link TypeDescriptor}. By default, this
-   * is null, but can and should be improved by subclasses.
-   */
-  @SuppressWarnings({"unchecked", "rawtypes"})
-  private CoderOrFailure<T> inferCoderOrFail(
-      PInput input, PTransform<?, ?> transform, CoderRegistry registry) {
-    // First option for a coder: use the Coder set on this PValue.
-    if (coderOrFailure.coder != null) {
-      return coderOrFailure;
-    }
-
-    // Second option for a coder: use the default Coder from the producing PTransform.
-    CannotProvideCoderException inputCoderException;
-    try {
-      return new CoderOrFailure<>(
-          ((PTransform) transform).getDefaultOutputCoder(input, this), null);
-    } catch (CannotProvideCoderException exc) {
-      inputCoderException = exc;
-    }
-
-    // Third option for a coder: Look in the coder registry.
-    TypeDescriptor<T> token = getTypeDescriptor();
-    CannotProvideCoderException inferFromTokenException = null;
-    if (token != null) {
-      try {
-        return new CoderOrFailure<>(registry.getDefaultCoder(token), null);
-      } catch (CannotProvideCoderException exc) {
-        inferFromTokenException = exc;
-        // Attempt to detect when the token came from a TupleTag used for a ParDo output,
-        // and provide a better error message if so. Unfortunately, this information is not
-        // directly available from the TypeDescriptor, so infer based on the type of the PTransform
-        // and the error message itself.
-        if (transform instanceof ParDo.MultiOutput
-            && exc.getReason() == ReasonCode.TYPE_ERASURE) {
-          inferFromTokenException = new CannotProvideCoderException(exc.getMessage()
-              + " If this error occurs for an output of the producing ParDo, verify that the "
-              + "TupleTag for this output is constructed with proper type information (see "
-              + "TupleTag Javadoc) or explicitly set the Coder to use if this is not possible.");
-        }
-      }
-    }
-
-    // Build up the error message and list of causes.
-    StringBuilder messageBuilder = new StringBuilder()
-        .append("Unable to return a default Coder for ").append(this)
-        .append(". Correct one of the following root causes:");
-
-    // No exception, but give the user a message about .setCoder() has not been called.
-    messageBuilder.append("\n  No Coder has been manually specified; ")
-        .append(" you may do so using .setCoder().");
-
-    if (inferFromTokenException != null) {
-      messageBuilder
-          .append("\n  Inferring a Coder from the CoderRegistry failed: ")
-          .append(inferFromTokenException.getMessage());
-    }
-
-    if (inputCoderException != null) {
-      messageBuilder
-          .append("\n  Using the default output Coder from the producing PTransform failed: ")
-          .append(inputCoderException.getMessage());
-    }
-
-    // Build and throw the exception.
-    return new CoderOrFailure<>(null, messageBuilder.toString());
-  }
-
-  private static class CoderOrFailure<T> {
-    @Nullable private final Coder<T> coder;
-    @Nullable private final String failure;
-
-    public CoderOrFailure(@Nullable Coder<T> coder, @Nullable String failure) {
-      this.coder = coder;
-      this.failure = failure;
-    }
-  }
-}


[2/2] beam git commit: This closes #2867: Inline TypedPValue

Posted by ke...@apache.org.
This closes #2867: Inline TypedPValue


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ff6bb353
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ff6bb353
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ff6bb353

Branch: refs/heads/master
Commit: ff6bb353002610169a8079d1164118606bebb21e
Parents: 34d25f4 ef27abd
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed May 3 20:58:13 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed May 3 20:58:13 2017 -0700

----------------------------------------------------------------------
 .../core/construction/ForwardingPTransform.java |   6 +-
 .../beam/runners/core/SplittableParDo.java      |   3 +-
 .../direct/ParDoMultiOverrideFactory.java       |   3 +-
 .../dataflow/DataflowPipelineTranslator.java    |   5 +-
 .../apache/beam/sdk/transforms/PTransform.java  |   6 +-
 .../org/apache/beam/sdk/transforms/ParDo.java   |   3 +-
 .../org/apache/beam/sdk/values/PCollection.java | 145 ++++++++++++-
 .../org/apache/beam/sdk/values/PValueBase.java  |   2 +-
 .../org/apache/beam/sdk/values/TypedPValue.java | 208 -------------------
 9 files changed, 150 insertions(+), 231 deletions(-)
----------------------------------------------------------------------