You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xu...@apache.org on 2017/11/13 07:59:42 UTC

[1/2] beam git commit: support topicPartition in BeamKafkaTable

Repository: beam
Updated Branches:
  refs/heads/master acfab8965 -> 2b4a6b5d5


support topicPartition in BeamKafkaTable


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/92b3a9ac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/92b3a9ac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/92b3a9ac

Branch: refs/heads/master
Commit: 92b3a9acf04450313c3c2340f6921bf433c2dc04
Parents: acfab89
Author: mingmxu <mi...@ebay.com>
Authored: Fri Nov 10 19:51:37 2017 -0800
Committer: James Xu <xu...@gmail.com>
Committed: Mon Nov 13 14:41:29 2017 +0800

----------------------------------------------------------------------
 .../sql/meta/provider/kafka/BeamKafkaTable.java | 39 +++++++++++++++-----
 1 file changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/92b3a9ac/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
index 50f7496..8f663a3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 
@@ -45,6 +46,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
 public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
   private String bootstrapServers;
   private List<String> topics;
+  private List<TopicPartition> topicPartitions;
   private Map<String, Object> configUpdates;
 
   protected BeamKafkaTable(BeamRecordSqlType beamSqlRowType) {
@@ -58,6 +60,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
     this.topics = topics;
   }
 
+  public BeamKafkaTable(BeamRecordSqlType beamSqlRowType,
+      List<TopicPartition> topicPartitions, String bootstrapServers) {
+    super(beamSqlRowType);
+    this.bootstrapServers = bootstrapServers;
+    this.topicPartitions = topicPartitions;
+  }
+
   public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
     this.configUpdates = configUpdates;
     return this;
@@ -76,15 +85,27 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
 
   @Override
   public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
-    return PBegin.in(pipeline).apply("read",
-            KafkaIO.<byte[], byte[]>read()
-                .withBootstrapServers(bootstrapServers)
-                .withTopics(topics)
-                .updateConsumerProperties(configUpdates)
-                .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
-                .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
-                .withoutMetadata())
-            .apply("in_format", getPTransformForInput());
+    KafkaIO.Read<byte[], byte[]> kafkaRead = null;
+    if (topics != null) {
+      kafkaRead = KafkaIO.<byte[], byte[]>read()
+      .withBootstrapServers(bootstrapServers)
+      .withTopics(topics)
+      .updateConsumerProperties(configUpdates)
+      .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+      .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
+    } else if (topicPartitions != null) {
+      kafkaRead = KafkaIO.<byte[], byte[]>read()
+          .withBootstrapServers(bootstrapServers)
+          .withTopicPartitions(topicPartitions)
+          .updateConsumerProperties(configUpdates)
+          .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+          .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
+    } else {
+      throw new IllegalArgumentException("One of topics and topicPartitions must be configurated.");
+    }
+
+    return PBegin.in(pipeline).apply("read", kafkaRead.withoutMetadata())
+.apply("in_format", getPTransformForInput());
   }
 
   @Override


[2/2] beam git commit: This closes #4115

Posted by xu...@apache.org.
This closes #4115


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2b4a6b5d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2b4a6b5d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2b4a6b5d

Branch: refs/heads/master
Commit: 2b4a6b5d5cb455fe73971fde4caa7d1446cd55aa
Parents: acfab89 92b3a9a
Author: James Xu <xu...@gmail.com>
Authored: Mon Nov 13 14:42:08 2017 +0800
Committer: James Xu <xu...@gmail.com>
Committed: Mon Nov 13 14:42:08 2017 +0800

----------------------------------------------------------------------
 .../sql/meta/provider/kafka/BeamKafkaTable.java | 39 +++++++++++++++-----
 1 file changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------