You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/01 00:05:59 UTC

[1/2] beam git commit: KafkaIO : Add withTopic() api that takes single topic.

Repository: beam
Updated Branches:
  refs/heads/master 0b8932fd3 -> 5f72b83c0


KafkaIO : Add withTopic() api that takes single topic.

Remove the need for setting a key coder for the Writer while writing
values only. If we didn't specify the key coder, validation
succeeded but it failed a check while instantiating the Kafka producer.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/37b0d45c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/37b0d45c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/37b0d45c

Branch: refs/heads/master
Commit: 37b0d45c76b5fb03cdf5749dee52483fa3811d5b
Parents: 0b8932f
Author: Raghu Angadi <ra...@google.com>
Authored: Wed Mar 29 08:17:25 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Mar 31 16:51:20 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 11 ++++++++-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 24 ++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/37b0d45c/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index bb7d971..80b40be 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -124,7 +124,7 @@ import org.slf4j.LoggerFactory;
  *  pipeline
  *    .apply(KafkaIO.<Long, String>read()
  *       .withBootstrapServers("broker_1:9092,broker_2:9092")
- *       .withTopics(ImmutableList.of("topic_a", "topic_b"))
+ *       .withTopic("my_topic")  // use withTopics(List<String>) to read from multiple topics.
  *       // set a Coder for Key and Value
  *       .withKeyCoder(BigEndianLongCoder.of())
  *       .withValueCoder(StringUtf8Coder.of())
@@ -308,6 +308,15 @@ public class KafkaIO {
     }
 
     /**
+     * Returns a new {@link Read} that reads from the topic.
+     * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description
+     * of how the partitions are distributed among the splits.
+     */
+    public Read<K, V> withTopic(String topic) {
+      return withTopics(ImmutableList.of(topic));
+    }
+
+    /**
      * Returns a new {@link Read} that reads from the topics. All the partitions from each
      * of the topics are read.
      * See {@link UnboundedKafkaSource#generateInitialSplits(int, PipelineOptions)} for description

http://git-wip-us.apache.org/repos/asf/beam/blob/37b0d45c/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index d1696d0..7e77512 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -295,6 +295,30 @@ public class KafkaIOTest {
   }
 
   @Test
+  public void testUnboundedSourceWithSingleTopic() {
+    // same as testUnboundedSource, but with single topic
+
+    int numElements = 1000;
+    String topic = "my_topic";
+
+    KafkaIO.Read<Integer, Long> reader = KafkaIO.<Integer, Long>read()
+        .withBootstrapServers("none")
+        .withTopic("my_topic")
+        .withConsumerFactoryFn(new ConsumerFactoryFn(
+            ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST))
+        .withKeyCoder(BigEndianIntegerCoder.of())
+        .withValueCoder(BigEndianLongCoder.of())
+        .withMaxNumRecords(numElements);
+
+    PCollection<Long> input = p
+        .apply(reader.withoutMetadata())
+        .apply(Values.<Long>create());
+
+    addCountingAsserts(input, numElements);
+    p.run();
+  }
+
+  @Test
   @Category(NeedsRunner.class)
   public void testUnboundedSourceWithExplicitPartitions() {
     int numElements = 1000;


[2/2] beam git commit: This closes #2364

Posted by jk...@apache.org.
This closes #2364


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5f72b83c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5f72b83c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5f72b83c

Branch: refs/heads/master
Commit: 5f72b83c054845ecfc0be03e27acfc24a676a936
Parents: 0b8932f 37b0d45
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Mar 31 16:51:38 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Mar 31 16:51:38 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 11 ++++++++-
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 24 ++++++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------