You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/30 16:59:31 UTC
[2/4] beam git commit: [BEAM-2114] Throw instead of warning when KafkaIO cannot infer coder
[BEAM-2114] Throw instead of warning when KafkaIO cannot infer coder
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10fc5f86
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10fc5f86
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10fc5f86
Branch: refs/heads/master
Commit: 10fc5f86fac066423c77d9b6d9e7ed87ab32ef01
Parents: 10b3e3e
Author: peay <pe...@protonmail.com>
Authored: Sat Apr 29 11:31:15 2017 +0200
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sun Apr 30 09:42:52 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 18 +++++++++-----
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 26 +++++++++++++++++---
2 files changed, 35 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/10fc5f86/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index b3591ce..8f94b8a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -291,17 +291,23 @@ public class KafkaIO {
if (parameterizedType.getRawType() == Deserializer.class) {
Type parameter = parameterizedType.getActualTypeArguments()[0];
+ @SuppressWarnings("unchecked")
+ Class<T> clazz = (Class<T>) parameter;
+
try {
- @SuppressWarnings("unchecked")
- Class<T> clazz = (Class<T>) parameter;
return NullableCoder.of(coderRegistry.getDefaultCoder(clazz));
} catch (CannotProvideCoderException e) {
- LOG.warn("Could not infer coder from deserializer type", e);
+ throw new RuntimeException(
+ String.format("Unable to automatically infer a Coder for "
+ + "the Kafka Deserializer %s: no coder registered for type %s",
+ deserializer, clazz));
}
}
}
- throw new RuntimeException("Could not extract deserializer type from " + deserializer);
+ throw new RuntimeException(
+ String.format("Could not extract the Kafaka Deserializer type from %s",
+ deserializer));
}
/**
@@ -634,14 +640,14 @@ public class KafkaIO {
Coder<K> keyCoder =
checkNotNull(
getKeyCoder() != null ? getKeyCoder() : inferCoder(registry, getKeyDeserializer()),
- "Key coder must be set");
+ "Key coder must be inferable from input or set using readWithCoders");
Coder<V> valueCoder =
checkNotNull(
getValueCoder() != null
? getValueCoder()
: inferCoder(registry, getValueDeserializer()),
- "Value coder must be set");
+ "Value coder must be inferable from input or set using readWithCoders");
// Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
Unbounded<KafkaRecord<K, V>> unbounded =
http://git-wip-us.apache.org/repos/asf/beam/blob/10fc5f86/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index a9c318d..2f895fe 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -63,7 +63,6 @@ import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
@@ -605,7 +604,6 @@ public class KafkaIOTest {
}
@Test
- @Category(NeedsRunner.class)
public void testUnboundedSourceMetrics() {
int numElements = 1000;
@@ -917,7 +915,24 @@ public class KafkaIOTest {
@Override
public void close() {
+ }
+ }
+
+ // class for testing coder inference
+ private static class ObjectDeserializer
+ implements Deserializer<Object> {
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ }
+ @Override
+ public Object deserialize(String topic, byte[] bytes) {
+ return new Object();
+ }
+
+ @Override
+ public void close() {
}
}
@@ -938,8 +953,13 @@ public class KafkaIOTest {
instanceof VarLongCoder);
}
+ @Test(expected = RuntimeException.class)
+ public void testInferKeyCoderFailure() {
+ CoderRegistry registry = CoderRegistry.createDefault();
+ KafkaIO.inferCoder(registry, ObjectDeserializer.class);
+ }
+
@Test
- @Category(NeedsRunner.class)
public void testSinkMetrics() throws Exception {
// Simply read from kafka source and write to kafka sink. Then verify the metrics are reported.