You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/12 19:57:14 UTC
[43/50] [abbrv] beam git commit: Removes
FlatMapElements.MissingOutputTypeDescriptor
Removes FlatMapElements.MissingOutputTypeDescriptor
This comes from changing FlatMapElements.via(fn).withOutputType(td)
to FlatMapElements.into(td).via(fn) which is also shorter.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/831b11fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/831b11fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/831b11fb
Branch: refs/heads/DSL_SQL
Commit: 831b11fb712049ce78965ec2e677f9cf9ea66fc4
Parents: 3bf6c6b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Mar 29 13:56:00 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Apr 12 11:35:05 2017 -0700
----------------------------------------------------------------------
.../beam/examples/MinimalWordCountJava8.java | 5 +-
.../examples/MinimalWordCountJava8Test.java | 5 +-
.../beam/sdk/transforms/FlatMapElements.java | 113 +++++++++----------
.../transforms/FlatMapElementsJava8Test.java | 10 +-
4 files changed, 63 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/831b11fb/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index 0072886..f424a7b 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -56,8 +56,9 @@ public class MinimalWordCountJava8 {
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
- .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
- .withOutputType(TypeDescriptors.strings()))
+ .apply(FlatMapElements
+ .into(TypeDescriptors.strings())
+ .via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))))
.apply(Filter.by((String word) -> !word.isEmpty()))
.apply(Count.<String>perElement())
.apply(MapElements
http://git-wip-us.apache.org/repos/asf/beam/blob/831b11fb/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index 911ccf6..6c66d8f 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -63,8 +63,9 @@ public class MinimalWordCountJava8Test implements Serializable {
p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());
p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
- .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
- .withOutputType(TypeDescriptors.strings()))
+ .apply(FlatMapElements
+ .into(TypeDescriptors.strings())
+ .via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))))
.apply(Filter.by((String word) -> !word.isEmpty()))
.apply(Count.<String>perElement())
.apply(MapElements
http://git-wip-us.apache.org/repos/asf/beam/blob/831b11fb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index c165f7f..0983165 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -17,7 +17,10 @@
*/
package org.apache.beam.sdk.transforms;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import java.lang.reflect.ParameterizedType;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -30,32 +33,29 @@ import org.apache.beam.sdk.values.TypeDescriptors;
public class FlatMapElements<InputT, OutputT>
extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
/**
- * For a {@code SerializableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn},
- * returns a {@link PTransform} that applies {@code fn} to every element of the input
- * {@code PCollection<InputT>} and outputs all of the elements to the output
- * {@code PCollection<OutputT>}.
- *
- * <p>Example of use in Java 8:
- * <pre>{@code
- * PCollection<String> words = lines.apply(
- * FlatMapElements.via((String line) -> Arrays.asList(line.split(" ")))
- * .withOutputType(new TypeDescriptor<String>(){});
- * }</pre>
- *
- * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
- * descriptor need not be provided.
+ * Temporarily stores the argument of {@link #into(TypeDescriptor)} until combined with the
+ * argument of {@link #via(SerializableFunction)} into the fully-specified {@link #fn}. Stays null
+ * if constructed using {@link #via(SimpleFunction)} directly.
+ */
+ @Nullable
+ private final transient TypeDescriptor<Iterable<OutputT>> outputType;
+
+ /**
+ * Non-null on a fully specified transform - is null only when constructed using {@link
+ * #into(TypeDescriptor)}, until the fn is specified using {@link #via(SerializableFunction)}.
*/
- public static <InputT, OutputT> MissingOutputTypeDescriptor<InputT, OutputT>
- via(SerializableFunction<? super InputT, ? extends Iterable<OutputT>> fn) {
+ @Nullable
+ private final SimpleFunction<InputT, Iterable<OutputT>> fn;
+ private final DisplayData.ItemSpec<?> fnClassDisplayData;
- // TypeDescriptor interacts poorly with the wildcards needed to correctly express
- // covariance and contravariance in Java, so instead we cast it to an invariant
- // function here.
- @SuppressWarnings("unchecked") // safe covariant cast
- SerializableFunction<InputT, Iterable<OutputT>> simplerFn =
- (SerializableFunction<InputT, Iterable<OutputT>>) fn;
+ private FlatMapElements(
+ @Nullable SimpleFunction<InputT, Iterable<OutputT>> fn,
+ @Nullable TypeDescriptor<Iterable<OutputT>> outputType,
+ @Nullable Class<?> fnClass) {
+ this.fn = fn;
+ this.outputType = outputType;
+ this.fnClassDisplayData = DisplayData.item("flatMapFn", fnClass).withLabel("FlatMap Function");
- return new MissingOutputTypeDescriptor<>(simplerFn);
}
/**
@@ -82,54 +82,45 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
*/
public static <InputT, OutputT> FlatMapElements<InputT, OutputT>
via(SimpleFunction<? super InputT, ? extends Iterable<OutputT>> fn) {
- // TypeDescriptor interacts poorly with the wildcards needed to correctly express
- // covariance and contravariance in Java, so instead we cast it to an invariant
- // function here.
- @SuppressWarnings("unchecked") // safe covariant cast
- SimpleFunction<InputT, Iterable<OutputT>> simplerFn =
- (SimpleFunction<InputT, Iterable<OutputT>>) fn;
-
- return new FlatMapElements<>(simplerFn, fn.getClass());
+ return new FlatMapElements(fn, null, fn.getClass());
}
/**
- * An intermediate builder for a {@link FlatMapElements} transform. To complete the transform,
- * provide an output type descriptor to {@link MissingOutputTypeDescriptor#withOutputType}. See
- * {@link #via(SerializableFunction)} for a full example of use.
+ * Returns a new {@link FlatMapElements} transform with the given type descriptor for the output
+ * type, but the mapping function yet to be specified using {@link #via(SerializableFunction)}.
*/
- public static final class MissingOutputTypeDescriptor<InputT, OutputT> {
-
- private final SerializableFunction<InputT, Iterable<OutputT>> fn;
-
- private MissingOutputTypeDescriptor(
- SerializableFunction<InputT, Iterable<OutputT>> fn) {
- this.fn = fn;
- }
-
- public FlatMapElements<InputT, OutputT> withOutputType(TypeDescriptor<OutputT> outputType) {
- TypeDescriptor<Iterable<OutputT>> iterableOutputType = TypeDescriptors.iterables(outputType);
-
- return new FlatMapElements<>(
- SimpleFunction.fromSerializableFunctionWithOutputType(fn,
- iterableOutputType),
- fn.getClass());
- }
+ public static <OutputT> FlatMapElements<?, OutputT>
+ into(final TypeDescriptor<OutputT> outputType) {
+ return new FlatMapElements<>(null, TypeDescriptors.iterables(outputType), null);
}
- //////////////////////////////////////////////////////////////////////////////////////////////////
-
- private final SimpleFunction<InputT, ? extends Iterable<OutputT>> fn;
- private final DisplayData.ItemSpec<?> fnClassDisplayData;
-
- private FlatMapElements(
- SimpleFunction<InputT, ? extends Iterable<OutputT>> fn,
- Class<?> fnClass) {
- this.fn = fn;
- this.fnClassDisplayData = DisplayData.item("flatMapFn", fnClass).withLabel("FlatMap Function");
+ /**
+ * For a {@code SerializableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn},
+ * returns a {@link PTransform} that applies {@code fn} to every element of the input
+ * {@code PCollection<InputT>} and outputs all of the elements to the output
+ * {@code PCollection<OutputT>}.
+ *
+ * <p>Example of use in Java 8:
+ * <pre>{@code
+ * PCollection<String> words = lines.apply(
+ * FlatMapElements.into(TypeDescriptors.strings())
+ * .via((String line) -> Arrays.asList(line.split(" "))));
+ * }</pre>
+ *
+ * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
+ * descriptor need not be provided.
+ */
+ public <NewInputT> FlatMapElements<NewInputT, OutputT>
+ via(SerializableFunction<NewInputT, ? extends Iterable<OutputT>> fn) {
+ return new FlatMapElements(
+ SimpleFunction.fromSerializableFunctionWithOutputType(fn, (TypeDescriptor) outputType),
+ null,
+ fn.getClass());
}
@Override
public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
+ checkNotNull(fn, "Must specify a function on FlatMapElements using .via()");
return input.apply(
"FlatMap",
ParDo.of(
http://git-wip-us.apache.org/repos/asf/beam/blob/831b11fb/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java
index 471724d..501b0d1 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java
@@ -23,7 +23,7 @@ import java.util.List;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -52,8 +52,8 @@ public class FlatMapElementsJava8Test implements Serializable {
.apply(Create.of(1, 2, 3))
.apply(FlatMapElements
// Note that the input type annotation is required.
- .via((Integer i) -> ImmutableList.of(i, -i))
- .withOutputType(new TypeDescriptor<Integer>() {}));
+ .into(TypeDescriptors.integers())
+ .via((Integer i) -> ImmutableList.of(i, -i)));
PAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2);
pipeline.run();
@@ -69,8 +69,8 @@ public class FlatMapElementsJava8Test implements Serializable {
.apply(Create.of(1, 2, 3))
.apply(FlatMapElements
// Note that the input type annotation is required.
- .via(new Negater()::numAndNegation)
- .withOutputType(new TypeDescriptor<Integer>() {}));
+ .into(TypeDescriptors.integers())
+ .via(new Negater()::numAndNegation));
PAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2);
pipeline.run();