You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/10/14 01:57:25 UTC

[2/3] beam git commit: Introduces Contextful

Introduces Contextful


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4b908c2e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4b908c2e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4b908c2e

Branch: refs/heads/master
Commit: 4b908c2e693fe9ed44fcb6c67a2d82c7da355259
Parents: 7f5753f
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Sep 25 13:57:04 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Fri Oct 13 18:43:48 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/annotations/Experimental.java      |   8 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     |  11 +-
 .../org/apache/beam/sdk/io/FileBasedSink.java   |   4 -
 .../java/org/apache/beam/sdk/io/FileIO.java     |   6 +-
 .../apache/beam/sdk/transforms/Contextful.java  | 127 +++++++++++++++++++
 .../org/apache/beam/sdk/transforms/ParDo.java   |   5 +-
 .../beam/sdk/transforms/Requirements.java       |  56 ++++++++
 .../org/apache/beam/sdk/transforms/Watch.java   |  36 ++++--
 .../apache/beam/sdk/values/TypeDescriptors.java |  36 ++++--
 .../apache/beam/sdk/transforms/WatchTest.java   |  46 ++++++-
 .../beam/sdk/values/TypeDescriptorsTest.java    |  17 ++-
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    |  12 +-
 .../io/gcp/bigquery/DynamicDestinations.java    |   4 -
 13 files changed, 305 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index 80c4613..fecc407 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -94,6 +94,12 @@ public @interface Experimental {
     CORE_RUNNERS_ONLY,
 
     /** Experimental feature related to making the encoded element type available from a Coder. */
-    CODER_TYPE_ENCODING
+    CODER_TYPE_ENCODING,
+
+    /**
+     * Experimental APIs related to <a href="https://s.apache.org/context-fn">contextful
+     * closures</a>.
+     */
+    CONTEXTFUL,
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index e2ab980..1474759 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -55,7 +55,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.joda.time.Duration;
 
@@ -724,14 +723,12 @@ public class AvroIO {
         return explicitCoder;
       }
       // If a coder was not specified explicitly, infer it from parse fn.
-      TypeDescriptor<T> descriptor = TypeDescriptors.outputOf(parseFn);
-      String message =
-          "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().";
-      checkArgument(descriptor != null, message);
       try {
-        return coderRegistry.getCoder(descriptor);
+        return coderRegistry.getCoder(TypeDescriptors.outputOf(parseFn));
       } catch (CannotProvideCoderException e) {
-        throw new IllegalArgumentException(message, e);
+        throw new IllegalArgumentException(
+            "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().",
+            e);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index ea5129f..9834e6e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -319,10 +319,6 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
               DynamicDestinations.class,
               new TypeVariableExtractor<
                   DynamicDestinations<UserT, DestinationT, OutputT>, DestinationT>() {});
-      checkArgument(
-          descriptor != null,
-          "Unable to infer a coder for DestinationT, "
-              + "please specify it explicitly by overriding getDestinationCoder()");
       return registry.getCoder(descriptor);
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index 7df4bde..a244c07 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -346,12 +346,12 @@ public class FileIO {
       }
     }
 
