You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/12 18:41:46 UTC
[2/3] beam git commit: Removes MapElements.MissingOutputTypeDescriptor
Removes MapElements.MissingOutputTypeDescriptor
This comes from changing MapElements.via(fn).withOutputType(td)
to MapElements.into(td).via(fn) which is also shorter.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bf6c6b3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bf6c6b3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bf6c6b3
Branch: refs/heads/master
Commit: 3bf6c6b3f81c643a1107346674088f554aca29a8
Parents: f314354
Author: Eugene Kirpichov <ki...@google.com>
Authored: Wed Mar 29 13:42:29 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Wed Apr 12 11:35:05 2017 -0700
----------------------------------------------------------------------
.../beam/examples/MinimalWordCountJava8.java | 4 +-
.../beam/examples/complete/game/GameStats.java | 6 +-
.../beam/examples/complete/game/UserScore.java | 5 +-
.../examples/MinimalWordCountJava8Test.java | 4 +-
.../complete/game/HourlyTeamScoreTest.java | 5 +-
.../examples/complete/game/UserScoreTest.java | 6 +-
.../beam/runners/direct/DirectRunnerTest.java | 5 +-
.../apache/beam/sdk/transforms/MapElements.java | 99 ++++++++++----------
.../beam/sdk/transforms/MapElementsTest.java | 25 +++--
.../sdk/io/gcp/bigquery/BatchLoadBigQuery.java | 7 +-
.../sdk/transforms/MapElementsJava8Test.java | 10 +-
11 files changed, 90 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index 738b64d..0072886 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -61,8 +61,8 @@ public class MinimalWordCountJava8 {
.apply(Filter.by((String word) -> !word.isEmpty()))
.apply(Count.<String>perElement())
.apply(MapElements
- .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
- .withOutputType(TypeDescriptors.strings()))
+ .into(TypeDescriptors.strings())
+ .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
// CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
.apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index 6874953..9c79fad 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -260,9 +260,9 @@ public class GameStats extends LeaderBoard {
// Extract username/score pairs from the event stream
PCollection<KV<String, Integer>> userEvents =
rawEvents.apply("ExtractUserScore",
- MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
- .withOutputType(
- TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())));
+ MapElements
+ .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
+ .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())));
// Calculate the total score per user over fixed windows, and
// cumulative updates for late data.
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 7dd5a8e..b4b023f 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -165,9 +165,8 @@ public class UserScore {
return gameInfo
.apply(MapElements
- .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
- .withOutputType(
- TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())))
+ .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
+ .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore())))
.apply(Sum.<String>integersPerKey());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index c2f3efe..911ccf6 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -68,8 +68,8 @@ public class MinimalWordCountJava8Test implements Serializable {
.apply(Filter.by((String word) -> !word.isEmpty()))
.apply(Count.<String>perElement())
.apply(MapElements
- .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())
- .withOutputType(TypeDescriptors.strings()))
+ .into(TypeDescriptors.strings())
+ .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
.apply(TextIO.Write.to("gs://your-output-bucket/and-output-prefix"));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
index 40bbfdb..409fc92 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java
@@ -101,9 +101,8 @@ public class HourlyTeamScoreTest implements Serializable {
-> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
// run a map to access the fields in the result.
.apply(MapElements
- .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
- .withOutputType(
- TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())));
+ .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
+ .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())));
PAssert.that(output).containsInAnyOrder(FILTERED_EVENTS);
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
index f0c28ab..2eb63aa 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
@@ -143,9 +143,9 @@ public class UserScoreTest implements Serializable {
PCollection<KV<String, Integer>> extract = input
.apply(ParDo.of(new ParseEventFn()))
.apply(
- MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))
- .withOutputType(
- TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())));
+ MapElements
+ .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))
+ .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())));
PAssert.that(extract).empty();
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 28c24ad..3b81f4d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -192,9 +192,10 @@ public class DirectRunnerTest implements Serializable {
TypeDescriptor<byte[]> td = new TypeDescriptor<byte[]>() {
};
PCollection<byte[]> foos =
- p.apply(Create.of(1, 1, 1, 2, 2, 3)).apply(MapElements.via(getBytes).withOutputType(td));
+ p.apply(Create.of(1, 1, 1, 2, 2, 3))
+ .apply(MapElements.into(td).via(getBytes));
PCollection<byte[]> msync =
- p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.via(getBytes).withOutputType(td));
+ p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.into(td).via(getBytes));
PCollection<byte[]> bytes =
PCollectionList.of(foos).and(msync).apply(Flatten.<byte[]>pCollections());
PCollection<KV<byte[], Long>> counts = bytes.apply(Count.<byte[]>perElement());
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 421b2ab..82cf753 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.sdk.transforms;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
@@ -26,34 +29,27 @@ import org.apache.beam.sdk.values.TypeDescriptor;
*/
public class MapElements<InputT, OutputT>
extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
-
/**
- * For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type descriptor,
- * returns a {@code PTransform} that takes an input {@code PCollection<InputT>} and returns
- * a {@code PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in
- * the input.
- *
- * <p>Example of use in Java 8:
- * <pre>{@code
- * PCollection<Integer> wordLengths = words.apply(
- * MapElements.via((String word) -> word.length())
- * .withOutputType(new TypeDescriptor<Integer>() {});
- * }</pre>
- *
- * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
- * descriptor need not be provided.
+ * Temporarily stores the argument of {@link #into(TypeDescriptor)} until combined with the
+ * argument of {@link #via(SerializableFunction)} into the fully-specified {@link #fn}. Stays null
+ * if constructed using {@link #via(SimpleFunction)} directly.
*/
- public static <InputT, OutputT> MissingOutputTypeDescriptor<InputT, OutputT>
- via(SerializableFunction<? super InputT, OutputT> fn) {
+ @Nullable private final transient TypeDescriptor<OutputT> outputType;
- // TypeDescriptor interacts poorly with the wildcards needed to correctly express
- // covariance and contravariance in Java, so instead we cast it to an invariant
- // function here.
- @SuppressWarnings("unchecked") // safe covariant cast
- SerializableFunction<InputT, OutputT> simplerFn =
- (SerializableFunction<InputT, OutputT>) fn;
+ /**
+ * Non-null on a fully specified transform - is null only when constructed using {@link
+ * #into(TypeDescriptor)}, until the fn is specified using {@link #via(SerializableFunction)}.
+ */
+ @Nullable private final SimpleFunction<InputT, OutputT> fn;
+ private final DisplayData.ItemSpec<?> fnClassDisplayData;
- return new MissingOutputTypeDescriptor<>(simplerFn);
+ private MapElements(
+ @Nullable SimpleFunction<InputT, OutputT> fn,
+ @Nullable TypeDescriptor<OutputT> outputType,
+ @Nullable Class<?> fnClass) {
+ this.fn = fn;
+ this.outputType = outputType;
+ this.fnClassDisplayData = DisplayData.item("mapFn", fnClass).withLabel("Map Function");
}
/**
@@ -77,41 +73,46 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
*/
public static <InputT, OutputT> MapElements<InputT, OutputT> via(
final SimpleFunction<InputT, OutputT> fn) {
- return new MapElements<>(fn, fn.getClass());
+ return new MapElements<>(fn, null, fn.getClass());
}
/**
- * An intermediate builder for a {@link MapElements} transform. To complete the transform, provide
- * an output type descriptor to {@link MissingOutputTypeDescriptor#withOutputType}. See
- * {@link #via(SerializableFunction)} for a full example of use.
+ * Returns a new {@link MapElements} transform with the given type descriptor for the output
+ * type, but the mapping function yet to be specified using {@link #via(SerializableFunction)}.
*/
- public static final class MissingOutputTypeDescriptor<InputT, OutputT> {
-
- private final SerializableFunction<InputT, OutputT> fn;
-
- private MissingOutputTypeDescriptor(SerializableFunction<InputT, OutputT> fn) {
- this.fn = fn;
- }
-
- public MapElements<InputT, OutputT> withOutputType(final TypeDescriptor<OutputT> outputType) {
- return new MapElements<>(
- SimpleFunction.fromSerializableFunctionWithOutputType(fn, outputType), fn.getClass());
- }
-
+ public static <OutputT> MapElements<?, OutputT>
+ into(final TypeDescriptor<OutputT> outputType) {
+ return new MapElements<>(null, outputType, null);
}
- ///////////////////////////////////////////////////////////////////
-
- private final SimpleFunction<InputT, OutputT> fn;
- private final DisplayData.ItemSpec<?> fnClassDisplayData;
-
- private MapElements(SimpleFunction<InputT, OutputT> fn, Class<?> fnClass) {
- this.fn = fn;
- this.fnClassDisplayData = DisplayData.item("mapFn", fnClass).withLabel("Map Function");
+ /**
+ * For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type descriptor,
+ * returns a {@code PTransform} that takes an input {@code PCollection<InputT>} and returns a
+ * {@code PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in the
+ * input.
+ *
+ * <p>Example of use in Java 8:
+ *
+ * <pre>{@code
+ * PCollection<Integer> wordLengths = words.apply(
+ * MapElements.into(TypeDescriptors.integers())
+ * .via((String word) -> word.length()));
+ * }</pre>
+ *
+ * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type
+ * descriptor need not be provided.
+ */
+ public <NewInputT> MapElements<NewInputT, OutputT> via(
+ SerializableFunction<NewInputT, OutputT> fn) {
+ return new MapElements<>(
+ SimpleFunction.fromSerializableFunctionWithOutputType(fn, outputType),
+ null,
+ fn.getClass());
}
@Override
public PCollection<OutputT> expand(PCollection<? extends InputT> input) {
+ checkNotNull(fn, "Must specify a function on MapElements using .via()");
return input.apply(
"Map",
ParDo.of(
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index 82e856e..7bf94a0 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -152,14 +153,18 @@ public class MapElementsTest implements Serializable {
@Test
@Category(NeedsRunner.class)
public void testMapBasicSerializableFunction() throws Exception {
- PCollection<Integer> output = pipeline
- .apply(Create.of(1, 2, 3))
- .apply(MapElements.via(new SerializableFunction<Integer, Integer>() {
- @Override
- public Integer apply(Integer input) {
- return -input;
- }
- }).withOutputType(new TypeDescriptor<Integer>() {}));
+ PCollection<Integer> output =
+ pipeline
+ .apply(Create.of(1, 2, 3))
+ .apply(
+ MapElements.into(TypeDescriptors.integers())
+ .via(
+ new SerializableFunction<Integer, Integer>() {
+ @Override
+ public Integer apply(Integer input) {
+ return -input;
+ }
+ }));
PAssert.that(output).containsInAnyOrder(-2, -1, -3);
pipeline.run();
@@ -210,8 +215,8 @@ public class MapElementsTest implements Serializable {
}
};
- MapElements<?, ?> serializableMap = MapElements.via(serializableFn)
- .withOutputType(TypeDescriptor.of(Integer.class));
+ MapElements<?, ?> serializableMap =
+ MapElements.into(TypeDescriptors.integers()).via(serializableFn);
assertThat(DisplayData.from(serializableMap),
hasDisplayItem("mapFn", serializableFn.getClass()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
index c323858..160b231 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java
@@ -103,10 +103,9 @@ class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, WriteResult> {
if (write.getFormatFunction() == BigQueryIO.IDENTITY_FORMATTER) {
inputInGlobalWindow = (PCollection<TableRow>) typedInputInGlobalWindow;
} else {
- inputInGlobalWindow = typedInputInGlobalWindow
- .apply(MapElements.via(write.getFormatFunction())
- .withOutputType(new TypeDescriptor<TableRow>() {
- }));
+ inputInGlobalWindow =
+ typedInputInGlobalWindow.apply(
+ MapElements.into(new TypeDescriptor<TableRow>() {}).via(write.getFormatFunction()));
}
// PCollection of filename, file byte size.
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
index e0e9d9b4..dbd5ef3 100644
--- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
+++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -47,8 +47,8 @@ public class MapElementsJava8Test implements Serializable {
.apply(Create.of(1, 2, 3))
.apply(MapElements
// Note that the type annotation is required.
- .via((Integer i) -> i * 2)
- .withOutputType(new TypeDescriptor<Integer>() {}));
+ .into(TypeDescriptors.integers())
+ .via((Integer i) -> i * 2));
PAssert.that(output).containsInAnyOrder(6, 2, 4);
pipeline.run();
@@ -82,8 +82,8 @@ public class MapElementsJava8Test implements Serializable {
.apply(Create.of(1, 2, 3))
.apply(MapElements
// Note that the type annotation is required.
- .via(new Doubler()::doubleIt)
- .withOutputType(new TypeDescriptor<Integer>() {}));
+ .into(TypeDescriptors.integers())
+ .via(new Doubler()::doubleIt));
PAssert.that(output).containsInAnyOrder(6, 2, 4);
pipeline.run();