You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/28 21:14:12 UTC
[1/2] beam git commit: [BEAM-2114] Wrap inferred KafkaIO Coders with NullableCoder
Repository: beam
Updated Branches:
refs/heads/master dbd44faf3 -> a3e7383cc
[BEAM-2114] Wrap inferred KafkaIO Coders with NullableCoder
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b16e29cb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b16e29cb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b16e29cb
Branch: refs/heads/master
Commit: b16e29cbb7edbc2e06dee1ec8512625731980584
Parents: dbd44fa
Author: Devon Meunier <de...@shopify.com>
Authored: Fri Apr 28 15:24:36 2017 -0400
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Apr 28 14:14:04 2017 -0700
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 5 +++--
.../java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 12 ++++++------
2 files changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/b16e29cb/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index a0977b7..47d8281 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -61,6 +61,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.io.Read.Unbounded;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
@@ -270,7 +271,7 @@ public class KafkaIO {
* deserializer argument using the {@link Coder} registry.
*/
@VisibleForTesting
- static <T> Coder<T> inferCoder(
+ static <T> NullableCoder<T> inferCoder(
CoderRegistry coderRegistry, Class<? extends Deserializer<T>> deserializer) {
checkNotNull(deserializer);
@@ -289,7 +290,7 @@ public class KafkaIO {
try {
@SuppressWarnings("unchecked")
Class<T> clazz = (Class<T>) parameter;
- return coderRegistry.getDefaultCoder(clazz);
+ return NullableCoder.of(coderRegistry.getDefaultCoder(clazz));
} catch (CannotProvideCoderException e) {
LOG.warn("Could not infer coder from deserializer type", e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/b16e29cb/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index e6ed2f7..d713d90 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -183,7 +183,7 @@ public class KafkaIOTest {
// our responsibility to make sure currently enqueued records sync with partition offsets.
// The following task will be called inside each invocation to MockConsumer.poll().
// We enqueue only the records with the offset >= partition's current position.
- Runnable recordEnquerTask = new Runnable() {
+ Runnable recordEnqueueTask = new Runnable() {
@Override
public void run() {
// add all the records with offset >= current partition position.
@@ -199,7 +199,7 @@ public class KafkaIOTest {
}
};
- consumer.schedulePollTask(recordEnquerTask);
+ consumer.schedulePollTask(recordEnqueueTask);
return consumer;
}
@@ -739,16 +739,16 @@ public class KafkaIOTest {
public void testInferKeyCoder() {
CoderRegistry registry = CoderRegistry.createDefault();
- assertTrue(KafkaIO.inferCoder(registry, LongDeserializer.class)
+ assertTrue(KafkaIO.inferCoder(registry, LongDeserializer.class).getValueCoder()
instanceof VarLongCoder);
- assertTrue(KafkaIO.inferCoder(registry, StringDeserializer.class)
+ assertTrue(KafkaIO.inferCoder(registry, StringDeserializer.class).getValueCoder()
instanceof StringUtf8Coder);
- assertTrue(KafkaIO.inferCoder(registry, InstantDeserializer.class)
+ assertTrue(KafkaIO.inferCoder(registry, InstantDeserializer.class).getValueCoder()
instanceof InstantCoder);
- assertTrue(KafkaIO.inferCoder(registry, DeserializerWithInterfaces.class)
+ assertTrue(KafkaIO.inferCoder(registry, DeserializerWithInterfaces.class).getValueCoder()
instanceof VarLongCoder);
}
[2/2] beam git commit: This closes #2767
Posted by dh...@apache.org.
This closes #2767
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a3e7383c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a3e7383c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a3e7383c
Branch: refs/heads/master
Commit: a3e7383ccae77705259f9830613e78c449fed092
Parents: dbd44fa b16e29c
Author: Dan Halperin <dh...@google.com>
Authored: Fri Apr 28 14:14:07 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Apr 28 14:14:07 2017 -0700
----------------------------------------------------------------------
.../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 5 +++--
.../java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java | 12 ++++++------
2 files changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------