You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/12 18:41:45 UTC

[1/3] beam git commit: Removes FlatMapElements.MissingOutputTypeDescriptor

Repository: beam
Updated Branches:
  refs/heads/master f31435403 -> b7ddab281


Removes FlatMapElements.MissingOutputTypeDescriptor

This comes from changing FlatMapElements.via(fn).withOutputType(td)
to FlatMapElements.into(td).via(fn) which is also shorter.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/831b11fb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/831b11fb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/831b11fb

Branch: refs/heads/master
Commit: 831b11fb712049ce78965ec2e677f9cf9ea66fc4
Parents: 3bf6c6b
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Mar 29 13:56:00 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Apr 12 11:35:05 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/MinimalWordCountJava8.java    |   5 +-
 .../examples/MinimalWordCountJava8Test.java     |   5 +-
 .../beam/sdk/transforms/FlatMapElements.java    | 113 +++++++++----------
 .../transforms/FlatMapElementsJava8Test.java    |  10 +-
 4 files changed, 63 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/831b11fb/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index 0072886..f424a7b 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -56,8 +56,9 @@ public class MinimalWordCountJava8 {
     Pipeline p = Pipeline.create(options);
 
     p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
-     .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
-         .withOutputType(TypeDescriptors.strings()))
+     .apply(FlatMapElements
+         .into(TypeDescriptors.strings())
+         .via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))))
      .apply(Filter.by((String word) -> !word.isEmpty()))
      .apply(Count.<String>perElement())
      .apply(MapElements

http://git-wip-us.apache.org/repos/asf/beam/blob/831b11fb/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index 911ccf6..6c66d8f 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -63,8 +63,9 @@ public class MinimalWordCountJava8Test implements Serializable {
     p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());
 
     p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*"))
