You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/10/14 01:57:24 UTC
[1/3] beam git commit: Supports side inputs in MapElements and FlatMapElements
Repository: beam
Updated Branches:
refs/heads/master 7f5753f1f -> 014614b69
Supports side inputs in MapElements and FlatMapElements
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e2ad925d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e2ad925d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e2ad925d
Branch: refs/heads/master
Commit: e2ad925dc4d8bb33a264a21c48b8ceef63ac6eb3
Parents: 4b908c2
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Oct 2 17:36:48 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Fri Oct 13 18:43:48 2017 -0700
----------------------------------------------------------------------
.../runners/spark/SparkRunnerDebuggerTest.java | 11 +-
.../org/apache/beam/sdk/io/FileBasedSink.java | 9 +-
.../beam/sdk/transforms/FlatMapElements.java | 142 ++++++++-----------
.../apache/beam/sdk/transforms/MapElements.java | 71 +++++-----
.../beam/sdk/transforms/Requirements.java | 5 +
.../apache/beam/sdk/values/TypeDescriptors.java | 1 -
.../sdk/transforms/FlatMapElementsTest.java | 35 ++++-
.../beam/sdk/transforms/MapElementsTest.java | 42 +++++-
.../io/gcp/bigquery/DynamicDestinations.java | 9 +-
9 files changed, 196 insertions(+), 129 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e2ad925d/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 246eb81..49e36ca 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -88,15 +88,14 @@ public class SparkRunnerDebuggerTest {
"sparkContext.parallelize(Arrays.asList(...))\n"
+ "_.mapPartitions("
+ "new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
- + "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n"
+ + "_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n"
+ "_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n"
+ "_.groupByKey()\n"
+ "_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n"
- + "_.mapPartitions(new org.apache.beam.runners.spark"
- + ".SparkRunnerDebuggerTest$PlusOne())\n"
+ + "_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n"
+ "sparkContext.union(...)\n"
+ "_.mapPartitions("
- + "new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n"
+ + "new org.apache.beam.sdk.transforms.Contextful())\n"
+ "_.<org.apache.beam.sdk.io.TextIO$Write>";
SparkRunnerDebugger.DebugSparkPipelineResult result =
@@ -141,11 +140,11 @@ public class SparkRunnerDebuggerTest {
+ "_.map(new org.apache.beam.sdk.transforms.windowing.FixedWindows())\n"
+ "_.mapPartitions(new org.apache.beam.runners.spark."
+ "SparkRunnerDebuggerTest$FormatKVFn())\n"
- + "_.mapPartitions(new org.apache.beam.sdk.transforms.Distinct$2())\n"
+ + "_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n"
+ "_.groupByKey()\n"
+ "_.map(new org.apache.beam.sdk.transforms.Combine$IterableCombineFn())\n"
+ "_.mapPartitions(new org.apache.beam.sdk.transforms.Distinct$3())\n"
- + "_.mapPartitions(new org.apache.beam.sdk.transforms.WithKeys$2())\n"
+ + "_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n"
+ "_.<org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write>";
SparkRunnerDebugger.DebugSparkPipelineResult result =
http://git-wip-us.apache.org/repos/asf/beam/blob/e2ad925d/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 9834e6e..d577fea 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -319,7 +319,14 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
DynamicDestinations.class,
new TypeVariableExtractor<
DynamicDestinations<UserT, DestinationT, OutputT>, DestinationT>() {});
- return registry.getCoder(descriptor);
+ try {
+ return registry.getCoder(descriptor);
+ } catch (CannotProvideCoderException e) {
+ throw new CannotProvideCoderException(
+ "Failed to infer coder for DestinationT from type "
+ + descriptor + ", please provide it explicitly by overriding getDestinationCoder()",
+ e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e2ad925d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index a8a94f9..97e1dfb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -17,11 +17,13 @@
*/
package org.apache.beam.sdk.transforms;
-import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkArgument;
-import java.lang.reflect.ParameterizedType;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.transforms.Contextful.Fn;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
@@ -32,30 +34,20 @@ import org.apache.beam.sdk.values.TypeDescriptors;
*/
public class FlatMapElements<InputT, OutputT>
extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
- /**
- * Temporarily stores the argument of {@link #into(TypeDescriptor)} until combined with the
- * argument of {@link #via(SerializableFunction)} into the fully-specified {@link #fn}. Stays null
- * if constructed using {@link #via(SimpleFunction)} directly.
- */
- @Nullable
- private final transient TypeDescriptor<Iterable<OutputT>> outputType;
-
- /**
- * Non-null on a fully specified transform - is null only when constructed using {@link
- * #into(TypeDescriptor)}, until the fn is specified using {@link #via(SerializableFunction)}.
- */
- @Nullable
- private final SimpleFunction<InputT, Iterable<OutputT>> fn;
- private final DisplayData.ItemSpec<?> fnClassDisplayData;
+ private final transient TypeDescriptor<InputT> inputType;
+ private final transient TypeDescriptor<OutputT> outputType;
+ @Nullable private final transient Object originalFnForDisplayData;
+ @Nullable private final Contextful<Fn<InputT, Iterable<OutputT>>> fn;
private FlatMapElements(
- @Nullable SimpleFunction<InputT, Iterable<OutputT>> fn,
- @Nullable TypeDescriptor<Iterable<OutputT>> outputType,
- @Nullable Class<?> fnClass) {
+ @Nullable Contextful<Fn<InputT, Iterable<OutputT>>> fn,
+ @Nullable Object originalFnForDisplayData,
+ TypeDescriptor<InputT> inputType,
+ TypeDescriptor<OutputT> outputType) {
this.fn = fn;
+ this.originalFnForDisplayData = originalFnForDisplayData;
+ this.inputType = inputType;
this.outputType = outputType;
- this.fnClassDisplayData = DisplayData.item("flatMapFn", fnClass).withLabel("FlatMap Function");
-
}
/**
@@ -82,7 +74,14 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
*/
public static <InputT, OutputT> FlatMapElements<InputT, OutputT>
via(SimpleFunction<? super InputT, ? extends Iterable<OutputT>> fn) {
- return new FlatMapElements(fn, null, fn.getClass());
+ Contextful<Fn<InputT, Iterable<OutputT>>> wrapped = (Contextful) Contextful.fn(fn);
+ TypeDescriptor<OutputT> outputType =
+ TypeDescriptors.extractFromTypeParameters(
+ (TypeDescriptor<Iterable<OutputT>>) fn.getOutputTypeDescriptor(),
+ Iterable.class,
+ new TypeDescriptors.TypeVariableExtractor<Iterable<OutputT>, OutputT>() {});
+ TypeDescriptor<InputT> inputType = (TypeDescriptor<InputT>) fn.getInputTypeDescriptor();
+ return new FlatMapElements<>(wrapped, fn, inputType, outputType);
}
/**
@@ -91,7 +90,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
*/
public static <OutputT> FlatMapElements<?, OutputT>
into(final TypeDescriptor<OutputT> outputType) {
- return new FlatMapElements<>(null, TypeDescriptors.iterables(outputType), null);
+ return new FlatMapElements<>(null, null, null, outputType);
}
/**
@@ -112,73 +111,58 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
*/
public <NewInputT> FlatMapElements<NewInputT, OutputT>
via(SerializableFunction<NewInputT, ? extends Iterable<OutputT>> fn) {
- return new FlatMapElements(
- SimpleFunction.fromSerializableFunctionWithOutputType(fn, (TypeDescriptor) outputType),
- null,
- fn.getClass());
+ return new FlatMapElements<>(
+ (Contextful) Contextful.fn(fn), fn, TypeDescriptors.inputOf(fn), outputType);
+ }
+
+ /** Like {@link #via(SerializableFunction)}, but allows access to additional context. */
+ @Experimental(Experimental.Kind.CONTEXTFUL)
+ public <NewInputT> FlatMapElements<NewInputT, OutputT> via(
+ Contextful<Fn<NewInputT, Iterable<OutputT>>> fn) {
+ return new FlatMapElements<>(
+ fn, fn.getClosure(), TypeDescriptors.inputOf(fn.getClosure()), outputType);
}
@Override
public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
- checkNotNull(fn, "Must specify a function on FlatMapElements using .via()");
+ checkArgument(fn != null, ".via() is required");
return input.apply(
"FlatMap",
ParDo.of(
- new DoFn<InputT, OutputT>() {
- private static final long serialVersionUID = 0L;
-
- @ProcessElement
- public void processElement(ProcessContext c) {
- for (OutputT element : fn.apply(c.element())) {
- c.output(element);
- }
- }
-
- @Override
- public TypeDescriptor<InputT> getInputTypeDescriptor() {
- return fn.getInputTypeDescriptor();
- }
-
- @Override
- public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- @SuppressWarnings({"rawtypes", "unchecked"}) // safe by static typing
- TypeDescriptor<Iterable<?>> iterableType =
- (TypeDescriptor) fn.getOutputTypeDescriptor();
-
- @SuppressWarnings("unchecked") // safe by correctness of getIterableElementType
- TypeDescriptor<OutputT> outputType =
- (TypeDescriptor<OutputT>) getIterableElementType(iterableType);
-
- return outputType;
- }
- }));
+ new DoFn<InputT, OutputT>() {
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ Iterable<OutputT> res =
+ fn.getClosure().apply(c.element(), Fn.Context.wrapProcessContext(c));
+ for (OutputT output : res) {
+ c.output(output);
+ }
+ }
+
+ @Override
+ public TypeDescriptor<InputT> getInputTypeDescriptor() {
+ return inputType;
+ }
+
+ @Override
+ public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+ return outputType;
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.delegate(FlatMapElements.this);
+ }
+ })
+ .withSideInputs(fn.getRequirements().getSideInputs()));
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder
- .include("flatMapFn", fn)
- .add(fnClassDisplayData);
- }
-
- /**
- * Does a best-effort job of getting the best {@link TypeDescriptor} for the type of the
- * elements contained in the iterable described by the given {@link TypeDescriptor}.
- */
- private static TypeDescriptor<?> getIterableElementType(
- TypeDescriptor<Iterable<?>> iterableTypeDescriptor) {
-
- // If a rawtype was used, the type token may be for Object, not a subtype of Iterable.
- // In this case, we rely on static typing of the function elsewhere to ensure it is
- // at least some kind of iterable, and grossly overapproximate the element type to be Object.
- if (!iterableTypeDescriptor.isSubtypeOf(new TypeDescriptor<Iterable<?>>() {})) {
- return new TypeDescriptor<Object>() {};
+ builder.add(DisplayData.item("class", originalFnForDisplayData.getClass()));
+ if (originalFnForDisplayData instanceof HasDisplayData) {
+ builder.include("fn", (HasDisplayData) originalFnForDisplayData);
}
-
- // Otherwise we can do the proper thing and get the actual type parameter.
- ParameterizedType iterableType =
- (ParameterizedType) iterableTypeDescriptor.getSupertype(Iterable.class).getType();
- return TypeDescriptor.of(iterableType.getActualTypeArguments()[0]);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e2ad925d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 792a6d5..1d259ac 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -20,36 +20,34 @@ package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkNotNull;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.transforms.Contextful.Fn;
import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
/**
* {@code PTransform}s for mapping a simple function over the elements of a {@link PCollection}.
*/
public class MapElements<InputT, OutputT>
extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
- /**
- * Temporarily stores the argument of {@link #into(TypeDescriptor)} until combined with the
- * argument of {@link #via(SerializableFunction)} into the fully-specified {@link #fn}. Stays null
- * if constructed using {@link #via(SimpleFunction)} directly.
- */
- @Nullable private final transient TypeDescriptor<OutputT> outputType;
-
- /**
- * Non-null on a fully specified transform - is null only when constructed using {@link
- * #into(TypeDescriptor)}, until the fn is specified using {@link #via(SerializableFunction)}.
- */
- @Nullable private final SimpleFunction<InputT, OutputT> fn;
- private final DisplayData.ItemSpec<?> fnClassDisplayData;
+ private final transient TypeDescriptor<InputT> inputType;
+ private final transient TypeDescriptor<OutputT> outputType;
+ @Nullable private final transient Object originalFnForDisplayData;
+ @Nullable private final Contextful<Fn<InputT, OutputT>> fn;
private MapElements(
- @Nullable SimpleFunction<InputT, OutputT> fn,
- @Nullable TypeDescriptor<OutputT> outputType,
- @Nullable Class<?> fnClass) {
+ @Nullable Contextful<Fn<InputT, OutputT>> fn,
+ @Nullable Object originalFnForDisplayData,
+ TypeDescriptor<InputT> inputType,
+ TypeDescriptor<OutputT> outputType) {
this.fn = fn;
+ this.originalFnForDisplayData = originalFnForDisplayData;
+ this.inputType = inputType;
this.outputType = outputType;
- this.fnClassDisplayData = DisplayData.item("mapFn", fnClass).withLabel("Map Function");
}
/**
@@ -57,10 +55,11 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
* takes an input {@code PCollection<InputT>} and returns a {@code PCollection<OutputT>}
* containing {@code fn.apply(v)} for every element {@code v} in the input.
*
- * <p>This overload is intended primarily for use in Java 7. In Java 8, the overload
- * {@link #via(SerializableFunction)} supports use of lambda for greater concision.
+ * <p>This overload is intended primarily for use in Java 7. In Java 8, the overload {@link
+ * #via(SerializableFunction)} supports use of lambda for greater concision.
*
* <p>Example of use in Java 7:
+ *
* <pre>{@code
* PCollection<String> words = ...;
* PCollection<Integer> wordsPerLine = words.apply(MapElements.via(
@@ -73,7 +72,8 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
*/
public static <InputT, OutputT> MapElements<InputT, OutputT> via(
final SimpleFunction<InputT, OutputT> fn) {
- return new MapElements<>(fn, null, fn.getClass());
+ return new MapElements<>(
+ Contextful.fn(fn), fn, fn.getInputTypeDescriptor(), fn.getOutputTypeDescriptor());
}
/**
@@ -82,7 +82,7 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
*/
public static <OutputT> MapElements<?, OutputT>
into(final TypeDescriptor<OutputT> outputType) {
- return new MapElements<>(null, outputType, null);
+ return new MapElements<>(null, null, null, outputType);
}
/**
@@ -104,10 +104,16 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
*/
public <NewInputT> MapElements<NewInputT, OutputT> via(
SerializableFunction<NewInputT, OutputT> fn) {
+ return new MapElements<>(Contextful.fn(fn), fn, TypeDescriptors.inputOf(fn), outputType);
+ }
+
+ /**
+ * Like {@link #via(SerializableFunction)}, but supports access to context, such as side inputs.
+ */
+ @Experimental(Kind.CONTEXTFUL)
+ public <NewInputT> MapElements<NewInputT, OutputT> via(Contextful<Fn<NewInputT, OutputT>> fn) {
return new MapElements<>(
- SimpleFunction.fromSerializableFunctionWithOutputType(fn, outputType),
- null,
- fn.getClass());
+ fn, fn.getClosure(), TypeDescriptors.inputOf(fn.getClosure()), outputType);
}
@Override
@@ -118,8 +124,8 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
ParDo.of(
new DoFn<InputT, OutputT>() {
@ProcessElement
- public void processElement(ProcessContext c) {
- c.output(fn.apply(c.element()));
+ public void processElement(ProcessContext c) throws Exception {
+ c.output(fn.getClosure().apply(c.element(), Fn.Context.wrapProcessContext(c)));
}
@Override
@@ -129,21 +135,22 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
@Override
public TypeDescriptor<InputT> getInputTypeDescriptor() {
- return fn.getInputTypeDescriptor();
+ return inputType;
}
@Override
public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- return fn.getOutputTypeDescriptor();
+ return outputType;
}
- }));
+ }).withSideInputs(fn.getRequirements().getSideInputs()));
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder
- .include("mapFn", fn)
- .add(fnClassDisplayData);
+ builder.add(DisplayData.item("class", originalFnForDisplayData.getClass()));
+ if (originalFnForDisplayData instanceof HasDisplayData) {
+ builder.include("fn", (HasDisplayData) originalFnForDisplayData);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e2ad925d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java
index acc409f..f90e8f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java
@@ -53,4 +53,9 @@ public final class Requirements implements Serializable {
public static Requirements empty() {
return new Requirements(Collections.<PCollectionView<?>>emptyList());
}
+
+ /** Whether this is an empty set of requirements. */
+ public boolean isEmpty() {
+ return sideInputs.isEmpty();
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e2ad925d/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
index 29a2496..e59f84b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
@@ -23,7 +23,6 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.List;
import java.util.Set;
-import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.SerializableFunction;
http://git-wip-us.apache.org/repos/asf/beam/blob/e2ad925d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
index 11f284f..68ceafb 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
@@ -17,7 +17,10 @@
*/
package org.apache.beam.sdk.transforms;
+import static org.apache.beam.sdk.transforms.Contextful.fn;
+import static org.apache.beam.sdk.transforms.Requirements.requiresSideInputs;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
@@ -30,9 +33,11 @@ import java.util.Set;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Contextful.Fn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.junit.Rule;
import org.junit.Test;
@@ -77,6 +82,32 @@ public class FlatMapElementsTest implements Serializable {
}
/**
+ * Basic test of {@link FlatMapElements} with a {@link Fn} and a side input.
+ */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testFlatMapBasicWithSideInput() throws Exception {
+ final PCollectionView<Integer> view =
+ pipeline.apply("Create base", Create.of(40)).apply(View.<Integer>asSingleton());
+ PCollection<Integer> output =
+ pipeline
+ .apply(Create.of(0, 1, 2))
+ .apply(
+ FlatMapElements.into(integers()).via(fn(
+ new Fn<Integer, Iterable<Integer>>() {
+ @Override
+ public List<Integer> apply(Integer input, Context c) {
+ return ImmutableList.of(
+ c.sideInput(view) - input, c.sideInput(view) + input);
+ }
+ },
+ requiresSideInputs(view))));
+
+ PAssert.that(output).containsInAnyOrder(38, 39, 40, 40, 41, 42);
+ pipeline.run();
+ }
+
+ /**
* Tests that when built with a concrete subclass of {@link SimpleFunction}, the type descriptor
* of the output reflects its static type.
*/
@@ -144,7 +175,7 @@ public class FlatMapElementsTest implements Serializable {
};
FlatMapElements<?, ?> simpleMap = FlatMapElements.via(simpleFn);
- assertThat(DisplayData.from(simpleMap), hasDisplayItem("flatMapFn", simpleFn.getClass()));
+ assertThat(DisplayData.from(simpleMap), hasDisplayItem("class", simpleFn.getClass()));
}
@Test
@@ -162,7 +193,7 @@ public class FlatMapElementsTest implements Serializable {
};
FlatMapElements<?, ?> simpleFlatMap = FlatMapElements.via(simpleFn);
- assertThat(DisplayData.from(simpleFlatMap), hasDisplayItem("flatMapFn", simpleFn.getClass()));
+ assertThat(DisplayData.from(simpleFlatMap), hasDisplayItem("class", simpleFn.getClass()));
assertThat(DisplayData.from(simpleFlatMap), hasDisplayItem("foo", "baz"));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e2ad925d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index 241b60e..2c24f10 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -17,7 +17,10 @@
*/
package org.apache.beam.sdk.transforms;
+import static org.apache.beam.sdk.transforms.Contextful.fn;
+import static org.apache.beam.sdk.transforms.Requirements.requiresSideInputs;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static org.apache.beam.sdk.values.TypeDescriptors.integers;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertThat;
@@ -28,12 +31,13 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.ValidatesRunner;
+import org.apache.beam.sdk.transforms.Contextful.Fn;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -96,6 +100,30 @@ public class MapElementsTest implements Serializable {
}
/**
+ * Basic test of {@link MapElements} with a {@link Fn} and a side input.
+ */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testMapBasicWithSideInput() throws Exception {
+ final PCollectionView<Integer> view =
+ pipeline.apply("Create base", Create.of(40)).apply(View.<Integer>asSingleton());
+ PCollection<Integer> output =
+ pipeline
+ .apply(Create.of(0, 1, 2))
+ .apply(MapElements.into(integers())
+ .via(fn(new Fn<Integer, Integer>() {
+ @Override
+ public Integer apply(Integer element, Context c) {
+ return element + c.sideInput(view);
+ }
+ },
+ requiresSideInputs(view))));
+
+ PAssert.that(output).containsInAnyOrder(40, 41, 42);
+ pipeline.run();
+ }
+
+ /**
* Basic test of {@link MapElements} coder propagation with a parametric {@link SimpleFunction}.
*/
@Test
@@ -157,7 +185,7 @@ public class MapElementsTest implements Serializable {
pipeline
.apply(Create.of(1, 2, 3))
.apply(
- MapElements.into(TypeDescriptors.integers())
+ MapElements.into(integers())
.via(
new SerializableFunction<Integer, Integer>() {
@Override
@@ -216,9 +244,9 @@ public class MapElementsTest implements Serializable {
};
MapElements<?, ?> serializableMap =
- MapElements.into(TypeDescriptors.integers()).via(serializableFn);
+ MapElements.into(integers()).via(serializableFn);
assertThat(DisplayData.from(serializableMap),
- hasDisplayItem("mapFn", serializableFn.getClass()));
+ hasDisplayItem("class", serializableFn.getClass()));
}
@Test
@@ -231,7 +259,7 @@ public class MapElementsTest implements Serializable {
};
MapElements<?, ?> simpleMap = MapElements.via(simpleFn);
- assertThat(DisplayData.from(simpleMap), hasDisplayItem("mapFn", simpleFn.getClass()));
+ assertThat(DisplayData.from(simpleMap), hasDisplayItem("class", simpleFn.getClass()));
}
@Test
public void testSimpleFunctionDisplayData() {
@@ -250,7 +278,7 @@ public class MapElementsTest implements Serializable {
MapElements<?, ?> simpleMap = MapElements.via(simpleFn);
- assertThat(DisplayData.from(simpleMap), hasDisplayItem("mapFn", simpleFn.getClass()));
+ assertThat(DisplayData.from(simpleMap), hasDisplayItem("class", simpleFn.getClass()));
assertThat(DisplayData.from(simpleMap), hasDisplayItem("foo", "baz"));
}
@@ -269,7 +297,7 @@ public class MapElementsTest implements Serializable {
Set<DisplayData> displayData = evaluator.<Integer>displayDataForPrimitiveTransforms(map);
assertThat("MapElements should include the mapFn in its primitive display data",
- displayData, hasItem(hasDisplayItem("mapFn", mapFn.getClass())));
+ displayData, hasItem(hasDisplayItem("class", mapFn.getClass())));
}
static class VoidValues<K, V>
http://git-wip-us.apache.org/repos/asf/beam/blob/e2ad925d/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
index ecfc990..e351138 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -164,6 +164,13 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab
DynamicDestinations.class,
new TypeDescriptors.TypeVariableExtractor<
DynamicDestinations<T, DestinationT>, DestinationT>() {});
- return registry.getCoder(descriptor);
+ try {
+ return registry.getCoder(descriptor);
+ } catch (CannotProvideCoderException e) {
+ throw new CannotProvideCoderException(
+ "Failed to infer coder for DestinationT from type "
+ + descriptor + ", please provide it explicitly by overriding getDestinationCoder()",
+ e);
+ }
}
}
[3/3] beam git commit: This closes #3921: [BEAM-3009] Introduces Contextful machinery and uses it to add side input support to Watch
Posted by jk...@apache.org.
This closes #3921: [BEAM-3009] Introduces Contextful machinery and uses it to add side input support to Watch
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/014614b6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/014614b6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/014614b6
Branch: refs/heads/master
Commit: 014614b695bac0b636aae662977dd3a3fa3b8a1e
Parents: 7f5753f e2ad925
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Fri Oct 13 18:44:28 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Fri Oct 13 18:44:28 2017 -0700
----------------------------------------------------------------------
.../runners/spark/SparkRunnerDebuggerTest.java | 11 +-
.../beam/sdk/annotations/Experimental.java | 8 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 11 +-
.../org/apache/beam/sdk/io/FileBasedSink.java | 13 +-
.../java/org/apache/beam/sdk/io/FileIO.java | 6 +-
.../apache/beam/sdk/transforms/Contextful.java | 127 +++++++++++++++++
.../beam/sdk/transforms/FlatMapElements.java | 142 ++++++++-----------
.../apache/beam/sdk/transforms/MapElements.java | 71 +++++-----
.../org/apache/beam/sdk/transforms/ParDo.java | 5 +-
.../beam/sdk/transforms/Requirements.java | 61 ++++++++
.../org/apache/beam/sdk/transforms/Watch.java | 36 +++--
.../apache/beam/sdk/values/TypeDescriptors.java | 37 +++--
.../sdk/transforms/FlatMapElementsTest.java | 35 ++++-
.../beam/sdk/transforms/MapElementsTest.java | 42 +++++-
.../apache/beam/sdk/transforms/WatchTest.java | 46 +++++-
.../beam/sdk/values/TypeDescriptorsTest.java | 17 ++-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 12 +-
.../io/gcp/bigquery/DynamicDestinations.java | 13 +-
18 files changed, 501 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: Introduces Contextful
Posted by jk...@apache.org.
Introduces Contextful
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4b908c2e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4b908c2e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4b908c2e
Branch: refs/heads/master
Commit: 4b908c2e693fe9ed44fcb6c67a2d82c7da355259
Parents: 7f5753f
Author: Eugene Kirpichov <ki...@google.com>
Authored: Mon Sep 25 13:57:04 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Fri Oct 13 18:43:48 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/annotations/Experimental.java | 8 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 11 +-
.../org/apache/beam/sdk/io/FileBasedSink.java | 4 -
.../java/org/apache/beam/sdk/io/FileIO.java | 6 +-
.../apache/beam/sdk/transforms/Contextful.java | 127 +++++++++++++++++++
.../org/apache/beam/sdk/transforms/ParDo.java | 5 +-
.../beam/sdk/transforms/Requirements.java | 56 ++++++++
.../org/apache/beam/sdk/transforms/Watch.java | 36 ++++--
.../apache/beam/sdk/values/TypeDescriptors.java | 36 ++++--
.../apache/beam/sdk/transforms/WatchTest.java | 46 ++++++-
.../beam/sdk/values/TypeDescriptorsTest.java | 17 ++-
.../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 12 +-
.../io/gcp/bigquery/DynamicDestinations.java | 4 -
13 files changed, 305 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index 80c4613..fecc407 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -94,6 +94,12 @@ public @interface Experimental {
CORE_RUNNERS_ONLY,
/** Experimental feature related to making the encoded element type available from a Coder. */
- CODER_TYPE_ENCODING
+ CODER_TYPE_ENCODING,
+
+ /**
+ * Experimental APIs related to <a href="https://s.apache.org/context-fn">contextful
+ * closures</a>.
+ */
+ CONTEXTFUL,
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index e2ab980..1474759 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -55,7 +55,6 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
@@ -724,14 +723,12 @@ public class AvroIO {
return explicitCoder;
}
// If a coder was not specified explicitly, infer it from parse fn.
- TypeDescriptor<T> descriptor = TypeDescriptors.outputOf(parseFn);
- String message =
- "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().";
- checkArgument(descriptor != null, message);
try {
- return coderRegistry.getCoder(descriptor);
+ return coderRegistry.getCoder(TypeDescriptors.outputOf(parseFn));
} catch (CannotProvideCoderException e) {
- throw new IllegalArgumentException(message, e);
+ throw new IllegalArgumentException(
+ "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().",
+ e);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index ea5129f..9834e6e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -319,10 +319,6 @@ public abstract class FileBasedSink<UserT, DestinationT, OutputT>
DynamicDestinations.class,
new TypeVariableExtractor<
DynamicDestinations<UserT, DestinationT, OutputT>, DestinationT>() {});
- checkArgument(
- descriptor != null,
- "Unable to infer a coder for DestinationT, "
- + "please specify it explicitly by overriding getDestinationCoder()");
return registry.getCoder(descriptor);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index 7df4bde..a244c07 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -346,12 +346,12 @@ public class FileIO {
}
}
- private static class MatchPollFn implements Watch.Growth.PollFn<String, MatchResult.Metadata> {
+ private static class MatchPollFn extends Watch.Growth.PollFn<String, MatchResult.Metadata> {
@Override
- public Watch.Growth.PollResult<MatchResult.Metadata> apply(String input, Instant timestamp)
+ public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
throws Exception {
return Watch.Growth.PollResult.incomplete(
- Instant.now(), FileSystems.match(input, EmptyMatchTreatment.ALLOW).metadata());
+ Instant.now(), FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata());
}
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java
new file mode 100644
index 0000000..fb732cf
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import com.google.common.base.MoreObjects;
+import java.io.Serializable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/** Pair of a bit of user code (a "closure") and the {@link Requirements} needed to run it. */
+@Experimental(Kind.CONTEXTFUL)
+public final class Contextful<ClosureT> implements Serializable {
+ private final ClosureT closure;
+ private final Requirements requirements;
+
+ private Contextful(ClosureT closure, Requirements requirements) {
+ this.closure = closure;
+ this.requirements = requirements;
+ }
+
+ /** Returns the closure. */
+ public ClosureT getClosure() {
+ return closure;
+ }
+
+ /** Returns the requirements needed to run the closure. */
+ public Requirements getRequirements() {
+ return requirements;
+ }
+
+ /** Constructs a pair of the given closure and its requirements. */
+ public static <ClosureT> Contextful<ClosureT> of(ClosureT closure, Requirements requirements) {
+ return new Contextful<>(closure, requirements);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("closure", closure)
+ .add("requirements", requirements)
+ .toString();
+ }
+
+ /**
+ * A function from an input to an output that may additionally access {@link Context} when
+ * computing the result.
+ */
+ public interface Fn<InputT, OutputT> extends Serializable {
+ /**
+ * Invokes the function on the given input with the given context. The function may use the
+ * context only for the capabilities declared in the {@link Contextful#getRequirements} of the
+ * enclosing {@link Contextful}.
+ */
+ OutputT apply(InputT element, Context c) throws Exception;
+
+ /** An accessor for additional capabilities available in {@link #apply}. */
+ abstract class Context {
+ /**
+ * Accesses the given side input. The window in which it is accessed is unspecified, depends
+ * on usage by the enclosing {@link PTransform}, and must be documented by that transform.
+ */
+ public <T> T sideInput(PCollectionView<T> view) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Convenience wrapper for creating a {@link Context} from a {@link DoFn.ProcessContext}, to
+ * support the common case when a {@link PTransform} is invoking the {@link
+ * Contextful#getClosure() closure} from inside a {@link DoFn}.
+ */
+ public static <InputT> Context wrapProcessContext(final DoFn<InputT, ?>.ProcessContext c) {
+ return new ContextFromProcessContext<>(c);
+ }
+
+ private static class ContextFromProcessContext<InputT> extends Context {
+ private final DoFn<InputT, ?>.ProcessContext c;
+
+ ContextFromProcessContext(DoFn<InputT, ?>.ProcessContext c) {
+ this.c = c;
+ }
+
+ @Override
+ public <T> T sideInput(PCollectionView<T> view) {
+ return c.sideInput(view);
+ }
+ }
+ }
+ }
+
+ /**
+ * Wraps a {@link SerializableFunction} as a {@link Contextful} of {@link Fn} with empty {@link
+ * Requirements}.
+ */
+ public static <InputT, OutputT> Contextful<Fn<InputT, OutputT>> fn(
+ final SerializableFunction<InputT, OutputT> fn) {
+ return new Contextful<Fn<InputT, OutputT>>(
+ new Fn<InputT, OutputT>() {
+ @Override
+ public OutputT apply(InputT element, Context c) throws Exception {
+ return fn.apply(element);
+ }
+ },
+ Requirements.empty());
+ }
+
+ /** Same as {@link #of} but with better type inference behavior for the case of {@link Fn}. */
+ public static <InputT, OutputT> Contextful<Fn<InputT, OutputT>> fn(
+ final Fn<InputT, OutputT> fn, Requirements requirements) {
+ return of(fn, requirements);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 49343c7..2ad84fb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -46,7 +46,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.NameUtils;
-import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
@@ -589,7 +588,7 @@ public class ParDo {
DoFn<InputT, OutputT> fn,
List<PCollectionView<?>> sideInputs,
DisplayData.ItemSpec<? extends Class<?>> fnDisplayData) {
- this.fn = SerializableUtils.clone(fn);
+ this.fn = fn;
this.fnDisplayData = fnDisplayData;
this.sideInputs = sideInputs;
}
@@ -717,7 +716,7 @@ public class ParDo {
this.sideInputs = sideInputs;
this.mainOutputTag = mainOutputTag;
this.additionalOutputTags = additionalOutputTags;
- this.fn = SerializableUtils.clone(fn);
+ this.fn = fn;
this.fnDisplayData = fnDisplayData;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java
new file mode 100644
index 0000000..acc409f
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Requirements.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/** Describes the run-time requirements of a {@link Contextful}, such as access to side inputs. */
+@Experimental(Kind.CONTEXTFUL)
+public final class Requirements implements Serializable {
+ private final Collection<PCollectionView<?>> sideInputs;
+
+ private Requirements(Collection<PCollectionView<?>> sideInputs) {
+ this.sideInputs = sideInputs;
+ }
+
+ /** The side inputs that this {@link Contextful} needs access to. */
+ public Collection<PCollectionView<?>> getSideInputs() {
+ return sideInputs;
+ }
+
+ /** Describes the need for access to the given side inputs. */
+ public static Requirements requiresSideInputs(Collection<PCollectionView<?>> sideInputs) {
+ return new Requirements(sideInputs);
+ }
+
+ /** Like {@link #requiresSideInputs(Collection)}. */
+ public static Requirements requiresSideInputs(PCollectionView<?>... sideInputs) {
+ return requiresSideInputs(Arrays.asList(sideInputs));
+ }
+
+ /** Describes an empty set of requirements. */
+ public static Requirements empty() {
+ return new Requirements(Collections.<PCollectionView<?>>emptyList());
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index 21f0641..a3c906c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.transforms.Contextful.Fn.Context.wrapProcessContext;
import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.resume;
import static org.apache.beam.sdk.transforms.DoFn.ProcessContinuation.stop;
@@ -117,13 +118,25 @@ public class Watch {
/** Watches the growth of the given poll function. See class documentation for more details. */
public static <InputT, OutputT> Growth<InputT, OutputT> growthOf(
- Growth.PollFn<InputT, OutputT> pollFn) {
+ Contextful<Growth.PollFn<InputT, OutputT>> pollFn) {
return new AutoValue_Watch_Growth.Builder<InputT, OutputT>()
.setTerminationPerInput(Watch.Growth.<InputT>never())
.setPollFn(pollFn)
.build();
}
+ /** Watches the growth of the given poll function. See class documentation for more details. */
+ public static <InputT, OutputT> Growth<InputT, OutputT> growthOf(
+ Growth.PollFn<InputT, OutputT> pollFn, Requirements requirements) {
+ return growthOf(Contextful.of(pollFn, requirements));
+ }
+
+ /** Watches the growth of the given poll function. See class documentation for more details. */
+ public static <InputT, OutputT> Growth<InputT, OutputT> growthOf(
+ Growth.PollFn<InputT, OutputT> pollFn) {
+ return growthOf(pollFn, Requirements.empty());
+ }
+
/** Implementation of {@link #growthOf}. */
@AutoValue
public abstract static class Growth<InputT, OutputT>
@@ -202,12 +215,11 @@ public class Watch {
}
/**
- * A function that computes the current set of outputs for the given input (given as a {@link
- * TimestampedValue}), in the form of a {@link PollResult}.
+ * A function that computes the current set of outputs for the given input, in the form of a
+ * {@link PollResult}.
*/
- public interface PollFn<InputT, OutputT> extends Serializable {
- PollResult<OutputT> apply(InputT input, Instant timestamp) throws Exception;
- }
+ public abstract static class PollFn<InputT, OutputT>
+ implements Contextful.Fn<InputT, PollResult<OutputT>> {}
/**
* A strategy for determining whether it is time to stop polling the current input regardless of
@@ -536,7 +548,7 @@ public class Watch {
}
}
- abstract PollFn<InputT, OutputT> getPollFn();
+ abstract Contextful<PollFn<InputT, OutputT>> getPollFn();
@Nullable
abstract Duration getPollInterval();
@@ -551,7 +563,7 @@ public class Watch {
@AutoValue.Builder
abstract static class Builder<InputT, OutputT> {
- abstract Builder<InputT, OutputT> setPollFn(PollFn<InputT, OutputT> pollFn);
+ abstract Builder<InputT, OutputT> setPollFn(Contextful<PollFn<InputT, OutputT>> pollFn);
abstract Builder<InputT, OutputT> setTerminationPerInput(
TerminationCondition<InputT, ?> terminationPerInput);
@@ -599,7 +611,7 @@ public class Watch {
// of the PollFn.
TypeDescriptor<OutputT> outputT =
TypeDescriptors.extractFromTypeParameters(
- getPollFn(),
+ getPollFn().getClosure(),
PollFn.class,
new TypeVariableExtractor<PollFn<InputT, OutputT>, OutputT>() {});
try {
@@ -617,7 +629,8 @@ public class Watch {
}
return input
- .apply(ParDo.of(new WatchGrowthFn<>(this, outputCoder)))
+ .apply(ParDo.of(new WatchGrowthFn<>(this, outputCoder))
+ .withSideInputs(getPollFn().getRequirements().getSideInputs()))
.setCoder(KvCoder.of(input.getCoder(), outputCoder));
}
}
@@ -638,7 +651,8 @@ public class Watch {
throws Exception {
if (!tracker.hasPending() && !tracker.currentRestriction().isOutputComplete) {
LOG.debug("{} - polling input", c.element());
- Growth.PollResult<OutputT> res = spec.getPollFn().apply(c.element(), c.timestamp());
+ Growth.PollResult<OutputT> res =
+ spec.getPollFn().getClosure().apply(c.element(), wrapProcessContext(c));
// TODO (https://issues.apache.org/jira/browse/BEAM-2680):
// Consider truncating the pending outputs if there are too many, to avoid blowing
// up the state. In that case, we'd rely on the next poll cycle to provide more outputs.
http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
index 8207f06..29a2496 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
@@ -24,6 +24,7 @@ import java.math.BigInteger;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.SerializableFunction;
/**
@@ -325,10 +326,9 @@ public class TypeDescriptors {
* @param extractor A class for specifying the type to extract from the supertype
*
* @return A {@link TypeDescriptor} for the actual value of the result type of the extractor,
- * or {@code null} if the type was erased.
+ * potentially containing unresolved type variables if the type was erased.
*/
@SuppressWarnings("unchecked")
- @Nullable
public static <T, V> TypeDescriptor<V> extractFromTypeParameters(
T instance, Class<? super T> supertype, TypeVariableExtractor<T, V> extractor) {
return extractFromTypeParameters(
@@ -340,7 +340,6 @@ public class TypeDescriptors {
* {@link TypeDescriptor} of the instance being analyzed rather than the instance itself.
*/
@SuppressWarnings("unchecked")
- @Nullable
public static <T, V> TypeDescriptor<V> extractFromTypeParameters(
TypeDescriptor<T> type, Class<? super T> supertype, TypeVariableExtractor<T, V> extractor) {
// Get the type signature of the extractor, e.g.
@@ -363,19 +362,13 @@ public class TypeDescriptors {
// Get output of the extractor.
Type outputT = ((ParameterizedType) extractorT.getType()).getActualTypeArguments()[1];
- TypeDescriptor<?> res = TypeDescriptor.of(outputT);
- if (res.hasUnresolvedParameters()) {
- return null;
- } else {
- return (TypeDescriptor<V>) res;
- }
+ return (TypeDescriptor<V>) TypeDescriptor.of(outputT);
}
/**
* Returns a type descriptor for the input of the given {@link SerializableFunction}, subject to
- * Java type erasure: returns {@code null} if the type was erased.
+ * Java type erasure: may contain unresolved type variables if the type was erased.
*/
- @Nullable
public static <InputT, OutputT> TypeDescriptor<InputT> inputOf(
SerializableFunction<InputT, OutputT> fn) {
return extractFromTypeParameters(
@@ -386,9 +379,8 @@ public class TypeDescriptors {
/**
* Returns a type descriptor for the output of the given {@link SerializableFunction}, subject to
- * Java type erasure: returns {@code null} if the type was erased.
+ * Java type erasure: may contain unresolved type variables if the type was erased.
*/
- @Nullable
public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf(
SerializableFunction<InputT, OutputT> fn) {
return extractFromTypeParameters(
@@ -396,4 +388,22 @@ public class TypeDescriptors {
SerializableFunction.class,
new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, OutputT>() {});
}
+
+ /** Like {@link #inputOf(SerializableFunction)} but for {@link Contextful.Fn}. */
+ public static <InputT, OutputT> TypeDescriptor<InputT> inputOf(
+ Contextful.Fn<InputT, OutputT> fn) {
+ return TypeDescriptors.extractFromTypeParameters(
+ fn,
+ Contextful.Fn.class,
+ new TypeDescriptors.TypeVariableExtractor<Contextful.Fn<InputT, OutputT>, InputT>() {});
+ }
+
+ /** Like {@link #outputOf(SerializableFunction)} but for {@link Contextful.Fn}. */
+ public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf(
+ Contextful.Fn<InputT, OutputT> fn) {
+ return TypeDescriptors.extractFromTypeParameters(
+ fn,
+ Contextful.Fn.class,
+ new TypeDescriptors.TypeVariableExtractor<Contextful.Fn<InputT, OutputT>, OutputT>() {});
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
index 132a1ff..113e8fe 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.transforms;
+import static org.apache.beam.sdk.transforms.Requirements.requiresSideInputs;
import static org.apache.beam.sdk.transforms.Watch.Growth.afterTimeSinceNewOutput;
import static org.apache.beam.sdk.transforms.Watch.Growth.afterTotalOf;
import static org.apache.beam.sdk.transforms.Watch.Growth.allOf;
@@ -57,6 +58,7 @@ import org.apache.beam.sdk.transforms.Watch.GrowthTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
@@ -81,9 +83,10 @@ public class WatchTest implements Serializable {
Watch.growthOf(
new PollFn<String, String>() {
@Override
- public PollResult<String> apply(String input, Instant time) {
+ public PollResult<String> apply(String element, Context c)
+ throws Exception {
return PollResult.complete(
- time, Arrays.asList(input + ".foo", input + ".bar"));
+ Instant.now(), Arrays.asList(element + ".foo", element + ".bar"));
}
})
.withPollInterval(Duration.ZERO));
@@ -99,6 +102,36 @@ public class WatchTest implements Serializable {
@Test
@Category({NeedsRunner.class, UsesSplittableParDo.class})
+ public void testSinglePollMultipleInputsWithSideInput() {
+ final PCollectionView<String> moo =
+ p.apply("moo", Create.of("moo")).apply("moo singleton", View.<String>asSingleton());
+ final PCollectionView<String> zoo =
+ p.apply("zoo", Create.of("zoo")).apply("zoo singleton", View.<String>asSingleton());
+ PCollection<KV<String, String>> res =
+ p.apply("input", Create.of("a", "b"))
+ .apply(
+ Watch.growthOf(
+ new PollFn<String, String>() {
+ @Override
+ public PollResult<String> apply(String element, Context c)
+ throws Exception {
+ return PollResult.complete(
+ Instant.now(),
+ Arrays.asList(
+ element + " " + c.sideInput(moo) + " " + c.sideInput(zoo)));
+ }
+ },
+ requiresSideInputs(moo, zoo))
+ .withPollInterval(Duration.ZERO));
+
+ PAssert.that(res)
+ .containsInAnyOrder(Arrays.asList(KV.of("a", "a moo zoo"), KV.of("b", "b moo zoo")));
+
+ p.run();
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesSplittableParDo.class})
public void testMultiplePollsWithTerminationBecauseOutputIsFinal() {
testMultiplePolls(false);
}
@@ -178,13 +211,14 @@ public class WatchTest implements Serializable {
Watch.growthOf(
new PollFn<String, KV<String, Integer>>() {
@Override
- public PollResult<KV<String, Integer>> apply(String input, Instant time) {
+ public PollResult<KV<String, Integer>> apply(String element, Context c)
+ throws Exception {
String pollId = UUID.randomUUID().toString();
List<KV<String, Integer>> output = Lists.newArrayList();
for (int i = 0; i < numResults; ++i) {
output.add(KV.of(pollId, i));
}
- return PollResult.complete(time, output);
+ return PollResult.complete(Instant.now(), output);
}
})
.withTerminationPerInput(Watch.Growth.<String>afterTotalOf(standardSeconds(1)))
@@ -291,7 +325,7 @@ public class WatchTest implements Serializable {
* Gradually emits all items from the given list, pairing each one with a UUID that identifies the
* round of polling, so a client can check how many rounds of polling there were.
*/
- private static class TimedPollFn<InputT, OutputT> implements PollFn<InputT, OutputT> {
+ private static class TimedPollFn<InputT, OutputT> extends PollFn<InputT, OutputT> {
private final Instant baseTime;
private final List<OutputT> outputs;
private final Duration timeToOutputEverything;
@@ -311,7 +345,7 @@ public class WatchTest implements Serializable {
}
@Override
- public PollResult<OutputT> apply(InputT input, Instant time) {
+ public PollResult<OutputT> apply(InputT element, Context c) throws Exception {
Instant now = Instant.now();
Duration elapsed = new Duration(baseTime, Instant.now());
if (elapsed.isLongerThan(timeToFail)) {
http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
index a4f58da..645da5e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TypeDescriptorsTest.java
@@ -25,10 +25,12 @@ import static org.apache.beam.sdk.values.TypeDescriptors.sets;
import static org.apache.beam.sdk.values.TypeDescriptors.strings;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
import java.util.List;
import java.util.Set;
+import org.hamcrest.CoreMatchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -115,8 +117,17 @@ public class TypeDescriptorsTest {
@Test
public void testTypeDescriptorsTypeParameterOfErased() throws Exception {
Generic<Integer, String> instance = TypeDescriptorsTest.typeErasedGeneric();
- assertNull(extractFooT(instance));
+
+ TypeDescriptor<Integer> fooT = extractFooT(instance);
+ assertNotNull(fooT);
+ // Using toString() assertions because verifying the contents of a Type is very cumbersome,
+ // and the expected types can not be easily constructed directly.
+ assertEquals("ActualFooT", fooT.toString());
+
assertEquals(strings(), extractBarT(instance));
- assertNull(extractKV(instance));
+
+ TypeDescriptor<KV<Integer, String>> kvT = extractKV(instance);
+ assertNotNull(kvT);
+ assertThat(kvT.toString(), CoreMatchers.containsString("KV<ActualFooT, java.lang.String>"));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 2771687..2f99643 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -95,7 +95,6 @@ import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
-import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Duration;
@@ -547,15 +546,12 @@ public class BigQueryIO {
return getCoder();
}
- TypeDescriptor<T> descriptor = TypeDescriptors.outputOf(getParseFn());
-
- String message =
- "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().";
- checkArgument(descriptor != null, message);
try {
- return coderRegistry.getCoder(descriptor);
+ return coderRegistry.getCoder(TypeDescriptors.outputOf(getParseFn()));
} catch (CannotProvideCoderException e) {
- throw new IllegalArgumentException(message, e);
+ throw new IllegalArgumentException(
+ "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().",
+ e);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4b908c2e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
index ea4fc4e..ecfc990 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinations.java
@@ -164,10 +164,6 @@ public abstract class DynamicDestinations<T, DestinationT> implements Serializab
DynamicDestinations.class,
new TypeDescriptors.TypeVariableExtractor<
DynamicDestinations<T, DestinationT>, DestinationT>() {});
- checkArgument(
- descriptor != null,
- "Unable to infer a coder for DestinationT, "
- + "please specify it explicitly by overriding getDestinationCoder()");
return registry.getCoder(descriptor);
}
}