Posted to commits@beam.apache.org by ke...@apache.org on 2018/12/14 20:13:54 UTC
[beam] branch master updated: [BEAM-6150] Superinterface for SerializableFunction allowing declared exceptions (#7160)
This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5ec695b [BEAM-6150] Superinterface for SerializableFunction allowing declared exceptions (#7160)
5ec695b is described below
commit 5ec695b9397991996fb215a6f18f33e72d860e53
Author: Jeff Klukas <je...@klukas.net>
AuthorDate: Fri Dec 14 15:13:46 2018 -0500
[BEAM-6150] Superinterface for SerializableFunction allowing declared exceptions (#7160)
* [BEAM-6150] Superinterface for SerializableFunction allowing declared exceptions
Also provides an equivalent superclass for SimpleFunction.
See https://issues.apache.org/jira/browse/BEAM-6150
* Update MapElements docs to remove Java 7 references
* Update tests to use ProcessFunction
* Include tests for both existing and new classes
* Remove Java 7 reference in ptransform style guide
---
.../org/apache/beam/sdk/transforms/Contextful.java | 4 +-
.../org/apache/beam/sdk/transforms/Filter.java | 23 +--
.../beam/sdk/transforms/FlatMapElements.java | 40 +++--
...{SimpleFunction.java => InferableFunction.java} | 49 ++++---
.../apache/beam/sdk/transforms/MapElements.java | 36 ++---
...ializableFunction.java => ProcessFunction.java} | 17 ++-
.../beam/sdk/transforms/SerializableFunction.java | 11 +-
.../apache/beam/sdk/transforms/SimpleFunction.java | 38 +----
.../org/apache/beam/sdk/transforms/ToString.java | 8 +-
.../apache/beam/sdk/values/TypeDescriptors.java | 40 ++---
.../org/apache/beam/sdk/transforms/FilterTest.java | 17 +++
.../beam/sdk/transforms/FlatMapElementsTest.java | 60 +++++++-
.../beam/sdk/transforms/MapElementsTest.java | 163 ++++++++++++++++++++-
website/src/contribute/ptransform-style-guide.md | 4 +-
14 files changed, 358 insertions(+), 152 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java
index 7e788cf..97a994f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Contextful.java
@@ -104,11 +104,11 @@ public final class Contextful<ClosureT> implements Serializable {
}
/**
- * Wraps a {@link SerializableFunction} as a {@link Contextful} of {@link Fn} with empty {@link
+ * Wraps a {@link ProcessFunction} as a {@link Contextful} of {@link Fn} with empty {@link
* Requirements}.
*/
public static <InputT, OutputT> Contextful<Fn<InputT, OutputT>> fn(
- final SerializableFunction<InputT, OutputT> fn) {
+ final ProcessFunction<InputT, OutputT> fn) {
return new Contextful<>((element, c) -> fn.apply(element), Requirements.empty());
}
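Contextful.fn can accept the wider ProcessFunction type with no other change because its adapter lambda only forwards to fn.apply. The minimal sketch below assumes that Beam's two-argument Contextful.Fn.apply already declares throws Exception, which is presumably what lets the forwarding lambda compile after this change. ContextfulSketch, ProcessFn and Fn are hypothetical stand-ins rather than the real Beam classes, and the context argument is simplified to Object.

```java
import java.io.Serializable;

/** Hypothetical stand-ins for ProcessFunction and Contextful.Fn; not the real Beam classes. */
final class ContextfulSketch {
  /** One-argument function that may throw, mirroring ProcessFunction. */
  interface ProcessFn<I, O> extends Serializable {
    O apply(I input) throws Exception;
  }

  /** Two-argument function; assumed, like Contextful.Fn, to declare 'throws Exception'. */
  interface Fn<I, O> extends Serializable {
    O apply(I element, Object context) throws Exception;
  }

  /** Adapts a one-argument function by ignoring the context, like Contextful.fn. */
  static <I, O> Fn<I, O> fn(ProcessFn<I, O> fn) {
    // Compiles only because Fn.apply may throw anything ProcessFn.apply throws.
    return (element, c) -> fn.apply(element);
  }

  /** Calls the adapted function with a null context and reports any exception by class name. */
  static String applyOrDescribe(Fn<String, Integer> f, String input) {
    try {
      return String.valueOf(f.apply(input, null));
    } catch (Exception e) {
      return "failed: " + e.getClass().getSimpleName();
    }
  }
}
```

Both a plain method reference such as Integer::parseInt and a lambda that throws a checked exception can be adapted this way; the exception only has to be handled where the two-argument function is finally invoked.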
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index 4bffeb6..aa9d2cd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -32,7 +32,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
/**
* Returns a {@code PTransform} that takes an input {@code PCollection<T>} and returns a {@code
* PCollection<T>} with elements that satisfy the given predicate. The predicate must be a {@code
- * SerializableFunction<T, Boolean>}.
+ * ProcessFunction<T, Boolean>}.
*
* <p>Example of use:
*
@@ -46,7 +46,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
* #greaterThanEq}, which return elements satisfying various inequalities with the specified value
* based on the elements' natural ordering.
*/
- public static <T, PredicateT extends SerializableFunction<T, Boolean>> Filter<T> by(
+ public static <T, PredicateT extends ProcessFunction<T, Boolean>> Filter<T> by(
PredicateT predicate) {
return new Filter<>(predicate);
}
@@ -71,7 +71,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
* <p>See also {@link #by}, which returns elements that satisfy the given predicate.
*/
public static <T extends Comparable<T>> Filter<T> lessThan(final T value) {
- return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) < 0)
+ return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) < 0)
.described(String.format("x < %s", value));
}
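Each comparison helper casts its lambda to ProcessFunction<T, Boolean> before passing it to by. Since by declares its parameter as the type variable PredicateT rather than as a functional interface type, the cast presumably exists to give the lambda a concrete target type. The sketch below reproduces that signature shape over an in-memory list; FilterBySketch and ProcessFn are hypothetical names, not Beam's Filter.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical mirror of Filter.by's signature shape, filtering a List instead of a PCollection. */
final class FilterBySketch {
  interface ProcessFn<I, O> extends Serializable {
    O apply(I input) throws Exception;
  }

  /** Like Filter.by: the parameter type is a type variable bounded by the function type. */
  static <T, P extends ProcessFn<T, Boolean>> List<T> by(P predicate, List<T> input)
      throws Exception {
    List<T> out = new ArrayList<>();
    for (T element : input) {
      if (predicate.apply(element)) {
        out.add(element);
      }
    }
    return out;
  }

  /** Like Filter.lessThan: the cast supplies a functional-interface target type for the lambda. */
  static <T extends Comparable<T>> List<T> lessThan(T value, List<T> input) {
    try {
      return by((ProcessFn<T, Boolean>) x -> x.compareTo(value) < 0, input);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
```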
@@ -95,7 +95,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
* <p>See also {@link #by}, which returns elements that satisfy the given predicate.
*/
public static <T extends Comparable<T>> Filter<T> greaterThan(final T value) {
- return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) > 0)
+ return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) > 0)
.described(String.format("x > %s", value));
}
@@ -119,7 +119,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
* <p>See also {@link #by}, which returns elements that satisfy the given predicate.
*/
public static <T extends Comparable<T>> Filter<T> lessThanEq(final T value) {
- return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) <= 0)
+ return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) <= 0)
.described(String.format("x ≤ %s", value));
}
@@ -143,7 +143,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
* <p>See also {@link #by}, which returns elements that satisfy the given predicate.
*/
public static <T extends Comparable<T>> Filter<T> greaterThanEq(final T value) {
- return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) >= 0)
+ return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) >= 0)
.described(String.format("x ≥ %s", value));
}
@@ -166,20 +166,20 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
* <p>See also {@link #by}, which returns elements that satisfy the given predicate.
*/
public static <T extends Comparable<T>> Filter<T> equal(final T value) {
- return by((SerializableFunction<T, Boolean>) input -> input.compareTo(value) == 0)
+ return by((ProcessFunction<T, Boolean>) input -> input.compareTo(value) == 0)
.described(String.format("x == %s", value));
}
///////////////////////////////////////////////////////////////////////////////
- private SerializableFunction<T, Boolean> predicate;
+ private ProcessFunction<T, Boolean> predicate;
private String predicateDescription;
- private Filter(SerializableFunction<T, Boolean> predicate) {
+ private Filter(ProcessFunction<T, Boolean> predicate) {
this(predicate, "Filter.predicate");
}
- private Filter(SerializableFunction<T, Boolean> predicate, String predicateDescription) {
+ private Filter(ProcessFunction<T, Boolean> predicate, String predicateDescription) {
this.predicate = predicate;
this.predicateDescription = predicateDescription;
}
@@ -199,7 +199,8 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
ParDo.of(
new DoFn<T, T>() {
@ProcessElement
- public void processElement(@Element T element, OutputReceiver<T> r) {
+ public void processElement(@Element T element, OutputReceiver<T> r)
+ throws Exception {
if (predicate.apply(element)) {
r.output(element);
}
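Because the predicate is now a ProcessFunction, the anonymous DoFn's processElement has to declare throws Exception in order to call predicate.apply. The sketch below shows what that permits, assuming a hypothetical FilterThrowsSketch whose processElements loop stands in for the DoFn: the predicate lambda calls the java.net.URI constructor, which throws the checked URISyntaxException, and needs no try/catch of its own.

```java
import java.io.Serializable;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; processElements plays the role of the DoFn inside Filter.expand. */
final class FilterThrowsSketch {
  interface ProcessFn<I, O> extends Serializable {
    O apply(I input) throws Exception;
  }

  /** Must declare 'throws Exception' to call predicate.apply, like the updated processElement. */
  static <T> List<T> processElements(ProcessFn<T, Boolean> predicate, List<T> elements)
      throws Exception {
    List<T> output = new ArrayList<>();
    for (T element : elements) {
      if (predicate.apply(element)) {
        output.add(element);
      }
    }
    return output;
  }

  /** The URI constructor throws the checked URISyntaxException; the lambda just lets it propagate. */
  static final ProcessFn<String, Boolean> IS_ABSOLUTE_URI = s -> new URI(s).isAbsolute();

  /** Returns the kept elements, or the exception's class name if the predicate failed. */
  static String describe(List<String> elements) {
    try {
      return processElements(IS_ABSOLUTE_URI, elements).toString();
    } catch (Exception e) {
      return "failed: " + e.getClass().getSimpleName();
    }
  }
}
```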
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index edf255a..93fc85a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -52,29 +52,29 @@ public class FlatMapElements<InputT, OutputT>
}
/**
- * For a {@code SimpleFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, return a {@link
- * PTransform} that applies {@code fn} to every element of the input {@code PCollection<InputT>}
- * and outputs all of the elements to the output {@code PCollection<OutputT>}.
+ * For an {@code InferableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, returns a
+ * {@link PTransform} that applies {@code fn} to every element of the input {@code
+ * PCollection<InputT>} and outputs all of the elements to the output {@code
+ * PCollection<OutputT>}.
*
- * <p>This overload is intended primarily for use in Java 7. In Java 8, the overload {@link
- * #via(SerializableFunction)} supports use of lambda for greater concision.
+ * <p>{@link InferableFunction} has the advantage of providing type descriptor information, but it
+ * is generally more convenient to specify output type via {@link #into(TypeDescriptor)}, and
+ * provide the mapping as a lambda expression to {@link #via(ProcessFunction)}.
*
- * <p>Example of use in Java 7:
+ * <p>Example usage:
*
* <pre>{@code
* PCollection<String> lines = ...;
* PCollection<String> words = lines.apply(FlatMapElements.via(
- * new SimpleFunction<String, List<String>>() {
- * public Integer apply(String line) {
+ * new InferableFunction<String, List<String>>() {
+ * public List<String> apply(String line) throws Exception {
* return Arrays.asList(line.split(" "));
* }
* }));
* }</pre>
- *
- * <p>To use a Java 8 lambda, see {@link #via(SerializableFunction)}.
*/
public static <InputT, OutputT> FlatMapElements<InputT, OutputT> via(
- SimpleFunction<? super InputT, ? extends Iterable<OutputT>> fn) {
+ InferableFunction<? super InputT, ? extends Iterable<OutputT>> fn) {
Contextful<Fn<InputT, Iterable<OutputT>>> wrapped = (Contextful) Contextful.fn(fn);
TypeDescriptor<OutputT> outputType =
TypeDescriptors.extractFromTypeParameters(
@@ -87,7 +87,7 @@ public class FlatMapElements<InputT, OutputT>
/**
* Returns a new {@link FlatMapElements} transform with the given type descriptor for the output
- * type, but the mapping function yet to be specified using {@link #via(SerializableFunction)}.
+ * type, but the mapping function yet to be specified using {@link #via(ProcessFunction)}.
*/
public static <OutputT> FlatMapElements<?, OutputT> into(
final TypeDescriptor<OutputT> outputType) {
@@ -95,29 +95,25 @@ public class FlatMapElements<InputT, OutputT>
}
/**
- * For a {@code SerializableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, returns a
- * {@link PTransform} that applies {@code fn} to every element of the input {@code
- * PCollection<InputT>} and outputs all of the elements to the output {@code
- * PCollection<OutputT>}.
+ * For a {@code ProcessFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, returns a {@link
+ * PTransform} that applies {@code fn} to every element of the input {@code PCollection<InputT>}
+ * and outputs all of the elements to the output {@code PCollection<OutputT>}.
*
- * <p>Example of use in Java 8:
+ * <p>Example usage:
*
* <pre>{@code
* PCollection<String> words = lines.apply(
* FlatMapElements.into(TypeDescriptors.strings())
* .via((String line) -> Arrays.asList(line.split(" ")))
* }</pre>
- *
- * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
- * descriptor need not be provided.
*/
public <NewInputT> FlatMapElements<NewInputT, OutputT> via(
- SerializableFunction<NewInputT, ? extends Iterable<OutputT>> fn) {
+ ProcessFunction<NewInputT, ? extends Iterable<OutputT>> fn) {
return new FlatMapElements<>(
(Contextful) Contextful.fn(fn), fn, TypeDescriptors.inputOf(fn), outputType);
}
- /** Like {@link #via(SerializableFunction)}, but allows access to additional context. */
+ /** Like {@link #via(ProcessFunction)}, but allows access to additional context. */
@Experimental(Experimental.Kind.CONTEXTFUL)
public <NewInputT> FlatMapElements<NewInputT, OutputT> via(
Contextful<Fn<NewInputT, Iterable<OutputT>>> fn) {
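With via(ProcessFunction), a function passed to FlatMapElements may propagate checked exceptions directly instead of wrapping them. The sketch below illustrates this over an in-memory list rather than a PCollection; FlatMapSketch, ProcessFn, flatMap and splitAll are hypothetical names, and BufferedReader.readLine is used only because it declares IOException.

```java
import java.io.BufferedReader;
import java.io.Serializable;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of FlatMapElements semantics over a List. */
final class FlatMapSketch {
  interface ProcessFn<I, O> extends Serializable {
    O apply(I input) throws Exception;
  }

  /** Applies fn to every input and concatenates all of the returned elements. */
  static <I, O> List<O> flatMap(ProcessFn<? super I, ? extends Iterable<O>> fn, List<I> input)
      throws Exception {
    List<O> output = new ArrayList<>();
    for (I element : input) {
      for (O o : fn.apply(element)) {
        output.add(o);
      }
    }
    return output;
  }

  /** readLine() and close() declare IOException, which this lambda simply propagates. */
  static final ProcessFn<String, List<String>> SPLIT_LINES =
      text -> {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new StringReader(text))) {
          String line;
          while ((line = reader.readLine()) != null) {
            lines.add(line);
          }
        }
        return lines;
      };

  /** Caller-side handling, analogous to the DoFn that eventually runs the function. */
  static List<String> splitAll(List<String> texts) {
    try {
      return flatMap(SPLIT_LINES, texts);
    } catch (Exception e) {
      throw new IllegalStateException(e);
    }
  }
}
```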
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/InferableFunction.java
similarity index 64%
copy from sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
copy to sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/InferableFunction.java
index e3f3cc8..d9dc864 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/InferableFunction.java
@@ -24,28 +24,31 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
- * A {@link SerializableFunction} which is not a <i>functional interface</i>. Concrete subclasses
- * allow us to infer type information, which in turn aids {@link org.apache.beam.sdk.coders.Coder
- * Coder} inference.
+ * A {@link ProcessFunction} which is not a <i>functional interface</i>. Concrete subclasses allow
+ * us to infer type information, which in turn aids {@link org.apache.beam.sdk.coders.Coder Coder}
+ * inference.
+ *
+ * <p>See {@link SimpleFunction} for providing robust type information where a {@link
+ * SerializableFunction} is required.
*/
-public abstract class SimpleFunction<InputT, OutputT>
- implements SerializableFunction<InputT, OutputT>, HasDisplayData {
+public abstract class InferableFunction<InputT, OutputT>
+ implements ProcessFunction<InputT, OutputT>, HasDisplayData {
- @Nullable private final SerializableFunction<InputT, OutputT> fn;
+ @Nullable private final ProcessFunction<InputT, OutputT> fn;
- protected SimpleFunction() {
+ protected InferableFunction() {
this.fn = null;
// A subclass must override apply if using this constructor. Check that via
// reflection.
try {
Method methodThatMustBeOverridden =
- SimpleFunction.class.getDeclaredMethod("apply", new Class[] {Object.class});
+ InferableFunction.class.getDeclaredMethod("apply", new Class[] {Object.class});
Method methodOnSubclass = getClass().getMethod("apply", new Class[] {Object.class});
if (methodOnSubclass.equals(methodThatMustBeOverridden)) {
throw new IllegalStateException(
- "Subclass of SimpleFunction must override 'apply' method"
- + " or pass a SerializableFunction to the constructor,"
+ "Subclass of InferableFunction must override 'apply' method"
+ + " or pass a ProcessFunction to the constructor,"
+ " usually via a lambda or method reference.");
}
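The no-argument constructor relies on a reflection check: getClass().getMethod("apply", Object.class) resolves to the base class's own declaration only when the subclass did not override apply. When a subclass does override apply(String), javac emits a bridge apply(Object) in that subclass, so the two Method objects differ. A stand-alone sketch of the same check, using a hypothetical Base class in place of InferableFunction:

```java
import java.lang.reflect.Method;

/** Hypothetical sketch of the override check performed by InferableFunction's constructor. */
final class OverrideCheckSketch {
  abstract static class Base<I, O> {
    protected Base() {
      try {
        // apply(I) erases to apply(Object), so look up that signature on both classes.
        Method mustBeOverridden = Base.class.getDeclaredMethod("apply", Object.class);
        Method onSubclass = getClass().getMethod("apply", Object.class);
        if (onSubclass.equals(mustBeOverridden)) {
          throw new IllegalStateException("Subclass of Base must override 'apply'");
        }
      } catch (NoSuchMethodException e) {
        throw new RuntimeException(e);
      }
    }

    /** Placeholder that subclasses are expected to replace. */
    public O apply(I input) throws Exception {
      throw new UnsupportedOperationException("apply not overridden");
    }
  }

  /** Overrides apply(String); the anonymous class also gets a bridge apply(Object). */
  static Base<String, Integer> withOverride() {
    return new Base<String, Integer>() {
      @Override
      public Integer apply(String input) {
        return input.length();
      }
    };
  }

  /** Does not override apply, so construction fails the reflection check. */
  static Base<String, Integer> withoutOverride() {
    return new Base<String, Integer>() {};
  }
}
```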
@@ -54,24 +57,24 @@ public abstract class SimpleFunction<InputT, OutputT>
}
}
- protected SimpleFunction(SerializableFunction<InputT, OutputT> fn) {
+ protected InferableFunction(ProcessFunction<InputT, OutputT> fn) {
this.fn = fn;
}
@Override
- public OutputT apply(InputT input) {
+ public OutputT apply(InputT input) throws Exception {
return fn.apply(input);
}
public static <InputT, OutputT>
- SimpleFunction<InputT, OutputT> fromSerializableFunctionWithOutputType(
- SerializableFunction<InputT, OutputT> fn, TypeDescriptor<OutputT> outputType) {
- return new SimpleFunctionWithOutputType<>(fn, outputType);
+ InferableFunction<InputT, OutputT> fromProcessFunctionWithOutputType(
+ ProcessFunction<InputT, OutputT> fn, TypeDescriptor<OutputT> outputType) {
+ return new InferableFunctionWithOutputType<>(fn, outputType);
}
/**
* Returns a {@link TypeDescriptor} capturing what is known statically about the input type of
- * this {@link SimpleFunction} instance's most-derived class.
+ * this {@link InferableFunction} instance's most-derived class.
*
* <p>See {@link #getOutputTypeDescriptor} for more discussion.
*/
@@ -81,9 +84,9 @@ public abstract class SimpleFunction<InputT, OutputT>
/**
* Returns a {@link TypeDescriptor} capturing what is known statically about the output type of
- * this {@link SimpleFunction} instance's most-derived class.
+ * this {@link InferableFunction} instance's most-derived class.
*
- * <p>In the normal case of a concrete {@link SimpleFunction} subclass with no generic type
+ * <p>In the normal case of a concrete {@link InferableFunction} subclass with no generic type
* parameters of its own (including anonymous inner classes), this will be a complete non-generic
* type, which is good for choosing a default output {@code Coder<OutputT>} for the output {@code
* PCollection<OutputT>}.
@@ -102,16 +105,16 @@ public abstract class SimpleFunction<InputT, OutputT>
public void populateDisplayData(DisplayData.Builder builder) {}
/**
- * A {@link SimpleFunction} built from a {@link SerializableFunction}, having a known output type
+ * An {@link InferableFunction} built from a {@link ProcessFunction}, having a known output type
* that is explicitly set.
*/
- private static class SimpleFunctionWithOutputType<InputT, OutputT>
- extends SimpleFunction<InputT, OutputT> {
+ private static class InferableFunctionWithOutputType<InputT, OutputT>
+ extends InferableFunction<InputT, OutputT> {
private final TypeDescriptor<OutputT> outputType;
- public SimpleFunctionWithOutputType(
- SerializableFunction<InputT, OutputT> fn, TypeDescriptor<OutputT> outputType) {
+ public InferableFunctionWithOutputType(
+ ProcessFunction<InputT, OutputT> fn, TypeDescriptor<OutputT> outputType) {
super(fn);
this.outputType = outputType;
}
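fromProcessFunctionWithOutputType pairs an arbitrary ProcessFunction, typically a lambda, with an output type supplied explicitly by the caller. A minimal sketch of that pattern follows; Class<O> is a simplified stand-in for Beam's TypeDescriptor<O> (it cannot express parameterized types), and ExplicitOutputTypeSketch, TypedFn and withOutputType are hypothetical names.

```java
import java.io.Serializable;

/** Hypothetical sketch of wrapping a function together with an explicit output type. */
final class ExplicitOutputTypeSketch {
  interface ProcessFn<I, O> extends Serializable {
    O apply(I input) throws Exception;
  }

  /** Delegates apply to the wrapped function and reports the type it was given. */
  static final class TypedFn<I, O> implements ProcessFn<I, O> {
    private final ProcessFn<I, O> fn;
    private final Class<O> outputType;

    TypedFn(ProcessFn<I, O> fn, Class<O> outputType) {
      this.fn = fn;
      this.outputType = outputType;
    }

    @Override
    public O apply(I input) throws Exception {
      return fn.apply(input);
    }

    Class<O> getOutputType() {
      return outputType;
    }
  }

  static <I, O> TypedFn<I, O> withOutputType(ProcessFn<I, O> fn, Class<O> outputType) {
    return new TypedFn<>(fn, outputType);
  }
}
```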
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 41aac41..dc73cf8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -50,64 +50,58 @@ public class MapElements<InputT, OutputT>
}
/**
- * For a {@code SimpleFunction<InputT, OutputT>} {@code fn}, returns a {@code PTransform} that
+ * For an {@code InferableFunction<InputT, OutputT>} {@code fn}, returns a {@code PTransform} that
* takes an input {@code PCollection<InputT>} and returns a {@code PCollection<OutputT>}
* containing {@code fn.apply(v)} for every element {@code v} in the input.
*
- * <p>This overload is intended primarily for use in Java 7. In Java 8, the overload {@link
- * #via(SerializableFunction)} supports use of lambda for greater concision.
+ * <p>{@link InferableFunction} has the advantage of providing type descriptor information, but it
+ * is generally more convenient to specify output type via {@link #into(TypeDescriptor)}, and
+ * provide the mapping as a lambda expression to {@link #via(ProcessFunction)}.
*
- * <p>Example of use in Java 7:
+ * <p>Example usage:
*
* <pre>{@code
* PCollection<String> words = ...;
* PCollection<Integer> wordsPerLine = words.apply(MapElements.via(
- * new SimpleFunction<String, Integer>() {
- * public Integer apply(String word) {
+ * new InferableFunction<String, Integer>() {
+ * public Integer apply(String word) throws Exception {
* return word.length();
* }
* }));
* }</pre>
*/
public static <InputT, OutputT> MapElements<InputT, OutputT> via(
- final SimpleFunction<InputT, OutputT> fn) {
+ final InferableFunction<InputT, OutputT> fn) {
return new MapElements<>(
Contextful.fn(fn), fn, fn.getInputTypeDescriptor(), fn.getOutputTypeDescriptor());
}
/**
* Returns a new {@link MapElements} transform with the given type descriptor for the output type,
- * but the mapping function yet to be specified using {@link #via(SerializableFunction)}.
+ * but the mapping function yet to be specified using {@link #via(ProcessFunction)}.
*/
public static <OutputT> MapElements<?, OutputT> into(final TypeDescriptor<OutputT> outputType) {
return new MapElements<>(null, null, null, outputType);
}
/**
- * For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type descriptor,
- * returns a {@code PTransform} that takes an input {@code PCollection<InputT>} and returns a
- * {@code PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in the
- * input.
+ * For a {@code ProcessFunction<InputT, OutputT>} {@code fn} and output type descriptor, returns a
+ * {@code PTransform} that takes an input {@code PCollection<InputT>} and returns a {@code
+ * PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in the input.
*
- * <p>Example of use in Java 8:
+ * <p>Example usage:
*
* <pre>{@code
* PCollection<Integer> wordLengths = words.apply(
* MapElements.into(TypeDescriptors.integers())
* .via((String word) -> word.length()));
* }</pre>
- *
- * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
- * descriptor need not be provided.
*/
- public <NewInputT> MapElements<NewInputT, OutputT> via(
- SerializableFunction<NewInputT, OutputT> fn) {
+ public <NewInputT> MapElements<NewInputT, OutputT> via(ProcessFunction<NewInputT, OutputT> fn) {
return new MapElements<>(Contextful.fn(fn), fn, TypeDescriptors.inputOf(fn), outputType);
}
- /**
- * Like {@link #via(SerializableFunction)}, but supports access to context, such as side inputs.
- */
+ /** Like {@link #via(ProcessFunction)}, but supports access to context, such as side inputs. */
@Experimental(Kind.CONTEXTFUL)
public <NewInputT> MapElements<NewInputT, OutputT> via(Contextful<Fn<NewInputT, OutputT>> fn) {
return new MapElements<>(
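The new javadoc says InferableFunction provides type descriptor information, while a lambda needs into(TypeDescriptor). A small reflection sketch shows why, assuming a standard JDK where generated lambda classes carry no generic signature: an anonymous subclass records its type arguments in getGenericSuperclass(), whereas a lambda's class exposes only the raw interface. TypeInfoSketch, ProcessFn and Inferable are hypothetical names.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

/** Hypothetical sketch contrasting lambdas with anonymous subclasses for type inference. */
final class TypeInfoSketch {
  interface ProcessFn<I, O> {
    O apply(I input) throws Exception;
  }

  /** Stand-in for InferableFunction: an abstract class, so it cannot be implemented by a lambda. */
  abstract static class Inferable<I, O> implements ProcessFn<I, O> {}

  static ProcessFn<String, Integer> lengthLambda() {
    return s -> s.length();
  }

  static Inferable<String, Integer> lengthSubclass() {
    return new Inferable<String, Integer>() {
      @Override
      public Integer apply(String s) {
        return s.length();
      }
    };
  }

  /** Returns the type arguments recorded on the object's class, or an empty array if none. */
  static Type[] visibleTypeArguments(Object fn) {
    Type superclass = fn.getClass().getGenericSuperclass();
    if (superclass instanceof ParameterizedType) {
      return ((ParameterizedType) superclass).getActualTypeArguments();
    }
    for (Type iface : fn.getClass().getGenericInterfaces()) {
      if (iface instanceof ParameterizedType) {
        return ((ParameterizedType) iface).getActualTypeArguments();
      }
    }
    return new Type[0];
  }
}
```

For the anonymous subclass the result is [String, Integer]; for the lambda it is empty, which is why the output type has to be supplied separately in that case.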
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ProcessFunction.java
similarity index 51%
copy from sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java
copy to sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ProcessFunction.java
index b2ac9ed..b0e8807 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ProcessFunction.java
@@ -23,10 +23,23 @@ import java.io.Serializable;
* A function that computes an output value of type {@code OutputT} from an input value of type
* {@code InputT} and is {@link Serializable}.
*
+ * <p>This is the most general function type provided in this SDK, allowing arbitrary {@code
+ * Exception}s to be thrown, and matching Java's expectations of a <i>functional interface</i> that
+ * can be supplied as a lambda expression or method reference. It is named {@code ProcessFunction}
+ * because it is particularly appropriate anywhere a user needs to provide code that will eventually
+ * be executed as part of a {@link DoFn} {@link org.apache.beam.sdk.transforms.DoFn.ProcessElement
+ * ProcessElement} function, which is allowed to declare throwing {@code Exception}. If you need to
+ * execute user code in a context where arbitrary checked exceptions should not be allowed, require
+ * that users implement the subinterface {@link SerializableFunction} instead.
+ *
+ * <p>For more robust {@link org.apache.beam.sdk.coders.Coder Coder} inference, consider extending
+ * {@link InferableFunction} rather than implementing this interface directly.
+ *
* @param <InputT> input value type
* @param <OutputT> output value type
*/
-public interface SerializableFunction<InputT, OutputT> extends Serializable {
+@FunctionalInterface
+public interface ProcessFunction<InputT, OutputT> extends Serializable {
/** Returns the result of invoking this function on the given input. */
- OutputT apply(InputT input);
+ OutputT apply(InputT input) throws Exception;
}
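The practical effect of declaring throws Exception on ProcessFunction.apply is that lambdas and method references that throw checked exceptions become valid function values. In the minimal sketch below, built on a hypothetical ProcessFn mirror, Class::forName declares ClassNotFoundException, so it could not be assigned to a function type whose apply has no throws clause. The caller takes responsibility for the exception, much as a DoFn's @ProcessElement method is allowed to declare it.

```java
import java.io.Serializable;

/** Hypothetical sketch; ProcessFn mirrors the shape of ProcessFunction. */
final class ProcessFunctionSketch {
  @FunctionalInterface
  interface ProcessFn<I, O> extends Serializable {
    O apply(I input) throws Exception;
  }

  /** Class.forName throws the checked ClassNotFoundException, which ProcessFn permits. */
  static final ProcessFn<String, Class<?>> LOAD_CLASS = Class::forName;

  /** The code that invokes the function is where the checked exception gets handled. */
  static String simpleNameOrError(String className) {
    try {
      return LOAD_CLASS.apply(className).getSimpleName();
    } catch (Exception e) {
      return "error: " + e.getClass().getSimpleName();
    }
  }
}
```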
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java
index b2ac9ed..a1dba9e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunction.java
@@ -21,12 +21,19 @@ import java.io.Serializable;
/**
* A function that computes an output value of type {@code OutputT} from an input value of type
- * {@code InputT} and is {@link Serializable}.
+ * {@code InputT}, is {@link Serializable}, and does not allow checked exceptions to be declared.
+ *
+ * <p>To allow checked exceptions, implement the superinterface {@link ProcessFunction} instead. To
+ * allow more robust {@link org.apache.beam.sdk.coders.Coder Coder} inference, see {@link
+ * InferableFunction}.
*
* @param <InputT> input value type
* @param <OutputT> output value type
*/
-public interface SerializableFunction<InputT, OutputT> extends Serializable {
+@FunctionalInterface
+public interface SerializableFunction<InputT, OutputT>
+ extends ProcessFunction<InputT, OutputT>, Serializable {
/** Returns the result of invoking this function on the given input. */
+ @Override
OutputT apply(InputT input);
}
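SerializableFunction now redeclares apply without a throws clause, which Java allows for an overriding method. The sketch below, using hypothetical mirror types, shows both halves of that design: a SerializableFn still fits any API that was widened to accept ProcessFn, and code that holds the narrower type can call apply without try/catch.

```java
import java.io.Serializable;

/** Hypothetical sketch of the ProcessFunction / SerializableFunction relationship. */
final class NarrowingSketch {
  interface ProcessFn<I, O> extends Serializable {
    O apply(I input) throws Exception;
  }

  /** Redeclares apply with no throws clause, narrowing the inherited signature. */
  @FunctionalInterface
  interface SerializableFn<I, O> extends ProcessFn<I, O> {
    @Override
    O apply(I input);
  }

  /** An API widened to the general type, as MapElements.via and Filter.by were. */
  static <I, O> O applyGeneral(ProcessFn<I, O> fn, I input) {
    try {
      return fn.apply(input);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /** Code statically holding a SerializableFn needs no try/catch. */
  static <I, O> O applyNoThrow(SerializableFn<I, O> fn, I input) {
    return fn.apply(input);
  }
}
```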
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
index e3f3cc8..0e19272 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
@@ -19,8 +19,6 @@ package org.apache.beam.sdk.transforms;
import java.lang.reflect.Method;
import javax.annotation.Nullable;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
@@ -28,8 +26,8 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* allow us to infer type information, which in turn aids {@link org.apache.beam.sdk.coders.Coder
* Coder} inference.
*/
-public abstract class SimpleFunction<InputT, OutputT>
- implements SerializableFunction<InputT, OutputT>, HasDisplayData {
+public abstract class SimpleFunction<InputT, OutputT> extends InferableFunction<InputT, OutputT>
+ implements SerializableFunction<InputT, OutputT> {
@Nullable private final SerializableFunction<InputT, OutputT> fn;
@@ -70,38 +68,6 @@ public abstract class SimpleFunction<InputT, OutputT>
}
/**
- * Returns a {@link TypeDescriptor} capturing what is known statically about the input type of
- * this {@link SimpleFunction} instance's most-derived class.
- *
- * <p>See {@link #getOutputTypeDescriptor} for more discussion.
- */
- public TypeDescriptor<InputT> getInputTypeDescriptor() {
- return new TypeDescriptor<InputT>(this) {};
- }
-
- /**
- * Returns a {@link TypeDescriptor} capturing what is known statically about the output type of
- * this {@link SimpleFunction} instance's most-derived class.
- *
- * <p>In the normal case of a concrete {@link SimpleFunction} subclass with no generic type
- * parameters of its own (including anonymous inner classes), this will be a complete non-generic
- * type, which is good for choosing a default output {@code Coder<OutputT>} for the output {@code
- * PCollection<OutputT>}.
- */
- public TypeDescriptor<OutputT> getOutputTypeDescriptor() {
- return new TypeDescriptor<OutputT>(this) {};
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>By default, does not register any display data. Implementors may override this method to
- * provide their own display data.
- */
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {}
-
- /**
* A {@link SimpleFunction} built from a {@link SerializableFunction}, having a known output type
* that is explicitly set.
*/
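SimpleFunction now extends InferableFunction and implements SerializableFunction, so it inherits an apply that may throw Exception from one supertype and an apply with no throws clause from the other. In the simplified sketch below, SimpleFn redeclares apply without a throws clause so that one signature satisfies both supertypes. All names are hypothetical mirrors, and InferableFn has an abstract apply here, whereas Beam's InferableFunction delegates to a wrapped function.

```java
import java.io.Serializable;

/** Hypothetical sketch of the SimpleFunction / InferableFunction hierarchy. */
final class HierarchySketch {
  interface ProcessFn<I, O> extends Serializable {
    O apply(I input) throws Exception;
  }

  interface SerializableFn<I, O> extends ProcessFn<I, O> {
    @Override
    O apply(I input);
  }

  /** Stand-in for InferableFunction: apply may throw. */
  abstract static class InferableFn<I, O> implements ProcessFn<I, O> {
    @Override
    public abstract O apply(I input) throws Exception;
  }

  /** Stand-in for SimpleFunction: apply is redeclared without 'throws'. */
  abstract static class SimpleFn<I, O> extends InferableFn<I, O> implements SerializableFn<I, O> {
    @Override
    public abstract O apply(I input);
  }

  static SimpleFn<String, Integer> length() {
    return new SimpleFn<String, Integer>() {
      @Override
      public Integer apply(String input) {
        return input.length();
      }
    };
  }
}
```

A single SimpleFn instance can then be passed wherever either a SerializableFn or an InferableFn is expected.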
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
index 5f214f3..ad11cd0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java
@@ -27,7 +27,7 @@ import org.apache.beam.sdk.values.PCollection;
* PCollection<Iterable<?>>} to a {@link PCollection PCollection<String>}.
*
* <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
- * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
+ * {@link ProcessFunction} using {@link MapElements#via(ProcessFunction)}
*/
public final class ToString {
private ToString() {
@@ -96,7 +96,7 @@ public final class ToString {
* }</pre>
*
* <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
- * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
+ * {@link ProcessFunction} using {@link MapElements#via(ProcessFunction)}
*/
private static final class Elements extends PTransform<PCollection<?>, PCollection<String>> {
@Override
@@ -125,7 +125,7 @@ public final class ToString {
* }</pre>
*
* <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
- * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
+ * {@link ProcessFunction} using {@link MapElements#via(ProcessFunction)}
*/
private static final class KVs
extends PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> {
@@ -160,7 +160,7 @@ public final class ToString {
* }</pre>
*
* <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own
- * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)}
+ * {@link ProcessFunction} using {@link MapElements#via(ProcessFunction)}
*/
private static final class Iterables
extends PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> {
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
index d125356..e605f7c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TypeDescriptors.java
@@ -25,7 +25,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.beam.sdk.transforms.Contextful;
-import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.ProcessFunction;
/**
* A utility class for creating {@link TypeDescriptor} objects for different types, such as Java
@@ -339,16 +339,16 @@ public class TypeDescriptors {
*
* <pre>{@code
* class Foo<BarT> {
- * private SerializableFunction<BarT, String> fn;
+ * private ProcessFunction<BarT, String> fn;
*
* TypeDescriptor<BarT> inferBarTypeDescriptorFromFn() {
* return TypeDescriptors.extractFromTypeParameters(
* fn,
- * SerializableFunction.class,
+ * ProcessFunction.class,
* // The actual type of "fn" is matched against the input type of the extractor,
* // and the obtained values of type variables of the superclass are substituted
* // into the output type of the extractor.
- * new TypeVariableExtractor<SerializableFunction<BarT, String>, BarT>() {});
+ * new TypeVariableExtractor<ProcessFunction<BarT, String>, BarT>() {});
* }
* }
* }</pre>
@@ -374,20 +374,20 @@ public class TypeDescriptors {
public static <T, V> TypeDescriptor<V> extractFromTypeParameters(
TypeDescriptor<T> type, Class<? super T> supertype, TypeVariableExtractor<T, V> extractor) {
// Get the type signature of the extractor, e.g.
- // TypeVariableExtractor<SerializableFunction<BarT, String>, BarT>
+ // TypeVariableExtractor<ProcessFunction<BarT, String>, BarT>
TypeDescriptor<TypeVariableExtractor<T, V>> extractorSupertype =
(TypeDescriptor<TypeVariableExtractor<T, V>>)
TypeDescriptor.of(extractor.getClass()).getSupertype(TypeVariableExtractor.class);
- // Get the actual type argument, e.g. SerializableFunction<BarT, String>
+ // Get the actual type argument, e.g. ProcessFunction<BarT, String>
Type inputT = ((ParameterizedType) extractorSupertype.getType()).getActualTypeArguments()[0];
// Get the actual supertype of the type being analyzed, hopefully with all type parameters
- // resolved, e.g. SerializableFunction<Integer, String>
+ // resolved, e.g. ProcessFunction<Integer, String>
TypeDescriptor supertypeDescriptor = type.getSupertype(supertype);
// Substitute actual supertype into the extractor, e.g.
- // TypeVariableExtractor<SerializableFunction<Integer, String>, Integer>
+ // TypeVariableExtractor<ProcessFunction<Integer, String>, Integer>
TypeDescriptor<TypeVariableExtractor<T, V>> extractorT =
extractorSupertype.where(inputT, supertypeDescriptor.getType());
@@ -397,30 +397,30 @@ public class TypeDescriptors {
}
/**
- * Returns a type descriptor for the input of the given {@link SerializableFunction}, subject to
- * Java type erasure: may contain unresolved type variables if the type was erased.
+ * Returns a type descriptor for the input of the given {@link ProcessFunction}, subject to Java
+ * type erasure: may contain unresolved type variables if the type was erased.
*/
public static <InputT, OutputT> TypeDescriptor<InputT> inputOf(
- SerializableFunction<InputT, OutputT> fn) {
+ ProcessFunction<InputT, OutputT> fn) {
return extractFromTypeParameters(
fn,
- SerializableFunction.class,
- new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, InputT>() {});
+ ProcessFunction.class,
+ new TypeVariableExtractor<ProcessFunction<InputT, OutputT>, InputT>() {});
}
/**
- * Returns a type descriptor for the output of the given {@link SerializableFunction}, subject to
- * Java type erasure: may contain unresolved type variables if the type was erased.
+ * Returns a type descriptor for the output of the given {@link ProcessFunction}, subject to Java
+ * type erasure: may contain unresolved type variables if the type was erased.
*/
public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf(
- SerializableFunction<InputT, OutputT> fn) {
+ ProcessFunction<InputT, OutputT> fn) {
return extractFromTypeParameters(
fn,
- SerializableFunction.class,
- new TypeVariableExtractor<SerializableFunction<InputT, OutputT>, OutputT>() {});
+ ProcessFunction.class,
+ new TypeVariableExtractor<ProcessFunction<InputT, OutputT>, OutputT>() {});
}
- /** Like {@link #inputOf(SerializableFunction)} but for {@link Contextful.Fn}. */
+ /** Like {@link #inputOf(ProcessFunction)} but for {@link Contextful.Fn}. */
public static <InputT, OutputT> TypeDescriptor<InputT> inputOf(
Contextful.Fn<InputT, OutputT> fn) {
return TypeDescriptors.extractFromTypeParameters(
@@ -429,7 +429,7 @@ public class TypeDescriptors {
new TypeDescriptors.TypeVariableExtractor<Contextful.Fn<InputT, OutputT>, InputT>() {});
}
- /** Like {@link #outputOf(SerializableFunction)} but for {@link Contextful.Fn}. */
+ /** Like {@link #outputOf(ProcessFunction)} but for {@link Contextful.Fn}. */
public static <InputT, OutputT> TypeDescriptor<OutputT> outputOf(
Contextful.Fn<InputT, OutputT> fn) {
return TypeDescriptors.extractFromTypeParameters(
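The `inputOf`/`outputOf` helpers above recover type arguments by reflecting on the generic supertype of the function's class. That is why their Javadoc warns that the result is "subject to Java type erasure". The following self-contained sketch uses only the JDK, with no Beam dependency, and shows the underlying mechanism: an anonymous class records its type arguments in class metadata, while a lambda's runtime-generated class does not. The `Fn` interface and the `inputTypeOf` helper are hypothetical stand-ins for illustration, not Beam API.

```java
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Sketch: reading a function's declared input type back via reflection.
// "Fn" is a hypothetical stand-in for a function interface such as ProcessFunction.
class TypeCaptureSketch {

  interface Fn<InputT, OutputT> extends Serializable {
    OutputT apply(InputT input) throws Exception;
  }

  /**
   * Returns the declared input type of fn if its class records it, or null if the type
   * arguments were erased (as they are for lambdas).
   */
  static Type inputTypeOf(Fn<?, ?> fn) {
    for (Type iface : fn.getClass().getGenericInterfaces()) {
      // Only a ParameterizedType carries actual type arguments, e.g. Fn<Integer, String>.
      if (iface instanceof ParameterizedType
          && ((ParameterizedType) iface).getRawType() == Fn.class) {
        return ((ParameterizedType) iface).getActualTypeArguments()[0];
      }
    }
    return null; // only the raw interface is visible: type arguments were erased
  }

  public static void main(String[] args) {
    // Anonymous class: the compiler writes Fn<Integer, String> into its class file.
    Fn<Integer, String> anonymous =
        new Fn<Integer, String>() {
          @Override
          public String apply(Integer input) {
            return String.valueOf(input);
          }
        };
    // Lambda: the generated class implements the raw Fn interface only.
    Fn<Integer, String> lambda = input -> String.valueOf(input);

    System.out.println(inputTypeOf(anonymous)); // class java.lang.Integer
    System.out.println(inputTypeOf(lambda)); // null
  }
}
```

This difference is the reason lambda-based call sites such as `MapElements.into(integers()).via(input -> -input)` in the tests below supply the output type explicitly.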
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
index a05b7eb..091c28f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FilterTest.java
@@ -58,6 +58,13 @@ public class FilterTest implements Serializable {
}
}
+ static class EvenProcessFn implements ProcessFunction<Integer, Boolean> {
+ @Override
+ public Boolean apply(Integer elem) throws Exception {
+ return elem % 2 == 0;
+ }
+ }
+
@Rule public final TestPipeline p = TestPipeline.create();
@Rule public transient ExpectedException thrown = ExpectedException.none();
@@ -95,6 +102,16 @@ public class FilterTest implements Serializable {
@Test
@Category(NeedsRunner.class)
+ public void testFilterByProcessFunction() {
+ PCollection<Integer> output =
+ p.apply(Create.of(1, 2, 3, 4, 5, 6, 7)).apply(Filter.by(new EvenProcessFn()));
+
+ PAssert.that(output).containsInAnyOrder(2, 4, 6);
+ p.run();
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
public void testFilterLessThan() {
PCollection<Integer> output = p.apply(Create.of(1, 2, 3, 4, 5, 6, 7)).apply(Filter.lessThan(4));
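The new `EvenProcessFn` test relies on the pattern this commit introduces: a superinterface whose `apply` may declare checked exceptions, and a subinterface (`SerializableFunction` in Beam) that narrows the `throws` clause. The self-contained JDK-only sketch below mirrors that pattern. `ProcessFn`, `SerializableFn` and `filterBy` are hypothetical names, and wrapping exceptions in a `RuntimeException` is a choice made by this sketch, not necessarily what Beam's `Filter` does.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of a "superinterface allowing declared exceptions".
// ProcessFn / SerializableFn are hypothetical stand-ins for ProcessFunction / SerializableFunction.
class ExceptionSuperinterfaceSketch {

  /** Superinterface: apply may throw any checked exception. */
  interface ProcessFn<InputT, OutputT> extends Serializable {
    OutputT apply(InputT input) throws Exception;
  }

  /** Subinterface: narrows the throws clause, so direct callers need not catch. */
  interface SerializableFn<InputT, OutputT> extends ProcessFn<InputT, OutputT> {
    @Override
    OutputT apply(InputT input);
  }

  /** Accepts either kind of function, because every SerializableFn is a ProcessFn. */
  static <T> List<T> filterBy(List<T> elements, ProcessFn<T, Boolean> predicate) {
    List<T> kept = new ArrayList<>();
    for (T element : elements) {
      try {
        if (predicate.apply(element)) {
          kept.add(element);
        }
      } catch (Exception e) {
        // Sketch-only choice: wrap anything the predicate throws in an unchecked exception.
        throw new RuntimeException(e);
      }
    }
    return kept;
  }

  /** Mirrors EvenProcessFn from FilterTest: its apply declares a checked exception. */
  static class EvenProcessFn implements ProcessFn<Integer, Boolean> {
    @Override
    public Boolean apply(Integer elem) throws Exception {
      return elem % 2 == 0;
    }
  }

  public static void main(String[] args) {
    List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7);
    System.out.println(filterBy(input, new EvenProcessFn())); // [2, 4, 6]

    SerializableFn<Integer, Boolean> lessThanFour = x -> x < 4;
    // No try/catch needed here: SerializableFn.apply declares no checked exceptions.
    boolean direct = lessThanFour.apply(3);
    System.out.println(direct); // true
    System.out.println(filterBy(input, lessThanFour)); // [1, 2, 3]
  }
}
```

Because the narrower interface extends the wider one, an API can switch its parameter type from the subinterface to the superinterface without breaking existing callers, which is the compatibility argument behind the change.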
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
index 061765a..a927bf9 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsTest.java
@@ -58,12 +58,12 @@ public class FlatMapElementsTest implements Serializable {
/** Basic test of {@link FlatMapElements} with a {@link SimpleFunction}. */
@Test
@Category(NeedsRunner.class)
- public void testFlatMapBasic() throws Exception {
+ public void testFlatMapSimpleFunction() throws Exception {
PCollection<Integer> output =
pipeline
.apply(Create.of(1, 2, 3))
- // Note that FlatMapElements takes a SimpleFunction<InputT, ? extends Iterable<OutputT>>
+ // Note that FlatMapElements takes an InferableFunction<InputT, ? extends Iterable<OutputT>>
// so the use of List<Integer> here (as opposed to Iterable<Integer>) deliberately exercises
// the use of an upper bound.
.apply(
@@ -79,6 +79,26 @@ public class FlatMapElementsTest implements Serializable {
pipeline.run();
}
+ /** Basic test of {@link FlatMapElements} with an {@link InferableFunction}. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testFlatMapInferableFunction() throws Exception {
+ PCollection<Integer> output =
+ pipeline
+ .apply(Create.of(1, 2, 3))
+ .apply(
+ FlatMapElements.via(
+ new InferableFunction<Integer, List<Integer>>() {
+ @Override
+ public List<Integer> apply(Integer input) throws Exception {
+ return ImmutableList.of(-input, input);
+ }
+ }));
+
+ PAssert.that(output).containsInAnyOrder(1, -2, -1, -3, 2, 3);
+ pipeline.run();
+ }
+
/** Basic test of {@link FlatMapElements} with a {@link Fn} and a side input. */
@Test
@Category(NeedsRunner.class)
@@ -182,6 +202,20 @@ public class FlatMapElementsTest implements Serializable {
}
@Test
+ public void testInferableFunctionClassDisplayData() {
+ InferableFunction<Integer, List<Integer>> inferableFn =
+ new InferableFunction<Integer, List<Integer>>() {
+ @Override
+ public List<Integer> apply(Integer input) {
+ return Collections.emptyList();
+ }
+ };
+
+ FlatMapElements<?, ?> inferableMap = FlatMapElements.via(inferableFn);
+ assertThat(DisplayData.from(inferableMap), hasDisplayItem("class", inferableFn.getClass()));
+ }
+
+ @Test
public void testSimpleFunctionDisplayData() {
SimpleFunction<Integer, List<Integer>> simpleFn =
new SimpleFunction<Integer, List<Integer>>() {
@@ -202,6 +236,26 @@ public class FlatMapElementsTest implements Serializable {
}
@Test
+ public void testInferableFunctionDisplayData() {
+ InferableFunction<Integer, List<Integer>> inferableFn =
+ new InferableFunction<Integer, List<Integer>>() {
+ @Override
+ public List<Integer> apply(Integer input) {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add(DisplayData.item("foo", "baz"));
+ }
+ };
+
+ FlatMapElements<?, ?> inferableFlatMap = FlatMapElements.via(inferableFn);
+ assertThat(DisplayData.from(inferableFlatMap), hasDisplayItem("class", inferableFn.getClass()));
+ assertThat(DisplayData.from(inferableFlatMap), hasDisplayItem("foo", "baz"));
+ }
+
+ @Test
@Category(NeedsRunner.class)
public void testVoidValues() throws Exception {
pipeline
@@ -230,7 +284,7 @@ public class FlatMapElementsTest implements Serializable {
/**
* Basic test of {@link FlatMapElements} with a lambda (which is instantiated as a {@link
- * SerializableFunction}).
+ * ProcessFunction}).
*/
@Test
@Category(NeedsRunner.class)
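The comment in `testFlatMapSimpleFunction` notes that returning `List<Integer>` "deliberately exercises the use of an upper bound" on `? extends Iterable<OutputT>`. The JDK-only sketch below shows why that bound matters for type checking. `ProcessFn` and `flatMap` are hypothetical helpers for illustration, not Beam's `FlatMapElements` API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: why a flat-map function type uses "? extends Iterable<OutputT>".
// ProcessFn and flatMap are hypothetical, not Beam's FlatMapElements API.
class UpperBoundFlatMapSketch {

  interface ProcessFn<InputT, OutputT> {
    OutputT apply(InputT input) throws Exception;
  }

  /** The upper bound lets callers return List, Set, etc., not only Iterable itself. */
  static <InputT, OutputT> List<OutputT> flatMap(
      List<InputT> elements, ProcessFn<? super InputT, ? extends Iterable<OutputT>> fn)
      throws Exception {
    List<OutputT> out = new ArrayList<>();
    for (InputT element : elements) {
      for (OutputT value : fn.apply(element)) {
        out.add(value);
      }
    }
    return out;
  }

  public static void main(String[] args) throws Exception {
    // Declared to return List<Integer>. Generics are invariant, so this would NOT be
    // accepted by a parameter typed ProcessFn<Integer, Iterable<Integer>>.
    ProcessFn<Integer, List<Integer>> negateAndKeep = input -> Arrays.asList(-input, input);
    System.out.println(flatMap(Arrays.asList(1, 2, 3), negateAndKeep)); // [-1, 1, -2, 2, -3, 3]
  }
}
```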
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index c10b06e..952885a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -76,10 +76,33 @@ public class MapElementsTest implements Serializable {
}
}
+ /**
+ * An {@link InferableFunction} to test that the coder registry can propagate coders that are
+ * bound to type variables.
+ */
+ private static class PolymorphicInferableFunction<T> extends InferableFunction<T, T> {
+ @Override
+ public T apply(T input) throws Exception {
+ return input;
+ }
+ }
+
+ /**
+ * An {@link InferableFunction} to test that the coder registry can propagate coders that are
+ * bound to type variables, when the variable appears nested in the output.
+ */
+ private static class NestedPolymorphicInferableFunction<T>
+ extends InferableFunction<T, KV<T, String>> {
+ @Override
+ public KV<T, String> apply(T input) throws Exception {
+ return KV.of(input, "hello");
+ }
+ }
+
/** Basic test of {@link MapElements} with a {@link SimpleFunction}. */
@Test
@Category(NeedsRunner.class)
- public void testMapBasic() throws Exception {
+ public void testMapSimpleFunction() throws Exception {
PCollection<Integer> output =
pipeline
.apply(Create.of(1, 2, 3))
@@ -96,6 +119,26 @@ public class MapElementsTest implements Serializable {
pipeline.run();
}
+ /** Basic test of {@link MapElements} with an {@link InferableFunction}. */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testMapInferableFunction() throws Exception {
+ PCollection<Integer> output =
+ pipeline
+ .apply(Create.of(1, 2, 3))
+ .apply(
+ MapElements.via(
+ new InferableFunction<Integer, Integer>() {
+ @Override
+ public Integer apply(Integer input) throws Exception {
+ return -input;
+ }
+ }));
+
+ PAssert.that(output).containsInAnyOrder(-2, -1, -3);
+ pipeline.run();
+ }
+
/** Basic test of {@link MapElements} with a {@link Fn} and a side input. */
@Test
@Category(NeedsRunner.class)
@@ -140,6 +183,28 @@ public class MapElementsTest implements Serializable {
}
/**
+ * Basic test of {@link MapElements} coder propagation with a parametric {@link
+ * InferableFunction}.
+ */
+ @Test
+ public void testPolymorphicInferableFunction() throws Exception {
+ pipeline.enableAbandonedNodeEnforcement(false);
+
+ pipeline
+ .apply(Create.of(1, 2, 3))
+ .apply("Polymorphic Identity", MapElements.via(new PolymorphicInferableFunction<>()))
+ .apply(
+ "Test Consumer",
+ MapElements.via(
+ new InferableFunction<Integer, Integer>() {
+ @Override
+ public Integer apply(Integer input) throws Exception {
+ return input;
+ }
+ }));
+ }
+
+ /**
* Test of {@link MapElements} coder propagation with a parametric {@link SimpleFunction} where
* the type variable occurs nested within other concrete type constructors.
*/
@@ -166,12 +231,31 @@ public class MapElementsTest implements Serializable {
}
/**
- * Basic test of {@link MapElements} with a {@link SerializableFunction}. This style is generally
- * discouraged in Java 7, in favor of {@link SimpleFunction}.
+ * Test of {@link MapElements} coder propagation with a parametric {@link InferableFunction} where
+ * the type variable occurs nested within other concrete type constructors.
*/
@Test
+ public void testNestedPolymorphicInferableFunction() throws Exception {
+ pipeline.enableAbandonedNodeEnforcement(false);
+
+ pipeline
+ .apply(Create.of(1, 2, 3))
+ .apply("Polymorphic Identity", MapElements.via(new NestedPolymorphicInferableFunction<>()))
+ .apply(
+ "Test Consumer",
+ MapElements.via(
+ new InferableFunction<KV<Integer, String>, Integer>() {
+ @Override
+ public Integer apply(KV<Integer, String> input) throws Exception {
+ return 42;
+ }
+ }));
+ }
+
+ /** Basic test of {@link MapElements} with a {@link ProcessFunction}. */
+ @Test
@Category(NeedsRunner.class)
- public void testMapBasicSerializableFunction() throws Exception {
+ public void testMapBasicProcessFunction() throws Exception {
PCollection<Integer> output =
pipeline.apply(Create.of(1, 2, 3)).apply(MapElements.into(integers()).via(input -> -input));
@@ -208,6 +292,35 @@ public class MapElementsTest implements Serializable {
pipeline.run();
}
+ /**
+ * Tests that when built with a concrete subclass of {@link InferableFunction}, the type
+ * descriptor of the output reflects its static type.
+ */
+ @Test
+ @Category(NeedsRunner.class)
+ public void testInferableFunctionOutputTypeDescriptor() throws Exception {
+ PCollection<String> output =
+ pipeline
+ .apply(Create.of("hello"))
+ .apply(
+ MapElements.via(
+ new InferableFunction<String, String>() {
+ @Override
+ public String apply(String input) throws Exception {
+ return input;
+ }
+ }));
+ assertThat(
+ output.getTypeDescriptor(),
+ equalTo((TypeDescriptor<String>) new TypeDescriptor<String>() {}));
+ assertThat(
+ pipeline.getCoderRegistry().getCoder(output.getTypeDescriptor()),
+ equalTo(pipeline.getCoderRegistry().getCoder(new TypeDescriptor<String>() {})));
+
+ // Make sure the pipeline runs too
+ pipeline.run();
+ }
+
@Test
@Category(NeedsRunner.class)
public void testVoidValues() throws Exception {
@@ -229,6 +342,14 @@ public class MapElementsTest implements Serializable {
}
@Test
+ public void testProcessFunctionDisplayData() {
+ ProcessFunction<Integer, Integer> processFn = input -> input;
+
+ MapElements<?, ?> processMap = MapElements.into(integers()).via(processFn);
+ assertThat(DisplayData.from(processMap), hasDisplayItem("class", processFn.getClass()));
+ }
+
+ @Test
public void testSimpleFunctionClassDisplayData() {
SimpleFunction<?, ?> simpleFn =
new SimpleFunction<Integer, Integer>() {
@@ -243,6 +364,20 @@ public class MapElementsTest implements Serializable {
}
@Test
+ public void testInferableFunctionClassDisplayData() {
+ InferableFunction<?, ?> inferableFn =
+ new InferableFunction<Integer, Integer>() {
+ @Override
+ public Integer apply(Integer input) throws Exception {
+ return input;
+ }
+ };
+
+ MapElements<?, ?> inferableMap = MapElements.via(inferableFn);
+ assertThat(DisplayData.from(inferableMap), hasDisplayItem("class", inferableFn.getClass()));
+ }
+
+ @Test
public void testSimpleFunctionDisplayData() {
SimpleFunction<Integer, ?> simpleFn =
new SimpleFunction<Integer, Integer>() {
@@ -263,6 +398,26 @@ public class MapElementsTest implements Serializable {
}
@Test
+ public void testInferableFunctionDisplayData() {
+ InferableFunction<Integer, ?> inferableFn =
+ new InferableFunction<Integer, Integer>() {
+ @Override
+ public Integer apply(Integer input) {
+ return input;
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add(DisplayData.item("foo", "baz"));
+ }
+ };
+
+ MapElements<?, ?> inferableMap = MapElements.via(inferableFn);
+ assertThat(DisplayData.from(inferableMap), hasDisplayItem("class", inferableFn.getClass()));
+ assertThat(DisplayData.from(inferableMap), hasDisplayItem("foo", "baz"));
+ }
+
+ @Test
@Category(ValidatesRunner.class)
public void testPrimitiveDisplayData() {
SimpleFunction<Integer, ?> mapFn =
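The `InferableFunction` tests above check two cases: a concrete anonymous subclass whose output type is fully known, and a polymorphic subclass (`PolymorphicInferableFunction<T>`) whose output is only a type variable. The JDK-only sketch below illustrates that distinction with plain reflection. `InferableFn` and `PolymorphicFn` are hypothetical stand-ins; Beam's `InferableFunction` works with `TypeDescriptor` rather than raw `java.lang.reflect.Type`, and this sketch only handles direct subclasses.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Sketch: an abstract function class that reads its own type arguments from the generic
// superclass recorded by a concrete subclass. Hypothetical names, not Beam API.
class InferableFnSketch {

  abstract static class InferableFn<InputT, OutputT> {
    abstract OutputT apply(InputT input) throws Exception;

    /** Output type as written by the subclass; may be an unresolved TypeVariable. */
    Type outputType() {
      // Assumes a direct subclass with type arguments; a fuller version would walk the hierarchy.
      ParameterizedType superType = (ParameterizedType) getClass().getGenericSuperclass();
      return superType.getActualTypeArguments()[1];
    }
  }

  /** Mirrors PolymorphicInferableFunction: the output type is just T. */
  static class PolymorphicFn<T> extends InferableFn<T, T> {
    @Override
    T apply(T input) {
      return input;
    }
  }

  public static void main(String[] args) {
    InferableFn<String, String> identity =
        new InferableFn<String, String>() {
          @Override
          String apply(String input) {
            return input;
          }
        };
    System.out.println(identity.outputType()); // class java.lang.String

    Type polymorphic = new PolymorphicFn<Integer>().outputType();
    // The class file only says InferableFn<T, T>; Integer is not recoverable from it.
    System.out.println(polymorphic instanceof TypeVariable); // true
    System.out.println(polymorphic); // T
  }
}
```

In the polymorphic case the output type can only be resolved with information from outside the function class, such as the type of the input elements, which is what the coder-propagation tests for `PolymorphicInferableFunction` and `NestedPolymorphicInferableFunction` exercise.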
diff --git a/website/src/contribute/ptransform-style-guide.md b/website/src/contribute/ptransform-style-guide.md
index 4cdcb7b..d07529c 100644
--- a/website/src/contribute/ptransform-style-guide.md
+++ b/website/src/contribute/ptransform-style-guide.md
@@ -395,8 +395,8 @@ If the transform has an aspect of behavior to be customized by a user's code, ma
Do:
-* If possible, just use PTransform composition as an extensibility device - i.e. if the same effect can be achieved by the user applying the transform in their pipeline and composing it with another `PTransform`, then the transform itself should not be extensible. E.g., a transform that writes JSON objects to a third-party system should take a `PCollection<JsonObject>` (assuming it is possible to provide a `Coder` for `JsonObject`), rather than taking a generic `PCollection<T>` and a `Se [...]
-* If extensibility by user code is necessary inside the transform, pass the user code as a `SerializableFunction` or define your own serializable function-like type (ideally single-method, for interoperability with Java 8 lambdas). Because Java erases the types of lambdas, you should be sure to have adequate type information even if a raw-type `SerializableFunction` is provided by the user. See `MapElements` and `FlatMapElements` for examples of how to use `SimpleFunction` and `Serializa [...]
+* If possible, just use PTransform composition as an extensibility device - i.e. if the same effect can be achieved by the user applying the transform in their pipeline and composing it with another `PTransform`, then the transform itself should not be extensible. E.g., a transform that writes JSON objects to a third-party system should take a `PCollection<JsonObject>` (assuming it is possible to provide a `Coder` for `JsonObject`), rather than taking a generic `PCollection<T>` and a `Pr [...]
+* If extensibility by user code is necessary inside the transform, pass the user code as a `ProcessFunction` or define your own serializable function-like type (ideally single-method, for interoperability with Java 8 lambdas). Because Java erases the types of lambdas, you should be sure to have adequate type information even if a raw-type `ProcessFunction` is provided by the user. See `MapElements` and `FlatMapElements` for examples of how to use `ProcessFunction` and `InferableFunction` [...]
Do not: