You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bc...@apache.org on 2016/08/17 23:22:49 UTC
[1/2] incubator-beam git commit: Closes #756
Repository: incubator-beam
Updated Branches:
refs/heads/master 89367cfb1 -> d93ef2edd
Closes #756
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d93ef2ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d93ef2ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d93ef2ed
Branch: refs/heads/master
Commit: d93ef2edd260a2077bc2ba6abd1ca02abd147a9a
Parents: 89367cf 236945d
Author: bchambers <bc...@google.com>
Authored: Wed Aug 17 16:09:01 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Aug 17 16:09:01 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/Combine.java | 28 ++++++++++----------
.../org/apache/beam/sdk/transforms/Count.java | 8 +++---
.../beam/sdk/transforms/FlatMapElements.java | 4 +--
.../org/apache/beam/sdk/transforms/Flatten.java | 12 ++++-----
.../org/apache/beam/sdk/transforms/Keys.java | 8 +++---
.../org/apache/beam/sdk/transforms/KvSwap.java | 9 +++----
.../apache/beam/sdk/transforms/MapElements.java | 16 ++++++++---
.../beam/sdk/transforms/RemoveDuplicates.java | 8 +++---
.../org/apache/beam/sdk/transforms/Values.java | 8 +++---
.../apache/beam/sdk/transforms/WithKeys.java | 9 +++----
.../beam/sdk/transforms/windowing/Window.java | 11 ++++----
.../java/org/apache/beam/sdk/PipelineTest.java | 12 ++++-----
.../java/org/apache/beam/sdk/io/WriteTest.java | 4 ++-
.../beam/sdk/transforms/MapElementsTest.java | 8 +++---
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 10 ++++---
15 files changed, 81 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
[2/2] incubator-beam git commit: Replace ParDo with simpler
transforms where possible
Posted by bc...@apache.org.
Replace ParDo with simpler transforms where possible
There are a number of places in the Java SDK where we use
ParDo.of(DoFn) even though MapElements or another higher-level
composite is applicable and more readable. This change
converts a number of those call sites.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/236945d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/236945d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/236945d2
Branch: refs/heads/master
Commit: 236945d2504b73de91f7292219e0b15a53e062f5
Parents: 89367cf
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jul 27 14:23:15 2016 -0700
Committer: bchambers <bc...@google.com>
Committed: Wed Aug 17 16:09:01 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/Combine.java | 28 ++++++++++----------
.../org/apache/beam/sdk/transforms/Count.java | 8 +++---
.../beam/sdk/transforms/FlatMapElements.java | 4 +--
.../org/apache/beam/sdk/transforms/Flatten.java | 12 ++++-----
.../org/apache/beam/sdk/transforms/Keys.java | 8 +++---
.../org/apache/beam/sdk/transforms/KvSwap.java | 9 +++----
.../apache/beam/sdk/transforms/MapElements.java | 16 ++++++++---
.../beam/sdk/transforms/RemoveDuplicates.java | 8 +++---
.../org/apache/beam/sdk/transforms/Values.java | 8 +++---
.../apache/beam/sdk/transforms/WithKeys.java | 9 +++----
.../beam/sdk/transforms/windowing/Window.java | 11 ++++----
.../java/org/apache/beam/sdk/PipelineTest.java | 12 ++++-----
.../java/org/apache/beam/sdk/io/WriteTest.java | 4 ++-
.../beam/sdk/transforms/MapElementsTest.java | 8 +++---
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 10 ++++---
15 files changed, 81 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 6ba3f8a..56c0bc4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -2121,14 +2121,14 @@ public class Combine {
inputCoder.getValueCoder()))
.setWindowingStrategyInternal(preCombineStrategy)
.apply("PreCombineHot", Combine.perKey(hotPreCombine))
- .apply("StripNonce", ParDo.of(
- new DoFn<KV<KV<K, Integer>, AccumT>,
- KV<K, InputOrAccum<InputT, AccumT>>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.of(
- c.element().getKey().getKey(),
- InputOrAccum.<InputT, AccumT>accum(c.element().getValue())));
+ .apply("StripNonce", MapElements.via(
+ new SimpleFunction<KV<KV<K, Integer>, AccumT>,
+ KV<K, InputOrAccum<InputT, AccumT>>>() {
+ @Override
+ public KV<K, InputOrAccum<InputT, AccumT>> apply(KV<KV<K, Integer>, AccumT> elem) {
+ return KV.of(
+ elem.getKey().getKey(),
+ InputOrAccum.<InputT, AccumT>accum(elem.getValue()));
}
}))
.setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder))
@@ -2137,12 +2137,12 @@ public class Combine {
PCollection<KV<K, InputOrAccum<InputT, AccumT>>> preprocessedCold = split
.get(cold)
.setCoder(inputCoder)
- .apply("PrepareCold", ParDo.of(
- new DoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.of(c.element().getKey(),
- InputOrAccum.<InputT, AccumT>input(c.element().getValue())));
+ .apply("PrepareCold", MapElements.via(
+ new SimpleFunction<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() {
+ @Override
+ public KV<K, InputOrAccum<InputT, AccumT>> apply(KV<K, InputT> element) {
+ return KV.of(element.getKey(),
+ InputOrAccum.<InputT, AccumT>input(element.getValue()));
}
}))
.setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index ac59c76..195c5d1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -107,10 +107,10 @@ public class Count {
public PCollection<KV<T, Long>> apply(PCollection<T> input) {
return
input
- .apply("Init", ParDo.of(new DoFn<T, KV<T, Void>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.of(c.element(), (Void) null));
+ .apply("Init", MapElements.via(new SimpleFunction<T, KV<T, Void>>() {
+ @Override
+ public KV<T, Void> apply(T element) {
+ return KV.of(element, (Void) null);
}
}))
.apply(Count.<T, Void>perKey());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index 6f9e3d8..2837c40 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -29,7 +29,7 @@ import java.lang.reflect.ParameterizedType;
* {@link PCollection} and merging the results.
*/
public class FlatMapElements<InputT, OutputT>
-extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
+extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
/**
* For a {@code SerializableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn},
* returns a {@link PTransform} that applies {@code fn} to every element of the input
@@ -130,7 +130,7 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
}
@Override
- public PCollection<OutputT> apply(PCollection<InputT> input) {
+ public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
return input.apply(
"FlatMap",
ParDo.of(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 7e09d7e..f3f4f88 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -173,13 +173,11 @@ public class Flatten {
@SuppressWarnings("unchecked")
Coder<T> elemCoder = ((IterableLikeCoder<T, ?>) inCoder).getElemCoder();
- return in.apply("FlattenIterables", ParDo.of(
- new DoFn<Iterable<T>, T>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- for (T i : c.element()) {
- c.output(i);
- }
+ return in.apply("FlattenIterables", FlatMapElements.via(
+ new SimpleFunction<Iterable<T>, Iterable<T>>() {
+ @Override
+ public Iterable<T> apply(Iterable<T> element) {
+ return element;
}
}))
.setCoder(elemCoder);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
index 5ac1866..2405adf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
@@ -58,10 +58,10 @@ public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>,
@Override
public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) {
return
- in.apply("Keys", ParDo.of(new DoFn<KV<K, ?>, K>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element().getKey());
+ in.apply("Keys", MapElements.via(new SimpleFunction<KV<K, ?>, K>() {
+ @Override
+ public K apply(KV<K, ?> kv) {
+ return kv.getKey();
}
}));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
index d4386d2..2b81ebf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
@@ -62,11 +62,10 @@ public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>,
@Override
public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) {
return
- in.apply("KvSwap", ParDo.of(new DoFn<KV<K, V>, KV<V, K>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- KV<K, V> e = c.element();
- c.output(KV.of(e.getValue(), e.getKey()));
+ in.apply("KvSwap", MapElements.via(new SimpleFunction<KV<K, V>, KV<V, K>>() {
+ @Override
+ public KV<V, K> apply(KV<K, V> kv) {
+ return KV.of(kv.getValue(), kv.getKey());
}
}));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 17ad6e7..73e4359 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
* {@code PTransform}s for mapping a simple function over the elements of a {@link PCollection}.
*/
public class MapElements<InputT, OutputT>
-extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
+extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
/**
* For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type descriptor,
@@ -44,8 +44,16 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
* descriptor need not be provided.
*/
public static <InputT, OutputT> MissingOutputTypeDescriptor<InputT, OutputT>
- via(SerializableFunction<InputT, OutputT> fn) {
- return new MissingOutputTypeDescriptor<>(fn);
+ via(SerializableFunction<? super InputT, OutputT> fn) {
+
+ // TypeDescriptor interacts poorly with the wildcards needed to correctly express
+ // covariance and contravariance in Java, so instead we cast it to an invariant
+ // function here.
+ @SuppressWarnings("unchecked") // safe covariant cast
+ SerializableFunction<InputT, OutputT> simplerFn =
+ (SerializableFunction<InputT, OutputT>) fn;
+
+ return new MissingOutputTypeDescriptor<>(simplerFn);
}
/**
@@ -103,7 +111,7 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
}
@Override
- public PCollection<OutputT> apply(PCollection<InputT> input) {
+ public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
return input.apply(
"Map",
ParDo.of(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
index bba4b51..2744b14 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
@@ -85,10 +85,10 @@ public class RemoveDuplicates<T> extends PTransform<PCollection<T>,
@Override
public PCollection<T> apply(PCollection<T> in) {
return in
- .apply("CreateIndex", ParDo.of(new DoFn<T, KV<T, Void>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.of(c.element(), (Void) null));
+ .apply("CreateIndex", MapElements.via(new SimpleFunction<T, KV<T, Void>>() {
+ @Override
+ public KV<T, Void> apply(T element) {
+ return KV.of(element, (Void) null);
}
}))
.apply(Combine.<T, Void>perKey(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
index 34342db..d21d100 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
@@ -58,10 +58,10 @@ public class Values<V> extends PTransform<PCollection<? extends KV<?, V>>,
@Override
public PCollection<V> apply(PCollection<? extends KV<?, V>> in) {
return
- in.apply("Values", ParDo.of(new DoFn<KV<?, V>, V>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element().getValue());
+ in.apply("Values", MapElements.via(new SimpleFunction<KV<?, V>, V>() {
+ @Override
+ public V apply(KV<?, V> kv) {
+ return kv.getValue();
}
}));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index 2a44963..8b061f6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -113,11 +113,10 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
@Override
public PCollection<KV<K, V>> apply(PCollection<V> in) {
PCollection<KV<K, V>> result =
- in.apply("AddKeys", ParDo.of(new DoFn<V, KV<K, V>>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(KV.of(fn.apply(c.element()),
- c.element()));
+ in.apply("AddKeys", MapElements.via(new SimpleFunction<V, KV<K, V>>() {
+ @Override
+ public KV<K, V> apply(V element) {
+ return KV.of(fn.apply(element), element);
}
}));
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index c1b0237..9dd069c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -21,10 +21,10 @@ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
@@ -645,10 +645,9 @@ public class Window {
// We first apply a (trivial) transform to the input PCollection to produce a new
// PCollection. This ensures that we don't modify the windowing strategy of the input
// which may be used elsewhere.
- .apply("Identity", ParDo.of(new DoFn<T, T>() {
- @ProcessElement
- public void processElement(ProcessContext c) {
- c.output(c.element());
+ .apply("Identity", MapElements.via(new SimpleFunction<T, T>() {
+ @Override public T apply(T element) {
+ return element;
}
}))
// Then we modify the windowing strategy.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 8b86499..d7b3ac5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -36,10 +36,10 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
@@ -146,10 +146,10 @@ public class PipelineTest {
private static PTransform<PCollection<? extends String>, PCollection<String>> addSuffix(
final String suffix) {
- return ParDo.of(new DoFn<String, String>() {
- @ProcessElement
- public void processElement(DoFn<String, String>.ProcessContext c) {
- c.output(c.element() + suffix);
+ return MapElements.via(new SimpleFunction<String, String>() {
+ @Override
+ public String apply(String input) {
+ return input + suffix;
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index f9bf472..b9ba53b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -91,7 +91,9 @@ public class WriteTest {
// Static counts of the number of records per shard.
private static List<Integer> recordsPerShard = new ArrayList<>();
- private static final MapElements<String, String> IDENTITY_MAP =
+ @SuppressWarnings("unchecked") // covariant cast
+ private static final PTransform<PCollection<String>, PCollection<String>> IDENTITY_MAP =
+ (PTransform)
MapElements.via(new SimpleFunction<String, String>() {
@Override
public String apply(String input) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index e86a128..7217bca 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -233,7 +233,7 @@ public class MapElementsTest implements Serializable {
}
@Test
public void testSimpleFunctionDisplayData() {
- SimpleFunction<?, ?> simpleFn = new SimpleFunction<Integer, Integer>() {
+ SimpleFunction<Integer, ?> simpleFn = new SimpleFunction<Integer, Integer>() {
@Override
public Integer apply(Integer input) {
return input;
@@ -255,17 +255,17 @@ public class MapElementsTest implements Serializable {
@Test
@Category(RunnableOnService.class)
public void testPrimitiveDisplayData() {
- SimpleFunction<?, ?> mapFn = new SimpleFunction<Integer, Integer>() {
+ SimpleFunction<Integer, ?> mapFn = new SimpleFunction<Integer, Integer>() {
@Override
public Integer apply(Integer input) {
return input;
}
};
- MapElements<?, ?> map = MapElements.via(mapFn);
+ MapElements<Integer, ?> map = MapElements.via(mapFn);
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(map);
+ Set<DisplayData> displayData = evaluator.<Integer>displayDataForPrimitiveTransforms(map);
assertThat("MapElements should include the mapFn in its primitive display data",
displayData, hasItem(hasDisplayItem("mapFn", mapFn.getClass())));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 2383105..8a0c788 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -34,9 +34,11 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
import org.apache.beam.sdk.values.KV;
@@ -1314,10 +1316,10 @@ public class KafkaIO {
public PDone apply(PCollection<V> input) {
return input
.apply("Kafka values with default key",
- ParDo.of(new DoFn<V, KV<Void, V>>() {
- @ProcessElement
- public void processElement(ProcessContext ctx) throws Exception {
- ctx.output(KV.<Void, V>of(null, ctx.element()));
+ MapElements.via(new SimpleFunction<V, KV<Void, V>>() {
+ @Override
+ public KV<Void, V> apply(V element) {
+ return KV.<Void, V>of(null, element);
}
}))
.setCoder(KvCoder.of(VoidCoder.of(), kvWriteTransform.valueCoder))