You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/12 19:57:14 UTC

[43/50] [abbrv] beam git commit: Removes FlatMapElements.MissingOutputTypeDescriptor

Removes FlatMapElements.MissingOutputTypeDescriptor

This comes from changing FlatMapElements.via(fn).withOutputType(td)
to FlatMapElements.into(td).via(fn) which is also shorter.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/831b11fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/831b11fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/831b11fb

Branch: refs/heads/DSL_SQL
Commit: 831b11fb712049ce78965ec2e677f9cf9ea66fc4
Parents: 3bf6c6b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Mar 29 13:56:00 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Apr 12 11:35:05 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/MinimalWordCountJava8.java    |   5 +-
 .../examples/MinimalWordCountJava8Test.java     |   5 +-
 .../beam/sdk/transforms/FlatMapElements.java    | 113 +++++++++----------
 .../transforms/FlatMapElementsJava8Test.java    |  10 +-
 4 files changed, 63 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/831b11fb/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index 0072886..f424a7b 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -56,8 +56,9 @@ public class MinimalWordCountJava8 {
     Pipeline p = Pipeline.create(options);
 
     p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
-     .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
-         .withOutputType(TypeDescriptors.strings()))
+     .apply(FlatMapElements
+         .into(TypeDescriptors.strings())
+         .via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))))
      .apply(Filter.by((String word) -> !word.isEmpty()))
      .apply(Count.<String>perElement())
      .apply(MapElements

http://git-wip-us.apache.org/repos/asf/beam/blob/831b11fb/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index 911ccf6..6c66d8f 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -63,8 +63,9 @@ public class MinimalWordCountJava8Test implements Serializable {
     p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());
 
     p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
