You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/04/12 19:57:13 UTC

[42/50] [abbrv] beam git commit: Removes MapElements.MissingOutputTypeDescriptor

Removes MapElements.MissingOutputTypeDescriptor

This comes from changing MapElements.via(fn).withOutputType(td)
to MapElements.into(td).via(fn) which is also shorter.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bf6c6b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bf6c6b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bf6c6b3

Branch: refs/heads/DSL_SQL
Commit: 3bf6c6b3f81c643a1107346674088f554aca29a8
Parents: f314354
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Mar 29 13:42:29 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Apr 12 11:35:05 2017 -0700

----------------------------------------------------------------------
 .../beam/examples/MinimalWordCountJava8.java    |  4 +-
 .../beam/examples/complete/game/GameStats.java  |  6 +-
 .../beam/examples/complete/game/UserScore.java  |  5 +-
 .../examples/MinimalWordCountJava8Test.java     |  4 +-
 .../complete/game/HourlyTeamScoreTest.java      |  5 +-
 .../examples/complete/game/UserScoreTest.java   |  6 +-
 .../beam/runners/direct/DirectRunnerTest.java   |  5 +-
 .../apache/beam/sdk/transforms/MapElements.java | 99 ++++++++++----------
 .../beam/sdk/transforms/MapElementsTest.java    | 25 +++--
 .../sdk/io/gcp/bigquery/BatchLoadBigQuery.java  |  7 +-
 .../sdk/transforms/MapElementsJava8Test.java    | 10 +-
 11 files changed, 90 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index 738b64d..0072886 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -61,8 +61,8 @@ public class MinimalWordCountJava8 {
      .apply(Filter.by((String word) -> !word.isEmpty()))
      .apply(Count.<String>perElement())
      .apply(MapElements
-         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
-         .withOutputType(TypeDescriptors.strings()))
+         .into(TypeDescriptors.strings())
+         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
 
      // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
      .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 6874953..9c79fad 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -260,9 +260,9 @@ public class GameStats extends LeaderBoard {
     // Extract username/score pairs from the event stream
     PCollection<KV<String, Integer>> userEvents =
         rawEvents.apply("ExtractUserScore",
-          MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
-            .withOutputType(
-                TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())));
+          MapElements
+              .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
+              .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())));
 
     // Calculate the total score per user over fixed windows, and
     // cumulative updates for late data.

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 7dd5a8e..b4b023f 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -165,9 +165,8 @@ public class UserScore {
 
       return gameInfo
         .apply(MapElements
-            .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
-            .withOutputType(
-                TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())))
+            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
+            .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore())))
         .apply(Sum.<String>integersPerKey());
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index c2f3efe..911ccf6 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -68,8 +68,8 @@ public class MinimalWordCountJava8Test implements Serializable {
      .apply(Filter.by((String word) -> !word.isEmpty()))
      .apply(Count.<String>perElement())
      .apply(MapElements
-         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
-         .withOutputType(TypeDescriptors.strings()))
+         .into(TypeDescriptors.strings())
+         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
      .apply(TextIO.Write.to("gs://your-output-bucket/and-output-prefix"));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
index 40bbfdb..409fc92 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
@@ -101,9 +101,8 @@ public class HourlyTeamScoreTest implements Serializable {
               -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
       // run a map to access the fields in the result.
       .apply(MapElements
-          .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
-          .withOutputType(
-              TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())));
+          .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
+          .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())));
 
       PAssert.that(output).containsInAnyOrder(FILTERED_EVENTS);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
index f0c28ab..2eb63aa 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
@@ -143,9 +143,9 @@ public class UserScoreTest implements Serializable {
     PCollection<KV<String, Integer>> extract = input
       .apply(ParDo.of(new ParseEventFn()))
       .apply(
-          MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
-          .withOutputType(
-              TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())));
+          MapElements
+              .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
+              .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())));
 
     PAssert.that(extract).empty();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 28c24ad..3b81f4d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -192,9 +192,10 @@ public class DirectRunnerTest implements Serializable {
     TypeDescriptor<byte[]> td = new TypeDescriptor<byte[]>() {
     };
     PCollection<byte[]> foos =
-        p.apply(Create.of(1, 1, 1, 2, 2, 3)).apply(MapElements.via(getBytes).withOutputType(td));
+        p.apply(Create.of(1, 1, 1, 2, 2, 3))
+            .apply(MapElements.into(td).via(getBytes));
     PCollection<byte[]> msync =
-        p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.via(getBytes).withOutputType(td));
+        p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.into(td).via(getBytes));
     PCollection<byte[]> bytes =
         PCollectionList.of(foos).and(msync).apply(Flatten.<byte[]>pCollections());
     PCollection<KV<byte[], Long>> counts = bytes.apply(Count.<byte[]>perElement());

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 421b2ab..82cf753 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.sdk.transforms;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.Nullable;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
@@ -26,34 +29,27 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  */
 public class MapElements<InputT, OutputT>
 extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