-     .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))
-         .withOutputType(TypeDescriptors.strings()))
+     .apply(FlatMapElements
+         .into(TypeDescriptors.strings())
+         .via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))))
      .apply(Filter.by((String word) -> !word.isEmpty()))
      .apply(Count.<String>perElement())
      .apply(MapElements

http://git-wip-us.apache.org/repos/asf/beam/blob/831b11fb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index c165f7f..0983165 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -17,7 +17,10 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.lang.reflect.ParameterizedType;
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -30,32 +33,29 @@ import org.apache.beam.sdk.values.TypeDescriptors;
 public class FlatMapElements<InputT, OutputT>
 extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
   /**
-   * For a {@code SerializableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn},
-   * returns a {@link PTransform} that applies {@code fn} to every element of the input
-   * {@code PCollection<InputT>} and outputs all of the elements to the output
-   * {@code PCollection<OutputT>}.
-   *
-   * <p>Example of use in Java 8:
-   * <pre>{@code
-   * PCollection<String> words = lines.apply(
-   *     FlatMapElements.via((String line) -> Arrays.asList(line.split(" ")))
-   *         .withOutputType(new TypeDescriptor<String>(){});
-   * }</pre>
-   *
-   * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
-   * descriptor need not be provided.
+   * Temporarily stores the argument of {@link #into(TypeDescriptor)} until combined with the
+   * argument of {@link #via(SerializableFunction)} into the fully-specified {@link #fn}. Stays null
+   * if constructed using {@link #via(SimpleFunction)} directly.
+   */
+  @Nullable
+  private final transient TypeDescriptor<Iterable<OutputT>> outputType;
+
+  /**
+   * Non-null on a fully specified transform - is null only when constructed using {@link
+   * #into(TypeDescriptor)}, until the fn is specified using {@link #via(SerializableFunction)}.
    */
-  public static <InputT, OutputT> MissingOutputTypeDescriptor<InputT, OutputT>
-  via(SerializableFunction<? super InputT, ? extends Iterable<OutputT>> fn) {
+  @Nullable
+  private final SimpleFunction<InputT, Iterable<OutputT>> fn;
+  private final DisplayData.ItemSpec<?> fnClassDisplayData;
 
-    // TypeDescriptor interacts poorly with the wildcards needed to correctly express
-    // covariance and contravariance in Java, so instead we cast it to an invariant
-    // function here.
-    @SuppressWarnings("unchecked") // safe covariant cast
-    SerializableFunction<InputT, Iterable<OutputT>> simplerFn =
-        (SerializableFunction<InputT, Iterable<OutputT>>) fn;
+  private FlatMapElements(
+      @Nullable SimpleFunction<InputT, Iterable<OutputT>> fn,
+      @Nullable TypeDescriptor<Iterable<OutputT>> outputType,
+      @Nullable Class<?> fnClass) {
+    this.fn = fn;
+    this.outputType = outputType;
+    this.fnClassDisplayData = DisplayData.item("flatMapFn", fnClass).withLabel("FlatMap Function");
 
-    return new MissingOutputTypeDescriptor<>(simplerFn);
   }
 
   /**
@@ -82,54 +82,45 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
    */
   public static <InputT, OutputT> FlatMapElements<InputT, OutputT>
   via(SimpleFunction<? super InputT, ? extends Iterable<OutputT>> fn) {
-    // TypeDescriptor interacts poorly with the wildcards needed to correctly express
-    // covariance and contravariance in Java, so instead we cast it to an invariant
-    // function here.
-    @SuppressWarnings("unchecked") // safe covariant cast
-    SimpleFunction<InputT, Iterable<OutputT>> simplerFn =
-        (SimpleFunction<InputT, Iterable<OutputT>>) fn;
-
-    return new FlatMapElements<>(simplerFn, fn.getClass());
+    return new FlatMapElements(fn, null, fn.getClass());
   }
 
   /**
-   * An intermediate builder for a {@link FlatMapElements} transform. To complete the transform,
-   * provide an output type descriptor to {@link MissingOutputTypeDescriptor#withOutputType}. See
-   * {@link #via(SerializableFunction)} for a full example of use.
+   * Returns a new {@link FlatMapElements} transform with the given type descriptor for the output
+   * type, but the mapping function yet to be specified using {@link #via(SerializableFunction)}.
    */
-  public static final class MissingOutputTypeDescriptor<InputT, OutputT> {
-
-    private final SerializableFunction<InputT, Iterable<OutputT>> fn;
-
-    private MissingOutputTypeDescriptor(
-        SerializableFunction<InputT, Iterable<OutputT>> fn) {
-      this.fn = fn;
-    }
-
-    public FlatMapElements<InputT, OutputT> withOutputType(TypeDescriptor<OutputT> outputType) {
-      TypeDescriptor<Iterable<OutputT>> iterableOutputType = TypeDescriptors.iterables(outputType);
-
-      return new FlatMapElements<>(
-          SimpleFunction.fromSerializableFunctionWithOutputType(fn,
-              iterableOutputType),
-              fn.getClass());
-    }
+  public static <OutputT> FlatMapElements<?, OutputT>
+  into(final TypeDescriptor<OutputT> outputType) {
+    return new FlatMapElements<>(null, TypeDescriptors.iterables(outputType), null);
   }
 
-  //////////////////////////////////////////////////////////////////////////////////////////////////
-
-  private final SimpleFunction<InputT, ? extends Iterable<OutputT>> fn;
-  private final DisplayData.ItemSpec<?> fnClassDisplayData;
-
-  private FlatMapElements(
-      SimpleFunction<InputT, ? extends Iterable<OutputT>> fn,
-      Class<?> fnClass) {
-    this.fn = fn;
-    this.fnClassDisplayData = DisplayData.item("flatMapFn", fnClass).withLabel("FlatMap Function");
+  /**
+   * For a {@code SerializableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn},
+   * returns a {@link PTransform} that applies {@code fn} to every element of the input
+   * {@code PCollection<InputT>} and outputs all of the elements to the output
+   * {@code PCollection<OutputT>}.
+   *
+   * <p>Example of use in Java 8:
+   * <pre>{@code
+   * PCollection<String> words = lines.apply(
+   *     FlatMapElements.via((String line) -> Arrays.asList(line.split(" ")))
+   *         .withOutputType(new TypeDescriptor<String>(){});
+   * }</pre>
+   *
+   * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
+   * descriptor need not be provided.
+   */
+  public <NewInputT> FlatMapElements<NewInputT, OutputT>
+  via(SerializableFunction<NewInputT, ? extends Iterable<OutputT>> fn) {
+    return new FlatMapElements(
+        SimpleFunction.fromSerializableFunctionWithOutputType(fn, (TypeDescriptor) outputType),
+        null,
+        fn.getClass());
   }
 
   @Override
   public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
+    checkNotNull(fn, "Must specify a function on FlatMapElements using .via()");
     return input.apply(
         "FlatMap",
         ParDo.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/831b11fb/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java
index 471724d..501b0d1 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java
@@ -23,7 +23,7 @@ import java.util.List;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -52,8 +52,8 @@ public class FlatMapElementsJava8Test implements Serializable {
         .apply(Create.of(1, 2, 3))
         .apply(FlatMapElements
             // Note that the input type annotation is required.
-            .via((Integer i) -> ImmutableList.of(i, -i))
-            .withOutputType(new TypeDescriptor<Integer>() {}));
+            .into(TypeDescriptors.integers())
+            .via((Integer i) -> ImmutableList.of(i, -i)));
 
     PAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2);
     pipeline.run();
@@ -69,8 +69,8 @@ public class FlatMapElementsJava8Test implements Serializable {
         .apply(Create.of(1, 2, 3))
         .apply(FlatMapElements
             // Note that the input type annotation is required.
-            .via(new Negater()::numAndNegation)
-            .withOutputType(new TypeDescriptor<Integer>() {}));
+            .into(TypeDescriptors.integers())
+            .via(new Negater()::numAndNegation));
 
     PAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2);
     pipeline.run();


[3/3] beam git commit: This closes #2363

Posted by jk...@apache.org.
This closes #2363


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b7ddab28
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b7ddab28
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b7ddab28

Branch: refs/heads/master
Commit: b7ddab2818cebb702ecb61a36fdb6c79c4a4956f
Parents: f314354 831b11f
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Apr 12 11:38:32 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Apr 12 11:38:32 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/MinimalWordCountJava8.java    |   9 +-
 .../beam/examples/complete/game/GameStats.java  |   6 +-
 .../beam/examples/complete/game/UserScore.java  |   5 +-
 .../examples/MinimalWordCountJava8Test.java     |   9 +-
 .../complete/game/HourlyTeamScoreTest.java      |   5 +-
 .../examples/complete/game/UserScoreTest.java   |   6 +-
 .../beam/runners/direct/DirectRunnerTest.java   |   5 +-
 .../beam/sdk/transforms/FlatMapElements.java    | 113 +++++++++----------
 .../apache/beam/sdk/transforms/MapElements.java |  99 ++++++++--------
 .../beam/sdk/transforms/MapElementsTest.java    |  25 ++--
 .../sdk/io/gcp/bigquery/BatchLoadBigQuery.java  |   7 +-
 .../transforms/FlatMapElementsJava8Test.java    |  10 +-
 .../sdk/transforms/MapElementsJava8Test.java    |  10 +-
 13 files changed, 153 insertions(+), 156 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: Removes MapElements.MissingOutputTypeDescriptor

Posted by jk...@apache.org.
Removes MapElements.MissingOutputTypeDescriptor

This comes from changing MapElements.via(fn).withOutputType(td)
to MapElements.into(td).via(fn) which is also shorter.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bf6c6b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bf6c6b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bf6c6b3

Branch: refs/heads/master
Commit: 3bf6c6b3f81c643a1107346674088f554aca29a8
Parents: f314354
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Mar 29 13:42:29 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Apr 12 11:35:05 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/MinimalWordCountJava8.java    |  4 +-
 .../beam/examples/complete/game/GameStats.java  |  6 +-
 .../beam/examples/complete/game/UserScore.java  |  5 +-
 .../examples/MinimalWordCountJava8Test.java     |  4 +-
 .../complete/game/HourlyTeamScoreTest.java      |  5 +-
 .../examples/complete/game/UserScoreTest.java   |  6 +-
 .../beam/runners/direct/DirectRunnerTest.java   |  5 +-
 .../apache/beam/sdk/transforms/MapElements.java | 99 ++++++++++----------
 .../beam/sdk/transforms/MapElementsTest.java    | 25 +++--
 .../sdk/io/gcp/bigquery/BatchLoadBigQuery.java  |  7 +-
 .../sdk/transforms/MapElementsJava8Test.java    | 10 +-
 11 files changed, 90 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index 738b64d..0072886 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -61,8 +61,8 @@ public class MinimalWordCountJava8 {
      .apply(Filter.by((String word) -> !word.isEmpty()))
      .apply(Count.<String>perElement())
      .apply(MapElements
-         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
-         .withOutputType(TypeDescriptors.strings()))
+         .into(TypeDescriptors.strings())
+         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
 
      // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
      .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 6874953..9c79fad 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -260,9 +260,9 @@ public class GameStats extends LeaderBoard {
     // Extract username/score pairs from the event stream
     PCollection<KV<String, Integer>> userEvents =
         rawEvents.apply("ExtractUserScore",
-          MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
-            .withOutputType(
-                TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())));
+          MapElements
+              .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
+              .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())));
 
     // Calculate the total score per user over fixed windows, and
     // cumulative updates for late data.

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 7dd5a8e..b4b023f 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -165,9 +165,8 @@ public class UserScore {
 
       return gameInfo
         .apply(MapElements
-            .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
-            .withOutputType(
-                TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())))
+            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
+            .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore())))
         .apply(Sum.<String>integersPerKey());
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index c2f3efe..911ccf6 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -68,8 +68,8 @@ public class MinimalWordCountJava8Test implements Serializable {
      .apply(Filter.by((String word) -> !word.isEmpty()))
      .apply(Count.<String>perElement())
      .apply(MapElements
-         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
-         .withOutputType(TypeDescriptors.strings()))
+         .into(TypeDescriptors.strings())
+         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
      .apply(TextIO.Write.to("gs://your-output-bucket/and-output-prefix"));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
index 40bbfdb..409fc92 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
@@ -101,9 +101,8 @@ public class HourlyTeamScoreTest implements Serializable {
               -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
       // run a map to access the fields in the result.
       .apply(MapElements
-          .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
-          .withOutputType(
-              TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())));
+          .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
+          .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())));
 
       PAssert.that(output).containsInAnyOrder(FILTERED_EVENTS);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