-    private static class MatchPollFn implements Watch.Growth.PollFn<String, MatchResult.Metadata> {
+    private static class MatchPollFn extends Watch.Growth.PollFn<String, MatchResult.Metadata> {
       @Override
-      public Watch.Growth.PollResult<MatchResult.Metadata> apply(String input, Instant timestamp)
+      public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
           throws Exception {
         return Watch.Growth.PollResult.incomplete(
-            Instant.now(), FileSystems.match(input, EmptyMatchTreatment.ALLOW).metadata());
+            Instant.now(), FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata());
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java
new file mode 100644
index 0000000..fb732cf
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import com.google.common.base.MoreObjects;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/** Pair of a bit of user code (a "closure") and the {@link Requirements} needed to run it. */
+@Experimental(Kind.CONTEXTFUL)
+public final class Contextful<ClosureT> implements Serializable {
+  private final ClosureT closure;
+  private final Requirements requirements;
+
+  private Contextful(ClosureT closure, Requirements requirements) {
+    this.closure = closure;
+    this.requirements = requirements;
+  }
+
+  /** Returns the closure. */
+  public ClosureT getClosure() {
+    return closure;
+  }
+
+  /** Returns the requirements needed to run the closure. */
+  public Requirements getRequirements() {
+    return requirements;
+  }
+
+  /** Constructs a pair of the given closure and its requirements. */
+  public static <ClosureT> Contextful<ClosureT> of(ClosureT closure, Requirements requirements) {
+    return new Contextful<>(closure, requirements);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("closure", closure)
+        .add("requirements", requirements)
+        .toString();
+  }
+
+  /**
+   * A function from an input to an output that may additionally access {@link Context} when
+   * computing the result.
+   */
+  public interface Fn<InputT, OutputT> extends Serializable {
+    /**
+     * Invokes the function on the given input with the given context. The function may use the
+     * context only for the capabilities declared in the {@link Contextful#getRequirements} of the
+     * enclosing {@link Contextful}.
+     */
+    OutputT apply(InputT element, Context c) throws Exception;
+
+    /** An accessor for additional capabilities available in {@link #apply}. */
+    abstract class Context {
+      /**
+       * Accesses the given side input. The window in which it is accessed is unspecified, depends
+       * on usage by the enclosing {@link PTransform}, and must be documented by that transform.
+       */
+      public <T> T sideInput(PCollectionView<T> view) {
+        throw new UnsupportedOperationException();
+      }
+
+      /**
+       * Convenience wrapper for creating a {@link Context} from a {@link DoFn.ProcessContext}, to
+       * support the common case when a {@link PTransform} is invoking the {@link
+       * Contextful#getClosure() closure} from inside a {@link DoFn}.
+       */
+      public static <InputT> Context wrapProcessContext(final DoFn<InputT, ?>.ProcessContext c) {
+        return new ContextFromProcessContext<>(c);
+      }
+
+      private static class ContextFromProcessContext<InputT> extends Context {
+        private final DoFn<InputT, ?>.ProcessContext c;
+
+        ContextFromProcessContext(DoFn<InputT, ?>.ProcessContext c) {
+          this.c = c;
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view) {
+          return c.sideInput(view);
+        }
+      }
+    }
+  }
+
+  /**
+   * Wraps a {@link SerializableFunction} as a {@link Contextful} of {@link Fn} with empty {@link
+   * Requirements}.
+   */
+  public static <InputT, OutputT> Contextful<Fn<InputT, OutputT>> fn(
+      final SerializableFunction<InputT, OutputT> fn) {
+    return new Contextful<Fn<InputT, OutputT>>(
+        new Fn<InputT, OutputT>() {
+          @Override
+          public OutputT apply(InputT element, Context c) throws Exception {
+            return fn.apply(element);
+          }
+        },
+        Requirements.empty());
+  }
+
+  /** Same with {@link #of} but with better type inference behavior for the case of {@link Fn}. */
+  public static <InputT, OutputT> Contextful<Fn<InputT, OutputT>> fn(
+      final Fn<InputT, OutputT> fn, Requirements requirements) {
+    return of(fn, requirements);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 49343c7..2ad84fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -46,7 +46,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.NameUtils;
-import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -589,7 +588,7 @@ public class ParDo {
         DoFn<InputT, OutputT> fn,
         List<PCollectionView<?>> sideInputs,
         DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
-      this.fn = SerializableUtils.clone(fn);
+      this.fn = fn;
       this.fnDisplayData = fnDisplayData;
       this.sideInputs = sideInputs;
     }
@@ -717,7 +716,7 @@ public class ParDo {
       this.sideInputs = sideInputs;
       this.mainOutputTag = mainOutputTag;
       this.additionalOutputTags = additionalOutputTags;
-      this.fn = SerializableUtils.clone(fn);
+      this.fn = fn;
       this.fnDisplayData = fnDisplayData;
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java
new file mode 100644
index 0000000..acc409f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/** Describes the run-time requirements of a {@link Contextful}, such as access to side inputs. */
+@Experimental(Kind.CONTEXTFUL)
+public final class Requirements implements Serializable {
+  private final Collection<PCollectionView<?>> sideInputs;
+
+  private Requirements(Collection<PCollectionView<?>> sideInputs) {
+    this.sideInputs = sideInputs;
+  }
+
+  /** The side inputs that this {@link Contextful} needs access to. */
+  public Collection<PCollectionView<?>> getSideInputs() {
+    return sideInputs;
+  }
+
+  /** Describes the need for access to the given side inputs. */
+  public static Requirements requiresSideInputs(Collection<PCollectionView<?>> sideInputs) {
+    return new Requirements(sideInputs);
+  }
+
+  /** Like {@link #requiresSideInputs(Collection)}. */
+  public static Requirements requiresSideInputs(PCollectionView<?>... sideInputs) {
+    return requiresSideInputs(Arrays.asList(sideInputs));
+  }
+
+  /** Describes an empty set of requirements. */
+  public static Requirements empty() {
+    return new Requirements(Collections.<PCollectionView<?>>emptyList());
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index 21f0641..a3c906c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.transforms.Contextful.Fn.Context.wrapProcessContext;
 import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
 import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
 
@@ -117,13 +118,25 @@ public class Watch {
 
   /** Watches the growth of the given poll function. See class documentation for more details. */
   public static <InputT, OutputT> Growth<InputT, OutputT> growthOf(
-      Growth.PollFn<InputT, OutputT> pollFn) {
+      Contextful<Growth.PollFn<InputT, OutputT>> pollFn) {
     return new AutoValue_Watch_Growth.Builder<InputT, OutputT>()
         .setTerminationPerInput(Watch.Growth.<InputT>never())
         .setPollFn(pollFn)
         .build();
   }
 
+  /** Watches the growth of the given poll function. See class documentation for more details. */
+  public static <InputT, OutputT> Growth<InputT, OutputT> growthOf(
+      Growth.PollFn<InputT, OutputT> pollFn, Requirements requirements) {
+    return growthOf(Contextful.of(pollFn, requirements));
+  }
+
+  /** Watches the growth of the given poll function. See class documentation for more details. */
+  public static <InputT, OutputT> Growth<InputT, OutputT> growthOf(
+      Growth.PollFn<InputT, OutputT> pollFn) {
+    return growthOf(pollFn, Requirements.empty());
+  }
+
   /** Implementation of {@link #growthOf}. */
   @AutoValue
   public abstract static class Growth<InputT, OutputT>
@@ -202,12 +215,11 @@ public class Watch {
     }
 
     /**
-     * A function that computes the current set of outputs for the given input (given as a {@link
-     * TimestampedValue}), in the form of a {@link PollResult}.
+     * A function that computes the current set of outputs for the given input, in the form of a
+     * {@link PollResult}.
      */
-    public interface PollFn<InputT, OutputT> extends Serializable {
-      PollResult<OutputT> apply(InputT input, Instant timestamp) throws Exception;
-    }
+    public abstract static class PollFn<InputT, OutputT>
+        implements Contextful.Fn<InputT, PollResult<OutputT>> {}
 
     /**
      * A strategy for determining whether it is time to stop polling the current input regardless of
@@ -536,7 +548,7 @@ public class Watch {
       }
     }
 
-    abstract PollFn<InputT, OutputT> getPollFn();
+    abstract Contextful<PollFn<InputT, OutputT>> getPollFn();
 
     @Nullable
     abstract Duration getPollInterval();
@@ -551,7 +563,7 @@ public class Watch {
 
     @AutoValue.Builder
     abstract static class Builder<InputT, OutputT> {
-      abstract Builder<InputT, OutputT> setPollFn(PollFn<InputT, OutputT> pollFn);
+      abstract Builder<InputT, OutputT> setPollFn(Contextful<PollFn<InputT, OutputT>> pollFn);
 
       abstract Builder<InputT, OutputT> setTerminationPerInput(
           TerminationCondition<InputT, ?> terminationPerInput);
@@ -599,7 +611,7 @@ public class Watch {
         // of the PollFn.
         TypeDescriptor<OutputT> outputT =
             TypeDescriptors.extractFromTypeParameters(
-                getPollFn(),
+                getPollFn().getClosure(),
                 PollFn.class,
                 new TypeVariableExtractor<PollFn<InputT, OutputT>, OutputT>() {});
         try {
@@ -617,7 +629,8 @@ public class Watch {
       }
 
       return input
-          .apply(ParDo.of(new WatchGrowthFn<>(this, outputCoder)))
+          .apply(ParDo.of(new WatchGrowthFn<>(this, outputCoder))
+          .withSideInputs(getPollFn().getRequirements().getSideInputs()))
           .setCoder(KvCoder.of(input.getCoder(), outputCoder));
     }
   }
@@ -638,7 +651,8 @@ public class Watch {
         throws Exception {
       if (!tracker.hasPending() && !tracker.currentRestriction().isOutputComplete) {
         LOG.debug("{} - polling input", c.element());
-        Growth.PollResult<OutputT> res = spec.getPollFn().apply(c.element(), c.timestamp());
+        Growth.PollResult<OutputT> res =
+            spec.getPollFn().getClosure().apply(c.element(), wrapProcessContext(c));
         // TODO (https://issues.apache.org/jira/browse/BEAM-2680):
         // Consider truncating the pending outputs if there are too many, to avoid blowing
         // up the state. In that case, we'd rely on the next poll cycle to provide more outputs.

http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
index 8207f06..29a2496 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
@@ -24,6 +24,7 @@ import java.math.BigInteger;
 import java.util.List;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.Contextful;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 
 /**
@@ -325,10 +326,9 @@ public class TypeDescriptors {
    * @param extractor A class for specifying the type to extract from the supertype
    *
    * @return A {@link TypeDescriptor} for the actual value of the result type of the extractor,
-   *   or {@code null} if the type was erased.
+   *   potentially containing unresolved type variables if the type was erased.
    */
   @SuppressWarnings("unchecked")
-  @Nullable
   public static <T, V> TypeDescriptor<V> extractFromTypeParameters(
       T instance, Class<? super T> supertype, TypeVariableExtractor<T, V> extractor) {
     return extractFromTypeParameters(
@@ -340,7 +340,6 @@ public class TypeDescriptors {
    * {@link TypeDescriptor} of the instance being analyzed rather than the instance itself.
    */
   @SuppressWarnings("unchecked")
-  @Nullable
   public static <T, V> TypeDescriptor<V> extractFromTypeParameters(
       TypeDescriptor<T> type, Class<? super T> supertype, TypeVariableExtractor<T, V> extractor) {
     // Get the type signature of the extractor, e.g.
@@ -363,19 +362,13 @@ public class TypeDescriptors {
 
     // Get output of the extractor.
     Type outputT = ((ParameterizedType) extractorT.getType()).getActualTypeArguments()[1];
-    TypeDescriptor<?> res = TypeDescriptor.of(outputT);
-    if (res.hasUnresolvedParameters()) {
-      return null;
-    } else {
-      return (TypeDescriptor<V>) res;
-    }
+    return (TypeDescriptor<V>) TypeDescriptor.of(outputT);
   }
 
   /**
    * Returns a type descriptor for the input of the given {@link SerializableFunction}, subject to
-   * Java type erasure: returns {@code null} if the type was erased.
+   * Java type erasure: may contain unresolved type variables if the type was erased.
    */
-  @Nullable
   public static <InputT, OutputT> TypeDescriptor<InputT> inputOf(
       SerializableFunction<InputT, OutputT> fn) {
     return extractFromTypeParameters(
@@ -386,9 +379,8 @@ public class TypeDescriptors {
 
   /**
    * Returns a type descriptor for the output of the given {@link SerializableFunction}, subject to
-   * Java type erasure: returns {@code null} if the type was erased.
+   * Java type erasure: may contain unresolved type variables if the type was erased.
    */
-  @Nullable
   public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf(
       SerializableFunction<InputT, OutputT> fn) {
     return extractFromTypeParameters(
@@ -396,4 +388,22 @@ public class TypeDescriptors {
         SerializableFunction.class,
         new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, OutputT>() {});
   }
+
+  /** Like {@link #inputOf(SerializableFunction)} but for {@link Contextful.Fn}. */
+  public static <InputT, OutputT> TypeDescriptor<InputT> inputOf(
+      Contextful.Fn<InputT, OutputT> fn) {
+    return TypeDescriptors.extractFromTypeParameters(
+        fn,
+        Contextful.Fn.class,
+        new TypeDescriptors.TypeVariableExtractor<Contextful.Fn<InputT, OutputT>, InputT>() {});
+  }
+
+  /** Like {@link #outputOf(SerializableFunction)} but for {@link Contextful.Fn}. */
+  public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf(
+      Contextful.Fn<InputT, OutputT> fn) {
+    return TypeDescriptors.extractFromTypeParameters(
+        fn,
+        Contextful.Fn.class,
+        new TypeDescriptors.TypeVariableExtractor<Contextful.Fn<InputT, OutputT>, OutputT>() {});
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
index 132a1ff..113e8fe 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static org.apache.beam.sdk.transforms.Requirements.requiresSideInputs;
 import static org.apache.beam.sdk.transforms.Watch.Growth.afterTimeSinceNewOutput;
 import static org.apache.beam.sdk.transforms.Watch.Growth.afterTotalOf;
 import static org.apache.beam.sdk.transforms.Watch.Growth.allOf;
@@ -57,6 +58,7 @@ import org.apache.beam.sdk.transforms.Watch.GrowthTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -81,9 +83,10 @@ public class WatchTest implements Serializable {
                 Watch.growthOf(
                         new PollFn<String, String>() {
                           @Override
-                          public PollResult<String> apply(String input, Instant time) {
+                          public PollResult<String> apply(String element, Context c)
+                              throws Exception {
                             return PollResult.complete(
-                                time, Arrays.asList(input + ".foo", input + ".bar"));
+                                Instant.now(), Arrays.asList(element + ".foo", element + ".bar"));
                           }
                         })
                     .withPollInterval(Duration.ZERO));
@@ -99,6 +102,36 @@ public class WatchTest implements Serializable {
 
   @Test
   @Category({NeedsRunner.class, UsesSplittableParDo.class})
+  public void testSinglePollMultipleInputsWithSideInput() {
+    final PCollectionView<String> moo =
+        p.apply("moo", Create.of("moo")).apply("moo singleton", View.<String>asSingleton());
+    final PCollectionView<String> zoo =
+        p.apply("zoo", Create.of("zoo")).apply("zoo singleton", View.<String>asSingleton());
+    PCollection<KV<String, String>> res =
+        p.apply("input", Create.of("a", "b"))
+            .apply(
+                Watch.growthOf(
+                        new PollFn<String, String>() {
+                          @Override
+                          public PollResult<String> apply(String element, Context c)
+                              throws Exception {
+                            return PollResult.complete(
+                                Instant.now(),
+                                Arrays.asList(
+                                    element + " " + c.sideInput(moo) + " " + c.sideInput(zoo)));
+                          }
+                        },
+                        requiresSideInputs(moo, zoo))
+                    .withPollInterval(Duration.ZERO));
+
+    PAssert.that(res)
+        .containsInAnyOrder(Arrays.asList(KV.of("a", "a moo zoo"), KV.of("b", "b moo zoo")));
+
+    p.run();
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesSplittableParDo.class})
   public void testMultiplePollsWithTerminationBecauseOutputIsFinal() {
     testMultiplePolls(false);
   }
@@ -178,13 +211,14 @@ public class WatchTest implements Serializable {
                 Watch.growthOf(
                         new PollFn<String, KV<String, Integer>>() {
                           @Override
-                          public PollResult<KV<String, Integer>> apply(String input, Instant time) {
+                          public PollResult<KV<String, Integer>> apply(String element, Context c)
+                              throws Exception {
                             String pollId = UUID.randomUUID().toString();
                             List<KV<String, Integer>> output = Lists.newArrayList();
                             for (int i = 0; i < numResults; ++i) {
                               output.add(KV.of(pollId, i));
                             }
-                            return PollResult.complete(time, output);
+                            return PollResult.complete(Instant.now(), output);
                           }
                         })
                     .withTerminationPerInput(Watch.Growth.<String>afterTotalOf(standardSeconds(1)))
@@ -291,7 +325,7 @@ public class WatchTest implements Serializable {
    * Gradually emits all items from the given list, pairing each one with a UUID that identifies the
    * round of polling, so a client can check how many rounds of polling there were.
    */
-  private static class TimedPollFn<InputT, OutputT> implements PollFn<InputT, OutputT> {
+  private static class TimedPollFn<InputT, OutputT> extends PollFn<InputT, OutputT> {
     private final Instant baseTime;
     private final List<OutputT> outputs;
     private final Duration timeToOutputEverything;
@@ -311,7 +345,7 @@ public class WatchTest implements Serializable {
     }
 
     @Override
-    public PollResult<OutputT> apply(InputT input, Instant time) {
+    public PollResult<OutputT> apply(InputT element, Context c) throws Exception {
       Instant now = Instant.now();
       Duration elapsed = new Duration(baseTime, Instant.now());
       if (elapsed.isLongerThan(timeToFail)) {

http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
index a4f58da..645da5e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
@@ -25,10 +25,12 @@ import static org.apache.beam.sdk.values.TypeDescriptors.sets;
 import static org.apache.beam.sdk.values.TypeDescriptors.strings;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 
 import java.util.List;
 import java.util.Set;
+import org.hamcrest.CoreMatchers;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -115,8 +117,17 @@ public class TypeDescriptorsTest {
   @Test
   public void testTypeDescriptorsTypeParameterOfErased() throws Exception {
     Generic<Integer, String> instance = TypeDescriptorsTest.typeErasedGeneric();
-    assertNull(extractFooT(instance));
+
+    TypeDescriptor<Integer> fooT = extractFooT(instance);
+    assertNotNull(fooT);
+    // Using toString() assertions because verifying the contents of a Type is very cumbersome,
+    // and the expected types can not be easily constructed directly.
+    assertEquals("ActualFooT", fooT.toString());
+
     assertEquals(strings(), extractBarT(instance));
-    assertNull(extractKV(instance));
+
+    TypeDescriptor<KV<Integer, String>> kvT = extractKV(instance);
+    assertNotNull(kvT);
+    assertThat(kvT.toString(), CoreMatchers.containsString("KV<ActualFooT, java.lang.String>"));
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 2771687..2f99643 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -95,7 +95,6 @@ import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.joda.time.Duration;
@@ -547,15 +546,12 @@ public class BigQueryIO {
         return getCoder();
       }
 
-      TypeDescriptor<T> descriptor = TypeDescriptors.outputOf(getParseFn());
-
-      String message =
-          "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().";
-      checkArgument(descriptor != null, message);
       try {
-        return coderRegistry.getCoder(descriptor);
+        return coderRegistry.getCoder(TypeDescriptors.outputOf(getParseFn()));
       } catch (CannotProvideCoderException e) {
-        throw new IllegalArgumentException(message, e);
+        throw new IllegalArgumentException(
+            "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().",
+            e);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
index ea4fc4e..ecfc990 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -164,10 +164,6 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab
             DynamicDestinations.class,
             new TypeDescriptors.TypeVariableExtractor<
                 DynamicDestinations<T, DestinationT>, DestinationT>() {});
-    checkArgument(
-        descriptor != null,
-        "Unable to infer a coder for DestinationT, "
-            + "please specify it explicitly by overriding getDestinationCoder()");
     return registry.getCoder(descriptor);
   }
 }