-
   /**
-   * For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type descriptor,
-   * returns a {@code PTransform} that takes an input {@code PCollection<InputT>} and returns
-   * a {@code PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in
-   * the input.
-   *
-   * <p>Example of use in Java 8:
-   * <pre>{@code
-   * PCollection<Integer> wordLengths = words.apply(
-   *     MapElements.via((String word) -> word.length())
-   *         .withOutputType(new TypeDescriptor<Integer>() {});
-   * }</pre>
-   *
-   * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
-   * descriptor need not be provided.
+   * Temporarily stores the argument of {@link #into(TypeDescriptor)} until combined with the
+   * argument of {@link #via(SerializableFunction)} into the fully-specified {@link #fn}. Stays null
+   * if constructed using {@link #via(SimpleFunction)} directly.
    */
-  public static <InputT, OutputT> MissingOutputTypeDescriptor<InputT, OutputT>
-  via(SerializableFunction<? super InputT, OutputT> fn) {
+  @Nullable private final transient TypeDescriptor<OutputT> outputType;
 
-    // TypeDescriptor interacts poorly with the wildcards needed to correctly express
-    // covariance and contravariance in Java, so instead we cast it to an invariant
-    // function here.
-    @SuppressWarnings("unchecked") // safe covariant cast
-        SerializableFunction<InputT, OutputT> simplerFn =
-        (SerializableFunction<InputT, OutputT>) fn;
+  /**
+   * Non-null on a fully specified transform - is null only when constructed using {@link
+   * #into(TypeDescriptor)}, until the fn is specified using {@link #via(SerializableFunction)}.
+   */
+  @Nullable private final SimpleFunction<InputT, OutputT> fn;
+  private final DisplayData.ItemSpec<?> fnClassDisplayData;
 
-    return new MissingOutputTypeDescriptor<>(simplerFn);
+  private MapElements(
+      @Nullable SimpleFunction<InputT, OutputT> fn,
+      @Nullable TypeDescriptor<OutputT> outputType,
+      @Nullable Class<?> fnClass) {
+    this.fn = fn;
+    this.outputType = outputType;
+    this.fnClassDisplayData = DisplayData.item("mapFn", fnClass).withLabel("Map Function");
   }
 
   /**
@@ -77,41 +73,46 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
    */
   public static <InputT, OutputT> MapElements<InputT, OutputT> via(
       final SimpleFunction<InputT, OutputT> fn) {
-    return new MapElements<>(fn, fn.getClass());
+    return new MapElements<>(fn, null, fn.getClass());
   }
 
   /**
-   * An intermediate builder for a {@link MapElements} transform. To complete the transform, provide
-   * an output type descriptor to {@link MissingOutputTypeDescriptor#withOutputType}. See
-   * {@link #via(SerializableFunction)} for a full example of use.
+   * Returns a new {@link MapElements} transform with the given type descriptor for the output
+   * type, but the mapping function yet to be specified using {@link #via(SerializableFunction)}.
    */
-  public static final class MissingOutputTypeDescriptor<InputT, OutputT> {
-
-    private final SerializableFunction<InputT, OutputT> fn;
-
-    private MissingOutputTypeDescriptor(SerializableFunction<InputT, OutputT> fn) {
-      this.fn = fn;
-    }
-
-    public MapElements<InputT, OutputT> withOutputType(final TypeDescriptor<OutputT> outputType) {
-      return new MapElements<>(
-          SimpleFunction.fromSerializableFunctionWithOutputType(fn, outputType), fn.getClass());
-    }
-
+  public static <OutputT> MapElements<?, OutputT>
+  into(final TypeDescriptor<OutputT> outputType) {
+    return new MapElements<>(null, outputType, null);
   }
 
-  ///////////////////////////////////////////////////////////////////
-
-  private final SimpleFunction<InputT, OutputT> fn;
-  private final DisplayData.ItemSpec<?> fnClassDisplayData;
-
-  private MapElements(SimpleFunction<InputT, OutputT> fn, Class<?> fnClass) {
-    this.fn = fn;
-    this.fnClassDisplayData = DisplayData.item("mapFn", fnClass).withLabel("Map Function");
+  /**
+   * For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type descriptor,
+   * returns a {@code PTransform} that takes an input {@code PCollection<InputT>} and returns a
+   * {@code PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in the
+   * input.
+   *
+   * <p>Example of use in Java 8:
+   *
+   * <pre>{@code
+   * PCollection<Integer> wordLengths = words.apply(
+   *     MapElements.via((String word) -> word.length())
+   *         .withOutputType(new TypeDescriptor<Integer>() {});
+   * }</pre>
+   *
+   * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
+   * descriptor need not be provided.
+   */
+  public <NewInputT> MapElements<NewInputT, OutputT> via(
+      SerializableFunction<NewInputT, OutputT> fn) {
+    return new MapElements<>(
+        SimpleFunction.fromSerializableFunctionWithOutputType(fn, outputType),
+        null,
+        fn.getClass());
   }
 
   @Override
   public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
+    checkNotNull(fn, "Must specify a function on MapElements using .via()");
     return input.apply(
         "Map",
         ParDo.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index 82e856e..7bf94a0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -152,14 +153,18 @@ public class MapElementsTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testMapBasicSerializableFunction() throws Exception {
-    PCollection<Integer> output = pipeline
-        .apply(Create.of(1, 2, 3))
-        .apply(MapElements.via(new SerializableFunction<Integer, Integer>() {
-          @Override
-          public Integer apply(Integer input) {
-            return -input;
-          }
-        }).withOutputType(new TypeDescriptor<Integer>() {}));
+    PCollection<Integer> output =
+        pipeline
+            .apply(Create.of(1, 2, 3))
+            .apply(
+                MapElements.into(TypeDescriptors.integers())
+                    .via(
+                        new SerializableFunction<Integer, Integer>() {
+                          @Override
+                          public Integer apply(Integer input) {
+                            return -input;
+                          }
+                        }));
 
     PAssert.that(output).containsInAnyOrder(-2, -1, -3);
     pipeline.run();
@@ -210,8 +215,8 @@ public class MapElementsTest implements Serializable {
           }
         };
 
-    MapElements<?, ?> serializableMap = MapElements.via(serializableFn)
-        .withOutputType(TypeDescriptor.of(Integer.class));
+    MapElements<?, ?> serializableMap =
+        MapElements.into(TypeDescriptors.integers()).via(serializableFn);
     assertThat(DisplayData.from(serializableMap),
         hasDisplayItem("mapFn", serializableFn.getClass()));
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
index c323858..160b231 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
@@ -103,10 +103,9 @@ class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, WriteResult> {
     if (write.getFormatFunction() == BigQueryIO.IDENTITY_FORMATTER) {
       inputInGlobalWindow = (PCollection<TableRow>) typedInputInGlobalWindow;
     } else {
-      inputInGlobalWindow = typedInputInGlobalWindow
-          .apply(MapElements.via(write.getFormatFunction())
-              .withOutputType(new TypeDescriptor<TableRow>() {
-              }));
+      inputInGlobalWindow =
+          typedInputInGlobalWindow.apply(
+              MapElements.into(new TypeDescriptor<TableRow>() {}).via(write.getFormatFunction()));
     }
 
     // PCollection of filename, file byte size.

http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
index e0e9d9b4..dbd5ef3 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -47,8 +47,8 @@ public class MapElementsJava8Test implements Serializable {
         .apply(Create.of(1, 2, 3))
         .apply(MapElements
             // Note that the type annotation is required.
-            .via((Integer i) -> i * 2)
-            .withOutputType(new TypeDescriptor<Integer>() {}));
+            .into(TypeDescriptors.integers())
+            .via((Integer i) -> i * 2));
 
     PAssert.that(output).containsInAnyOrder(6, 2, 4);
     pipeline.run();
@@ -82,8 +82,8 @@ public class MapElementsJava8Test implements Serializable {
         .apply(Create.of(1, 2, 3))
         .apply(MapElements
             // Note that the type annotation is required.
-            .via(new Doubler()::doubleIt)
-            .withOutputType(new TypeDescriptor<Integer>() {}));
+            .into(TypeDescriptors.integers())
+            .via(new Doubler()::doubleIt));
 
     PAssert.that(output).containsInAnyOrder(6, 2, 4);
     pipeline.run();