index f0c28ab..2eb63aa 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
@@ -143,9 +143,9 @@ public class UserScoreTest implements Serializable {
     PCollection<KV<String, Integer>> extract = input
       .apply(ParDo.of(new ParseEventFn()))
       .apply(
-          MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
-          .withOutputType(
-              TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())));
+          MapElements
+              .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
+              .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())));
 
     PAssert.that(extract).empty();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 28c24ad..3b81f4d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -192,9 +192,10 @@ public class DirectRunnerTest implements Serializable {
     TypeDescriptor<byte[]> td = new TypeDescriptor<byte[]>() {
     };
     PCollection<byte[]> foos =
-        p.apply(Create.of(1, 1, 1, 2, 2, 3)).apply(MapElements.via(getBytes).withOutputType(td));
+        p.apply(Create.of(1, 1, 1, 2, 2, 3))
+            .apply(MapElements.into(td).via(getBytes));
     PCollection<byte[]> msync =
-        p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.via(getBytes).withOutputType(td));
+        p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.into(td).via(getBytes));
     PCollection<byte[]> bytes =
         PCollectionList.of(foos).and(msync).apply(Flatten.<byte[]>pCollections());
     PCollection<KV<byte[], Long>> counts = bytes.apply(Count.<byte[]>perElement());

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 421b2ab..82cf753 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -26,34 +29,27 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  */
 public class MapElements<InputT, OutputT>
 extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
