You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/30 16:59:30 UTC
[1/4] beam git commit: [BEAM-2114] Fixed display data for Kafka
read/write with coders
Repository: beam
Updated Branches:
refs/heads/master 202aae9d3 -> 3d47b335c
[BEAM-2114] Fixed display data for Kafka read/write with coders
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10b3e3e7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10b3e3e7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10b3e3e7
Branch: refs/heads/master
Commit: 10b3e3e7391603e00a64933fe74b7748b58bc590
Parents: 202aae9
Author: peay <pe...@protonmail.com>
Authored: Sat Apr 29 11:08:21 2017 +0200
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sun Apr 30 09:39:45 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 9 +-
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 102 +++++++++++++++++++
2 files changed, 109 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/10b3e3e7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 000df70..b3591ce 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -688,9 +688,11 @@ public class KafkaIO {
*/
private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDeserializer instead",
- ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer instead"
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer instead",
// "group.id", "enable.auto.commit", "auto.commit.interval.ms" :
// lets allow these, applications can have better resume point for restarts.
+ CoderBasedKafkaDeserializer.configForKeyDeserializer(), "Use readWithCoders instead",
+ CoderBasedKafkaDeserializer.configForValueDeserializer(), "Use readWithCoders instead"
);
// set config defaults
@@ -1526,7 +1528,10 @@ public class KafkaIO {
*/
private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Use withKeySerializer instead",
- ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead"
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead",
+
+ CoderBasedKafkaSerializer.configForKeySerializer(), "Use writeWithCoders instead",
+ CoderBasedKafkaSerializer.configForValueSerializer(), "Use writeWithCoders instead"
);
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/10b3e3e7/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index feb65da..a9c318d 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -266,6 +266,34 @@ public class KafkaIOTest {
}
}
+ /**
+ * Creates a consumer with two topics, with 5 partitions each.
+ * numElements are (round-robin) assigned all the 10 partitions.
+ */
+ private static KafkaIO.Read<Integer, Long> mkKafkaReadTransformWithCoders(
+ int numElements,
+ @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
+
+ List<String> topics = ImmutableList.of("topic_a", "topic_b");
+
+ KafkaIO.Read<Integer, Long> reader = KafkaIO
+ .<Integer, Long>readWithCoders(VarIntCoder.of(), VarLongCoder.of())
+ .withBootstrapServers("myServer1:9092,myServer2:9092")
+ .withTopics(topics)
+ .withConsumerFactoryFn(new ConsumerFactoryFn(
+ topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
+ .withKeyDeserializer(IntegerDeserializer.class)
+ .withValueDeserializer(LongDeserializer.class)
+ .withMaxNumRecords(numElements);
+
+ if (timestampFn != null) {
+ return reader.withTimestampFn(timestampFn);
+ } else {
+ return reader;
+ }
+ }
+
+
private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
private final int num;
@@ -316,6 +344,19 @@ public class KafkaIOTest {
}
@Test
+ public void testUnboundedSourceWithCoders() {
+ int numElements = 1000;
+
+ PCollection<Long> input = p
+ .apply(mkKafkaReadTransformWithCoders(numElements, new ValueAsTimestampFn())
+ .withoutMetadata())
+ .apply(Values.<Long>create());
+
+ addCountingAsserts(input, numElements);
+ p.run();
+ }
+
+ @Test
public void testUnboundedSourceWithSingleTopic() {
// same as testUnboundedSource, but with single topic
@@ -667,6 +708,39 @@ public class KafkaIOTest {
}
@Test
+ public void testSinkWithCoders() throws Exception {
+ // Simply read from kafka source and write to kafka sink. Then verify the records
+ // are correctly published to mock kafka producer.
+
+ int numElements = 1000;
+
+ synchronized (MOCK_PRODUCER_LOCK) {
+
+ MOCK_PRODUCER.clear();
+
+ ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
+
+ String topic = "test";
+
+ p
+ .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+ .withoutMetadata())
+ .apply(KafkaIO.<Integer, Long>writeWithCoders(VarIntCoder.of(), VarLongCoder.of())
+ .withBootstrapServers("none")
+ .withTopic(topic)
+ .withKeySerializer(IntegerSerializer.class)
+ .withValueSerializer(LongSerializer.class)
+ .withProducerFactoryFn(new ProducerFactoryFn()));
+
+ p.run();
+
+ completionThread.shutdown();
+
+ verifyProducerRecords(topic, numElements, false);
+ }
+ }
+
+ @Test
public void testValuesSink() throws Exception {
// similar to testSink(), but use values()' interface.
@@ -757,6 +831,19 @@ public class KafkaIOTest {
}
@Test
+ public void testSourceDisplayDataWithCoders() {
+ KafkaIO.Read<Integer, Long> read = mkKafkaReadTransformWithCoders(10, null);
+
+ DisplayData displayData = DisplayData.from(read);
+
+ assertThat(displayData, hasDisplayItem("topics", "topic_a,topic_b"));
+ assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
+ assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
+ assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
+ assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
+ }
+
+ @Test
public void testSourceWithExplicitPartitionsDisplayData() {
KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read()
.withBootstrapServers("myServer1:9092,myServer2:9092")
@@ -790,6 +877,21 @@ public class KafkaIOTest {
assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
assertThat(displayData, hasDisplayItem("retries", 3));
}
+ @Test
+ public void testSinkDisplayDataWithCoders() {
+ KafkaIO.Write<Integer, Long> write = KafkaIO
+ .<Integer, Long>writeWithCoders(VarIntCoder.of(), VarLongCoder.of())
+ .withBootstrapServers("myServerA:9092,myServerB:9092")
+ .withTopic("myTopic")
+ .withValueSerializer(LongSerializer.class)
+ .withProducerFactoryFn(new ProducerFactoryFn());
+
+ DisplayData displayData = DisplayData.from(write);
+
+ assertThat(displayData, hasDisplayItem("topic", "myTopic"));
+ assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
+ assertThat(displayData, hasDisplayItem("retries", 3));
+ }
// interface for testing coder inference
private interface DummyInterface<T> {
[2/4] beam git commit: [BEAM-2114] Throw instead of warning when
KafkaIO cannot infer coder
Posted by jk...@apache.org.
[BEAM-2114] Throw instead of warning when KafkaIO cannot infer coder
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10fc5f86
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10fc5f86
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10fc5f86
Branch: refs/heads/master
Commit: 10fc5f86fac066423c77d9b6d9e7ed87ab32ef01
Parents: 10b3e3e
Author: peay <pe...@protonmail.com>
Authored: Sat Apr 29 11:31:15 2017 +0200
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sun Apr 30 09:42:52 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 18 +++++++++-----
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 26 +++++++++++++++++---
2 files changed, 35 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/10fc5f86/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index b3591ce..8f94b8a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -291,17 +291,23 @@ public class KafkaIO {
if (parameterizedType.getRawType() == Deserializer.class) {
Type parameter = parameterizedType.getActualTypeArguments()[0];
+ @SuppressWarnings("unchecked")
+ Class<T> clazz = (Class<T>) parameter;
+
try {
- @SuppressWarnings("unchecked")
- Class<T> clazz = (Class<T>) parameter;
return NullableCoder.of(coderRegistry.getDefaultCoder(clazz));
} catch (CannotProvideCoderException e) {
- LOG.warn("Could not infer coder from deserializer type", e);
+ throw new RuntimeException(
+ String.format("Unable to automatically infer a Coder for "
+ + "the Kafka Deserializer %s: no coder registered for type %s",
+ deserializer, clazz));
}
}
}
- throw new RuntimeException("Could not extract deserializer type from " + deserializer);
+ throw new RuntimeException(
String.format("Could not extract the Kafka Deserializer type from %s",
+ deserializer));
}
/**
@@ -634,14 +640,14 @@ public class KafkaIO {
Coder<K> keyCoder =
checkNotNull(
getKeyCoder() != null ? getKeyCoder() : inferCoder(registry, getKeyDeserializer()),
- "Key coder must be set");
+ "Key coder must be inferable from input or set using readWithCoders");
Coder<V> valueCoder =
checkNotNull(
getValueCoder() != null
? getValueCoder()
: inferCoder(registry, getValueDeserializer()),
- "Value coder must be set");
+ "Value coder must be inferable from input or set using readWithCoders");
// Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
Unbounded<KafkaRecord<K, V>> unbounded =
http://git-wip-us.apache.org/repos/asf/beam/blob/10fc5f86/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index a9c318d..2f895fe 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -63,7 +63,6 @@ import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.SinkMetrics;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
@@ -605,7 +604,6 @@ public class KafkaIOTest {
}
@Test
- @Category(NeedsRunner.class)
public void testUnboundedSourceMetrics() {
int numElements = 1000;
@@ -917,7 +915,24 @@ public class KafkaIOTest {
@Override
public void close() {
+ }
+ }
+
+ // class for testing coder inference
+ private static class ObjectDeserializer
+ implements Deserializer<Object> {
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ }
+ @Override
+ public Object deserialize(String topic, byte[] bytes) {
+ return new Object();
+ }
+
+ @Override
+ public void close() {
}
}
@@ -938,8 +953,13 @@ public class KafkaIOTest {
instanceof VarLongCoder);
}
+ @Test(expected = RuntimeException.class)
+ public void testInferKeyCoderFailure() {
+ CoderRegistry registry = CoderRegistry.createDefault();
+ KafkaIO.inferCoder(registry, ObjectDeserializer.class);
+ }
+
@Test
- @Category(NeedsRunner.class)
public void testSinkMetrics() throws Exception {
// Simply read from kafka source and write to kafka sink. Then verify the metrics are reported.
[3/4] beam git commit: [BEAM-2114] Tests for KafkaIO: use
ExpectedException rule
Posted by jk...@apache.org.
[BEAM-2114] Tests for KafkaIO: use ExpectedException rule
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/34e00465
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/34e00465
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/34e00465
Branch: refs/heads/master
Commit: 34e0046512282872f205166162b16b616f834e93
Parents: 10fc5f8
Author: peay <pe...@protonmail.com>
Authored: Sun Apr 30 18:22:38 2017 +0200
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sun Apr 30 09:45:30 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 33 +++++++++++++-------
1 file changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/34e00465/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 2f895fe..591c099 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -102,7 +102,6 @@ import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -240,8 +239,8 @@ public class KafkaIOTest {
}
/**
- * Creates a consumer with two topics, with 5 partitions each.
- * numElements are (round-robin) assigned all the 10 partitions.
+ * Creates a consumer with two topics, with 10 partitions each.
+ * numElements are (round-robin) assigned all the 20 partitions.
*/
private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
int numElements,
@@ -266,8 +265,9 @@ public class KafkaIOTest {
}
/**
- * Creates a consumer with two topics, with 5 partitions each.
- * numElements are (round-robin) assigned all the 10 partitions.
+ * Creates a consumer with two topics, with 10 partitions each.
+ * numElements are (round-robin) assigned all the 20 partitions.
+ * Coders are specified explicitly.
*/
private static KafkaIO.Read<Integer, Long> mkKafkaReadTransformWithCoders(
int numElements,
@@ -918,17 +918,22 @@ public class KafkaIOTest {
}
}
+ // class for which a coder cannot be inferred
+ private static class NonInferableObject {
+
+ }
+
// class for testing coder inference
- private static class ObjectDeserializer
- implements Deserializer<Object> {
+ private static class NonInferableObjectDeserializer
+ implements Deserializer<NonInferableObject> {
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
}
@Override
- public Object deserialize(String topic, byte[] bytes) {
- return new Object();
+ public NonInferableObject deserialize(String topic, byte[] bytes) {
+ return new NonInferableObject();
}
@Override
@@ -953,10 +958,14 @@ public class KafkaIOTest {
instanceof VarLongCoder);
}
- @Test(expected = RuntimeException.class)
- public void testInferKeyCoderFailure() {
+ @Rule public ExpectedException cannotInferException = ExpectedException.none();
+
+ @Test
+ public void testInferKeyCoderFailure() throws Exception {
+ cannotInferException.expect(RuntimeException.class);
+
CoderRegistry registry = CoderRegistry.createDefault();
- KafkaIO.inferCoder(registry, ObjectDeserializer.class);
+ KafkaIO.inferCoder(registry, NonInferableObjectDeserializer.class);
}
@Test
[4/4] beam git commit: This closes #2780
Posted by jk...@apache.org.
This closes #2780
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3d47b335
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3d47b335
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3d47b335
Branch: refs/heads/master
Commit: 3d47b335cbe22f92fc83e2ba2a7c35847bcadca3
Parents: 202aae9 34e0046
Author: Eugene Kirpichov <ki...@google.com>
Authored: Sun Apr 30 09:59:10 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sun Apr 30 09:59:10 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/kafka/KafkaIO.java | 27 ++--
.../apache/beam/sdk/io/kafka/KafkaIOTest.java | 143 ++++++++++++++++++-
2 files changed, 156 insertions(+), 14 deletions(-)
----------------------------------------------------------------------