-     .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
-         .withOutputType(TypeDescriptors.strings()))
+     .apply(FlatMapElements
+         .into(TypeDescriptors.strings())
+         .via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))))
      .apply(Filter.by((String word) -> !word.isEmpty()))
      .apply(Count.<String>perElement())
      .apply(MapElements

http://git-wip-us.apache.org/repos/asf/beam/blob/831b11fb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index c165f7f..0983165 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -17,7 +17,10 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.lang.reflect.ParameterizedType;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -30,32 +33,29 @@ import org.apache.beam.sdk.values.TypeDescriptors;
 public class FlatMapElements<InputT, OutputT>
 extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
   /**
-   * For a {@code SerializableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn},
-   * returns a {@link PTransform} that applies {@code fn} to every element of the input
-   * {@code PCollection<InputT>} and outputs all of the elements to the output
-   * {@code PCollection<OutputT>}.
-   *
-   * <p>Example of use in Java 8:
-   * <pre>{@code
-   * PCollection<String> words = lines.apply(
-   *     FlatMapElements.via((String line) -> Arrays.asList(line.split(" ")))
-   *         .withOutputType(new TypeDescriptor<String>(){});
-   * }</pre>
-   *
-   * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
-   * descriptor need not be provided.
+   * Temporarily stores the argument of {@link #into(TypeDescriptor)} until combined with the
+   * argument of {@link #via(SerializableFunction)} into the fully-specified {@link #fn}. Stays null
+   * if constructed using {@link #via(SimpleFunction)} directly.
+   */
+  @Nullable
+  private final transient TypeDescriptor<Iterable<OutputT>> outputType;
+
+  /**
+   * Non-null on a fully specified transform - is null only when constructed using {@link
+   * #into(TypeDescriptor)}, until the fn is specified using {@link #via(SerializableFunction)}.
    */
-  public static <InputT, OutputT> MissingOutputTypeDescriptor<InputT, OutputT>
-  via(SerializableFunction<? super InputT, ? extends Iterable<OutputT>> fn) {
+  @Nullable
+  private final SimpleFunction<InputT, Iterable<OutputT>> fn;
+  private final DisplayData.ItemSpec<?> fnClassDisplayData;
 
-    // TypeDescriptor interacts poorly with the wildcards needed to correctly express
-    // covariance and contravariance in Java, so instead we cast it to an invariant
-    // function here.
-    @SuppressWarnings("unchecked") // safe covariant cast
-    SerializableFunction<InputT, Iterable<OutputT>> simplerFn =
-        (SerializableFunction<InputT, Iterable<OutputT>>) fn;
+  private FlatMapElements(
+      @Nullable SimpleFunction<InputT, Iterable<OutputT>> fn,
+      @Nullable TypeDescriptor<Iterable<OutputT>> outputType,
+      @Nullable Class<?> fnClass) {
+    this.fn = fn;
+    this.outputType = outputType;
+    this.fnClassDisplayData = DisplayData.item("flatMapFn", fnClass).withLabel("FlatMap Function");
 
-    return new MissingOutputTypeDescriptor<>(simplerFn);
   }
 
   /**
@@ -82,54 +82,45 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
    */
   public static <InputT, OutputT> FlatMapElements<InputT, OutputT>
   via(SimpleFunction<? super InputT, ? extends Iterable<OutputT>> fn) {
-    // TypeDescriptor interacts poorly with the wildcards needed to correctly express
-    // covariance and contravariance in Java, so instead we cast it to an invariant
-    // function here.
-    @SuppressWarnings("unchecked") // safe covariant cast
-    SimpleFunction<InputT, Iterable<OutputT>> simplerFn =
-        (SimpleFunction<InputT, Iterable<OutputT>>) fn;
-
-    return new FlatMapElements<>(simplerFn, fn.getClass());
+    return new FlatMapElements(fn, null, fn.getClass());
   }
 
   /**
-   * An intermediate builder for a {@link FlatMapElements} transform. To complete the transform,
-   * provide an output type descriptor to {@link MissingOutputTypeDescriptor#withOutputType}. See
-   * {@link #via(SerializableFunction)} for a full example of use.
+   * Returns a new {@link FlatMapElements} transform with the given type descriptor for the output
+   * type, but the mapping function yet to be specified using {@link #via(SerializableFunction)}.
    */
-  public static final class MissingOutputTypeDescriptor<InputT, OutputT> {
-
-    private final SerializableFunction<InputT, Iterable<OutputT>> fn;
-
-    private MissingOutputTypeDescriptor(
-        SerializableFunction<InputT, Iterable<OutputT>> fn) {
-      this.fn = fn;
-    }
-
-    public FlatMapElements<InputT, OutputT> withOutputType(TypeDescriptor<OutputT> outputType) {
-      TypeDescriptor<Iterable<OutputT>> iterableOutputType = TypeDescriptors.iterables(outputType);
-
-      return new FlatMapElements<>(
-          SimpleFunction.fromSerializableFunctionWithOutputType(fn,
-              iterableOutputType),
-              fn.getClass());
-    }
+  public static <OutputT> FlatMapElements<?, OutputT>
+  into(final TypeDescriptor<OutputT> outputType) {
+    return new FlatMapElements<>(null, TypeDescriptors.iterables(outputType), null);
   }
 
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private final SimpleFunction<InputT, ? extends Iterable<OutputT>> fn;
-  private final DisplayData.ItemSpec<?> fnClassDisplayData;
-
-  private FlatMapElements(
-      SimpleFunction<InputT, ? extends Iterable<OutputT>> fn,
-      Class<?> fnClass) {
-    this.fn = fn;
-    this.fnClassDisplayData = DisplayData.item("flatMapFn", fnClass).withLabel("FlatMap Function");
+  /**
+   * For a {@code SerializableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn},
+   * returns a {@link PTransform} that applies {@code fn} to every element of the input
+   * {@code PCollection<InputT>} and outputs all of the elements to the output
+   * {@code PCollection<OutputT>}.
+   *
+   * <p>Example of use in Java 8:
+   * <pre>{@code
+   * PCollection<String> words = lines.apply(
+   *     FlatMapElements.into(TypeDescriptors.strings())
+   *         .via((String line) -> Arrays.asList(line.split(" "))));
+   * }</pre>
+   *
+   * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
+   * descriptor need not be provided.
+   */
+  public <NewInputT> FlatMapElements<NewInputT, OutputT>
+  via(SerializableFunction<NewInputT, ? extends Iterable<OutputT>> fn) {
+    return new FlatMapElements(
+        SimpleFunction.fromSerializableFunctionWithOutputType(fn, (TypeDescriptor) outputType),
+        null,
+        fn.getClass());
   }
 
   @Override
   public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
+    checkNotNull(fn, "Must specify a function on FlatMapElements using .via()");
     return input.apply(
         "FlatMap",
         ParDo.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/831b11fb/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java
index 471724d..501b0d1 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java
@@ -23,7 +23,7 @@ import java.util.List;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -52,8 +52,8 @@ public class FlatMapElementsJava8Test implements Serializable {
         .apply(Create.of(1, 2, 3))
         .apply(FlatMapElements
             // Note that the input type annotation is required.
-            .via((Integer i) -> ImmutableList.of(i, -i))
-            .withOutputType(new TypeDescriptor<Integer>() {}));
+            .into(TypeDescriptors.integers())
+            .via((Integer i) -> ImmutableList.of(i, -i)));
 
     PAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2);
     pipeline.run();
@@ -69,8 +69,8 @@ public class FlatMapElementsJava8Test implements Serializable {
         .apply(Create.of(1, 2, 3))
         .apply(FlatMapElements
             // Note that the input type annotation is required.
-            .via(new Negater()::numAndNegation)
-            .withOutputType(new TypeDescriptor<Integer>() {}));
+            .into(TypeDescriptors.integers())
+            .via(new Negater()::numAndNegation));
 
     PAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2);
     pipeline.run();