-
   /**
-   * For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type descriptor,
-   * returns a {@code PTransform} that takes an input {@code PCollection<InputT>} and returns
-   * a {@code PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in
-   * the input.
-   *
-   * <p>Example of use in Java 8:
-   * <pre>{@code
-   * PCollection<Integer> wordLengths = words.apply(
-   *     MapElements.via((String word) -> word.length())
-   *         .withOutputType(new TypeDescriptor<Integer>() {});
-   * }</pre>
-   *
-   * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
-   * descriptor need not be provided.
+   * Temporarily stores the argument of {@link #into(TypeDescriptor)} until combined with the
+   * argument of {@link #via(SerializableFunction)} into the fully-specified {@link #fn}. Stays null
+   * if constructed using {@link #via(SimpleFunction)} directly.
    */
-  public static <InputT, OutputT> MissingOutputTypeDescriptor<InputT, OutputT>
-  via(SerializableFunction<? super InputT, OutputT> fn) {
+  @Nullable private final transient TypeDescriptor<OutputT> outputType;
 
-    // TypeDescriptor interacts poorly with the wildcards needed to correctly express
-    // covariance and contravariance in Java, so instead we cast it to an invariant
-    // function here.
-    @SuppressWarnings("unchecked") // safe covariant cast
-        SerializableFunction<InputT, OutputT> simplerFn =
-        (SerializableFunction<InputT, OutputT>) fn;
+  /**
+   * Non-null on a fully specified transform - is null only when constructed using {@link
+   * #into(TypeDescriptor)}, until the fn is specified using {@link #via(SerializableFunction)}.
+   */
+  @Nullable private final SimpleFunction<InputT, OutputT> fn;
+  private final DisplayData.ItemSpec<?> fnClassDisplayData;
 
-    return new MissingOutputTypeDescriptor<>(simplerFn);
+  private MapElements(
+      @Nullable SimpleFunction<InputT, OutputT> fn,
+      @Nullable TypeDescriptor<OutputT> outputType,
+      @Nullable Class<?> fnClass) {
+    this.fn = fn;
+    this.outputType = outputType;
+    this.fnClassDisplayData = DisplayData.item("mapFn", fnClass).withLabel("Map Function");
   }
 
   /**
@@ -77,41 +73,46 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
    */
   public static <InputT, OutputT> MapElements<InputT, OutputT> via(
       final SimpleFunction<InputT, OutputT> fn) {
-    return new MapElements<>(fn, fn.getClass());
+    return new MapElements<>(fn, null, fn.getClass());
   }
 
   /**
-   * An intermediate builder for a {@link MapElements} transform. To complete the transform, provide
-   * an output type descriptor to {@link MissingOutputTypeDescriptor#withOutputType}. See
-   * {@link #via(SerializableFunction)} for a full example of use.
+   * Returns a new {@link MapElements} transform with the given type descriptor for the output
+   * type, but the mapping function yet to be specified using {@link #via(SerializableFunction)}.
    */
-  public static final class MissingOutputTypeDescriptor<InputT, OutputT> {
-
-    private final SerializableFunction<InputT, OutputT> fn;
-
-    private MissingOutputTypeDescriptor(SerializableFunction<InputT, OutputT> fn) {
-      this.fn = fn;
-    }
-
-    public MapElements<InputT, OutputT> withOutputType(final TypeDescriptor<OutputT> outputType) {
-      return new MapElements<>(
-          SimpleFunction.fromSerializableFunctionWithOutputType(fn, outputType), fn.getClass());
-    }
-
+  public static <OutputT> MapElements<?, OutputT>
+  into(final TypeDescriptor<OutputT> outputType) {
+    return new MapElements<>(null, outputType, null);
   }
 
-  ///////////////////////////////////////////////////////////////////
-
-  private final SimpleFunction<InputT, OutputT> fn;
-  private final DisplayData.ItemSpec<?> fnClassDisplayData;
-
-  private MapElements(SimpleFunction<InputT, OutputT> fn, Class<?> fnClass) {
-    this.fn = fn;
-    this.fnClassDisplayData = DisplayData.item("mapFn", fnClass).withLabel("Map Function");
+  /**
+   * For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type descriptor,
+   * returns a {@code PTransform} that takes an input {@code PCollection<InputT>} and returns a
+   * {@code PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in the
+   * input.
+   *
+   * <p>Example of use in Java 8:
+   *
+   * <pre>{@code
+   * PCollection<Integer> wordLengths = words.apply(
+   *     MapElements.via((String word) -> word.length())
+   *         .withOutputType(new TypeDescriptor<Integer>() {});
+   * }</pre>
+   *
+   * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
+   * descriptor need not be provided.
+   */
+  public <NewInputT> MapElements<NewInputT, OutputT> via(
+      SerializableFunction<NewInputT, OutputT> fn) {
+    return new MapElements<>(
+        SimpleFunction.fromSerializableFunctionWithOutputType(fn, outputType),
+        null,
+        fn.getClass());
   }
 
   @Override
   public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
+    checkNotNull(fn, "Must specify a function on MapElements using .via()");
     return input.apply(
         "Map",
         ParDo.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index 82e856e..7bf94a0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -152,14 +153,18 @@ public class MapElementsTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testMapBasicSerializableFunction() throws Exception {
-    PCollection<Integer> output = pipeline
-        .apply(Create.of(1, 2, 3))
-        .apply(MapElements.via(new SerializableFunction<Integer, Integer>() {
-          @Override
-          public Integer apply(Integer input) {
-            return -input;
-          }
-        }).withOutputType(new TypeDescriptor<Integer>() {}));
+    PCollection<Integer> output =
+        pipeline
+            .apply(Create.of(1, 2, 3))
+            .apply(
+                MapElements.into(TypeDescriptors.integers())
+                    .via(
+                        new SerializableFunction<Integer, Integer>() {
+                          @Override
+                          public Integer apply(Integer input) {
+                            return -input;
+                          }
+                        }));
 
     PAssert.that(output).containsInAnyOrder(-2, -1, -3);
     pipeline.run();
@@ -210,8 +215,8 @@ public class MapElementsTest implements Serializable {
           }
         };
 
-    MapElements<?, ?> serializableMap = MapElements.via(serializableFn)
-        .withOutputType(TypeDescriptor.of(Integer.class));
+    MapElements<?, ?> serializableMap =
+        MapElements.into(TypeDescriptors.integers()).via(serializableFn);
     assertThat(DisplayData.from(serializableMap),
         hasDisplayItem("mapFn", serializableFn.getClass()));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
index c323858..160b231 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
@@ -103,10 +103,9 @@ class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, WriteResult> {
     if (write.getFormatFunction() == BigQueryIO.IDENTITY_FORMATTER) {
       inputInGlobalWindow = (PCollection<TableRow>) typedInputInGlobalWindow;
     } else {
-      inputInGlobalWindow = typedInputInGlobalWindow
-          .apply(MapElements.via(write.getFormatFunction())
-              .withOutputType(new TypeDescriptor<TableRow>() {
-              }));
+      inputInGlobalWindow =
+          typedInputInGlobalWindow.apply(
+              MapElements.into(new TypeDescriptor<TableRow>() {}).via(write.getFormatFunction()));
     }
 
     // PCollection of filename, file byte size.

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
index e0e9d9b4..dbd5ef3 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -47,8 +47,8 @@ public class MapElementsJava8Test implements Serializable {
         .apply(Create.of(1, 2, 3))
         .apply(MapElements
             // Note that the type annotation is required.
-            .via((Integer i) -> i * 2)
-            .withOutputType(new TypeDescriptor<Integer>() {}));
+            .into(TypeDescriptors.integers())
+            .via((Integer i) -> i * 2));
 
     PAssert.that(output).containsInAnyOrder(6, 2, 4);
     pipeline.run();
@@ -82,8 +82,8 @@ public class MapElementsJava8Test implements Serializable {
         .apply(Create.of(1, 2, 3))
         .apply(MapElements
             // Note that the type annotation is required.
-            .via(new Doubler()::doubleIt)
-            .withOutputType(new TypeDescriptor<Integer>() {}));
+            .into(TypeDescriptors.integers())
+            .via(new Doubler()::doubleIt));
 
     PAssert.that(output).containsInAnyOrder(6, 2, 4);
     pipeline.run();