Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/30 16:59:30 UTC

[1/4] beam git commit: [BEAM-2114] Fixed display data for Kafka read/write with coders

Repository: beam
Updated Branches:
  refs/heads/master 202aae9d3 -> 3d47b335c


[BEAM-2114] Fixed display data for Kafka read/write with coders
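
As background for this change, here is a minimal sketch of the read path the patch touches: coders are passed explicitly through readWithCoders, so building display data no longer depends on coder inference. The method and property names below are taken from the tests added in this commit; this is an illustrative fragment, not a verified standalone program.

    // Sketch only: build a Kafka read with explicit coders, then inspect
    // its display data (names as exercised by the tests in this patch).
    List<String> topics = ImmutableList.of("topic_a", "topic_b");

    KafkaIO.Read<Integer, Long> read = KafkaIO
            .<Integer, Long>readWithCoders(VarIntCoder.of(), VarLongCoder.of())
            .withBootstrapServers("myServer1:9092,myServer2:9092")
            .withTopics(topics)
            .withKeyDeserializer(IntegerDeserializer.class)
            .withValueDeserializer(LongDeserializer.class);

    DisplayData displayData = DisplayData.from(read);
    // Consumer properties such as "bootstrap.servers" should be present,
    // while the coder-based (de)serializer config keys are ignored.
    assertThat(displayData, hasDisplayItem("topics", "topic_a,topic_b"));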


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10b3e3e7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10b3e3e7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10b3e3e7

Branch: refs/heads/master
Commit: 10b3e3e7391603e00a64933fe74b7748b58bc590
Parents: 202aae9
Author: peay <pe...@protonmail.com>
Authored: Sat Apr 29 11:08:21 2017 +0200
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sun Apr 30 09:39:45 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |   9 +-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 102 +++++++++++++++++++
 2 files changed, 109 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/10b3e3e7/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 000df70..b3591ce 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -688,9 +688,11 @@ public class KafkaIO {
      */
     private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of(
         ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDeserializer instead",
-        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer instead"
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer instead",
         // "group.id", "enable.auto.commit", "auto.commit.interval.ms" :
        //     let's allow these; applications can have a better resume point for restarts.
+        CoderBasedKafkaDeserializer.configForKeyDeserializer(), "Use readWithCoders instead",
+        CoderBasedKafkaDeserializer.configForValueDeserializer(), "Use readWithCoders instead"
         );
 
     // set config defaults
@@ -1526,7 +1528,10 @@ public class KafkaIO {
      */
     private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of(
         ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Use withKeySerializer instead",
-        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead"
+        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead",
+
+        CoderBasedKafkaSerializer.configForKeySerializer(), "Use writeWithCoders instead",
+        CoderBasedKafkaSerializer.configForValueSerializer(), "Use writeWithCoders instead"
      );
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/10b3e3e7/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index feb65da..a9c318d 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -266,6 +266,34 @@ public class KafkaIOTest {
     }
   }
 
+  /**
+   * Creates a consumer with two topics, with 5 partitions each.
+   * numElements are (round-robin) assigned all the 10 partitions.
+   */
+  private static KafkaIO.Read<Integer, Long> mkKafkaReadTransformWithCoders(
+          int numElements,
+          @Nullable SerializableFunction<KV<Integer, Long>, Instant> timestampFn) {
+
+    List<String> topics = ImmutableList.of("topic_a", "topic_b");
+
+    KafkaIO.Read<Integer, Long> reader = KafkaIO
+            .<Integer, Long>readWithCoders(VarIntCoder.of(), VarLongCoder.of())
+            .withBootstrapServers("myServer1:9092,myServer2:9092")
+            .withTopics(topics)
+            .withConsumerFactoryFn(new ConsumerFactoryFn(
+                    topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
+            .withKeyDeserializer(IntegerDeserializer.class)
+            .withValueDeserializer(LongDeserializer.class)
+            .withMaxNumRecords(numElements);
+
+    if (timestampFn != null) {
+      return reader.withTimestampFn(timestampFn);
+    } else {
+      return reader;
+    }
+  }
+
+
   private static class AssertMultipleOf implements SerializableFunction<Iterable<Long>, Void> {
     private final int num;
 
@@ -316,6 +344,19 @@ public class KafkaIOTest {
   }
 
   @Test
+  public void testUnboundedSourceWithCoders() {
+    int numElements = 1000;
+
+    PCollection<Long> input = p
+            .apply(mkKafkaReadTransformWithCoders(numElements, new ValueAsTimestampFn())
+                    .withoutMetadata())
+            .apply(Values.<Long>create());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  @Test
   public void testUnboundedSourceWithSingleTopic() {
     // same as testUnboundedSource, but with single topic
 
@@ -667,6 +708,39 @@ public class KafkaIOTest {
   }
 
   @Test
+  public void testSinkWithCoders() throws Exception {
+    // Simply read from kafka source and write to kafka sink. Then verify the records
+    // are correctly published to mock kafka producer.
+
+    int numElements = 1000;
+
+    synchronized (MOCK_PRODUCER_LOCK) {
+
+      MOCK_PRODUCER.clear();
+
+      ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start();
+
+      String topic = "test";
+
+      p
+              .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+                      .withoutMetadata())
+              .apply(KafkaIO.<Integer, Long>writeWithCoders(VarIntCoder.of(), VarLongCoder.of())
+                      .withBootstrapServers("none")
+                      .withTopic(topic)
+                      .withKeySerializer(IntegerSerializer.class)
+                      .withValueSerializer(LongSerializer.class)
+                      .withProducerFactoryFn(new ProducerFactoryFn()));
+
+      p.run();
+
+      completionThread.shutdown();
+
+      verifyProducerRecords(topic, numElements, false);
+    }
+  }
+
+  @Test
   public void testValuesSink() throws Exception {
     // similar to testSink(), but use values()' interface.
 
@@ -757,6 +831,19 @@ public class KafkaIOTest {
   }
 
   @Test
+  public void testSourceDisplayDataWithCoders() {
+    KafkaIO.Read<Integer, Long> read = mkKafkaReadTransformWithCoders(10, null);
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("topics", "topic_a,topic_b"));
+    assertThat(displayData, hasDisplayItem("enable.auto.commit", false));
+    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServer1:9092,myServer2:9092"));
+    assertThat(displayData, hasDisplayItem("auto.offset.reset", "latest"));
+    assertThat(displayData, hasDisplayItem("receive.buffer.bytes", 524288));
+  }
+
+  @Test
   public void testSourceWithExplicitPartitionsDisplayData() {
     KafkaIO.Read<byte[], Long> read = KafkaIO.<byte[], Long>read()
         .withBootstrapServers("myServer1:9092,myServer2:9092")
@@ -790,6 +877,21 @@ public class KafkaIOTest {
     assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
     assertThat(displayData, hasDisplayItem("retries", 3));
   }
+  @Test
+  public void testSinkDisplayDataWithCoders() {
+    KafkaIO.Write<Integer, Long> write = KafkaIO
+            .<Integer, Long>writeWithCoders(VarIntCoder.of(), VarLongCoder.of())
+            .withBootstrapServers("myServerA:9092,myServerB:9092")
+            .withTopic("myTopic")
+            .withValueSerializer(LongSerializer.class)
+            .withProducerFactoryFn(new ProducerFactoryFn());
+
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem("topic", "myTopic"));
+    assertThat(displayData, hasDisplayItem("bootstrap.servers", "myServerA:9092,myServerB:9092"));
+    assertThat(displayData, hasDisplayItem("retries", 3));
+  }
 
   // interface for testing coder inference
   private interface DummyInterface<T> {


[2/4] beam git commit: [BEAM-2114] Throw instead of warning when KafkaIO cannot infer coder

Posted by jk...@apache.org.
[BEAM-2114] Throw instead of warning when KafkaIO cannot infer coder
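
To make the new failure mode concrete, a small sketch using the ObjectDeserializer test helper added in this patch: inferring a coder for a Deserializer whose value type has no registered coder now raises a RuntimeException instead of logging a warning.

    // Sketch only: coder inference failure now surfaces as an exception.
    CoderRegistry registry = CoderRegistry.createDefault();
    try {
      KafkaIO.inferCoder(registry, ObjectDeserializer.class);
    } catch (RuntimeException e) {
      // Previously KafkaIO only logged a warning ("Could not infer coder
      // from deserializer type"); now the caller sees the failure directly.
    }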


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/10fc5f86
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/10fc5f86
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/10fc5f86

Branch: refs/heads/master
Commit: 10fc5f86fac066423c77d9b6d9e7ed87ab32ef01
Parents: 10b3e3e
Author: peay <pe...@protonmail.com>
Authored: Sat Apr 29 11:31:15 2017 +0200
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sun Apr 30 09:42:52 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 18 +++++++++-----
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 26 +++++++++++++++++---
 2 files changed, 35 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/10fc5f86/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index b3591ce..8f94b8a 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -291,17 +291,23 @@ public class KafkaIO {
       if (parameterizedType.getRawType() == Deserializer.class) {
         Type parameter = parameterizedType.getActualTypeArguments()[0];
 
+        @SuppressWarnings("unchecked")
+        Class<T> clazz = (Class<T>) parameter;
+
         try {
-          @SuppressWarnings("unchecked")
-          Class<T> clazz = (Class<T>) parameter;
           return NullableCoder.of(coderRegistry.getDefaultCoder(clazz));
         } catch (CannotProvideCoderException e) {
-          LOG.warn("Could not infer coder from deserializer type", e);
+          throw new RuntimeException(
+                  String.format("Unable to automatically infer a Coder for "
+                                + "the Kafka Deserializer %s: no coder registered for type %s",
+                                deserializer, clazz));
         }
       }
     }
 
-    throw new RuntimeException("Could not extract deserializer type from " + deserializer);
+    throw new RuntimeException(
+            String.format("Could not extract the Kafaka Deserializer type from %s",
+                          deserializer));
   }
 
   /**
@@ -634,14 +640,14 @@ public class KafkaIO {
       Coder<K> keyCoder =
           checkNotNull(
               getKeyCoder() != null ? getKeyCoder() : inferCoder(registry, getKeyDeserializer()),
-              "Key coder must be set");
+              "Key coder must be inferable from input or set using readWithCoders");
 
       Coder<V> valueCoder =
           checkNotNull(
               getValueCoder() != null
                   ? getValueCoder()
                   : inferCoder(registry, getValueDeserializer()),
-              "Value coder must be set");
+              "Value coder must be inferable from input or set using readWithCoders");
 
       // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
       Unbounded<KafkaRecord<K, V>> unbounded =

http://git-wip-us.apache.org/repos/asf/beam/blob/10fc5f86/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index a9c318d..2f895fe 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -63,7 +63,6 @@ import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.metrics.SinkMetrics;
 import org.apache.beam.sdk.metrics.SourceMetrics;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
@@ -605,7 +604,6 @@ public class KafkaIOTest {
   }
 
   @Test
-  @Category(NeedsRunner.class)
   public void testUnboundedSourceMetrics() {
     int numElements = 1000;
 
@@ -917,7 +915,24 @@ public class KafkaIOTest {
 
     @Override
     public void close() {
+    }
+  }
+
+  // class for testing coder inference
+  private static class ObjectDeserializer
+          implements Deserializer<Object> {
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+    }
 
+    @Override
+    public Object deserialize(String topic, byte[] bytes) {
+      return new Object();
+    }
+
+    @Override
+    public void close() {
     }
   }
 
@@ -938,8 +953,13 @@ public class KafkaIOTest {
             instanceof VarLongCoder);
   }
 
+  @Test(expected = RuntimeException.class)
+  public void testInferKeyCoderFailure() {
+    CoderRegistry registry = CoderRegistry.createDefault();
+    KafkaIO.inferCoder(registry, ObjectDeserializer.class);
+  }
+
   @Test
-  @Category(NeedsRunner.class)
   public void testSinkMetrics() throws Exception {
     // Simply read from kafka source and write to kafka sink. Then verify the metrics are reported.
 


[3/4] beam git commit: [BEAM-2114] Tests for KafkaIO: use ExpectedException rule

Posted by jk...@apache.org.
[BEAM-2114] Tests for KafkaIO: use ExpectedException rule
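
For reference, the JUnit 4 idiom this patch adopts: instead of @Test(expected = ...), an ExpectedException rule states the expectation inside the test body, immediately before the statement that should throw. A condensed form of the pattern, as it appears in the diff below:

    @Rule public ExpectedException cannotInferException = ExpectedException.none();

    @Test
    public void testInferKeyCoderFailure() throws Exception {
      cannotInferException.expect(RuntimeException.class);
      KafkaIO.inferCoder(CoderRegistry.createDefault(),
          NonInferableObjectDeserializer.class);
    }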


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/34e00465
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/34e00465
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/34e00465

Branch: refs/heads/master
Commit: 34e0046512282872f205166162b16b616f834e93
Parents: 10fc5f8
Author: peay <pe...@protonmail.com>
Authored: Sun Apr 30 18:22:38 2017 +0200
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sun Apr 30 09:45:30 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 33 +++++++++++++-------
 1 file changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/34e00465/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 2f895fe..591c099 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -102,7 +102,6 @@ import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -240,8 +239,8 @@ public class KafkaIOTest {
   }
 
   /**
-   * Creates a consumer with two topics, with 5 partitions each.
-   * numElements are (round-robin) assigned all the 10 partitions.
+   * Creates a consumer with two topics, with 10 partitions each.
+   * numElements are (round-robin) assigned all the 20 partitions.
    */
   private static KafkaIO.Read<Integer, Long> mkKafkaReadTransform(
       int numElements,
@@ -266,8 +265,9 @@ public class KafkaIOTest {
   }
 
   /**
-   * Creates a consumer with two topics, with 5 partitions each.
-   * numElements are (round-robin) assigned all the 10 partitions.
+   * Creates a consumer with two topics, with 10 partitions each.
+   * numElements are (round-robin) assigned all the 20 partitions.
+   * Coders are specified explicitly.
    */
   private static KafkaIO.Read<Integer, Long> mkKafkaReadTransformWithCoders(
           int numElements,
@@ -918,17 +918,22 @@ public class KafkaIOTest {
     }
   }
 
+  // class for which a coder cannot be inferred
+  private static class NonInferableObject {
+
+  }
+
   // class for testing coder inference
-  private static class ObjectDeserializer
-          implements Deserializer<Object> {
+  private static class NonInferableObjectDeserializer
+          implements Deserializer<NonInferableObject> {
 
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
     }
 
     @Override
-    public Object deserialize(String topic, byte[] bytes) {
-      return new Object();
+    public NonInferableObject deserialize(String topic, byte[] bytes) {
+      return new NonInferableObject();
     }
 
     @Override
@@ -953,10 +958,14 @@ public class KafkaIOTest {
             instanceof VarLongCoder);
   }
 
-  @Test(expected = RuntimeException.class)
-  public void testInferKeyCoderFailure() {
+  @Rule public ExpectedException cannotInferException = ExpectedException.none();
+
+  @Test
+  public void testInferKeyCoderFailure() throws Exception {
+    cannotInferException.expect(RuntimeException.class);
+
     CoderRegistry registry = CoderRegistry.createDefault();
-    KafkaIO.inferCoder(registry, ObjectDeserializer.class);
+    KafkaIO.inferCoder(registry, NonInferableObjectDeserializer.class);
   }
 
   @Test


[4/4] beam git commit: This closes #2780

Posted by jk...@apache.org.
This closes #2780


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3d47b335
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3d47b335
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3d47b335

Branch: refs/heads/master
Commit: 3d47b335cbe22f92fc83e2ba2a7c35847bcadca3
Parents: 202aae9 34e0046
Author: Eugene Kirpichov <ki...@google.com>
Authored: Sun Apr 30 09:59:10 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Sun Apr 30 09:59:10 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   |  27 ++--
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 143 ++++++++++++++++++-
 2 files changed, 156 insertions(+), 14 deletions(-)
----------------------------------------------------------------------