You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by xu...@apache.org on 2017/11/13 07:59:42 UTC
[1/2] beam git commit: support topicPartition in BeamKafkaTable
Repository: beam
Updated Branches:
refs/heads/master acfab8965 -> 2b4a6b5d5
support topicPartition in BeamKafkaTable
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/92b3a9ac
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/92b3a9ac
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/92b3a9ac
Branch: refs/heads/master
Commit: 92b3a9acf04450313c3c2340f6921bf433c2dc04
Parents: acfab89
Author: mingmxu <mi...@ebay.com>
Authored: Fri Nov 10 19:51:37 2017 -0800
Committer: James Xu <xu...@gmail.com>
Committed: Mon Nov 13 14:41:29 2017 +0800
----------------------------------------------------------------------
.../sql/meta/provider/kafka/BeamKafkaTable.java | 39 +++++++++++++++-----
1 file changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/92b3a9ac/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
index 50f7496..8f663a3 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/kafka/BeamKafkaTable.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -45,6 +46,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer;
public abstract class BeamKafkaTable extends BaseBeamTable implements Serializable {
private String bootstrapServers;
private List<String> topics;
+ private List<TopicPartition> topicPartitions;
private Map<String, Object> configUpdates;
protected BeamKafkaTable(BeamRecordSqlType beamSqlRowType) {
@@ -58,6 +60,13 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
this.topics = topics;
}
+ public BeamKafkaTable(BeamRecordSqlType beamSqlRowType,
+ List<TopicPartition> topicPartitions, String bootstrapServers) {
+ super(beamSqlRowType);
+ this.bootstrapServers = bootstrapServers;
+ this.topicPartitions = topicPartitions;
+ }
+
public BeamKafkaTable updateConsumerProperties(Map<String, Object> configUpdates) {
this.configUpdates = configUpdates;
return this;
@@ -76,15 +85,27 @@ public abstract class BeamKafkaTable extends BaseBeamTable implements Serializab
@Override
public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
- return PBegin.in(pipeline).apply("read",
- KafkaIO.<byte[], byte[]>read()
- .withBootstrapServers(bootstrapServers)
- .withTopics(topics)
- .updateConsumerProperties(configUpdates)
- .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
- .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
- .withoutMetadata())
- .apply("in_format", getPTransformForInput());
+ KafkaIO.Read<byte[], byte[]> kafkaRead = null;
+ if (topics != null) {
+ kafkaRead = KafkaIO.<byte[], byte[]>read()
+ .withBootstrapServers(bootstrapServers)
+ .withTopics(topics)
+ .updateConsumerProperties(configUpdates)
+ .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+ .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
+ } else if (topicPartitions != null) {
+ kafkaRead = KafkaIO.<byte[], byte[]>read()
+ .withBootstrapServers(bootstrapServers)
+ .withTopicPartitions(topicPartitions)
+ .updateConsumerProperties(configUpdates)
+ .withKeyDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of())
+ .withValueDeserializerAndCoder(ByteArrayDeserializer.class, ByteArrayCoder.of());
+ } else {
+ throw new IllegalArgumentException("Either topics or topicPartitions must be configured.");
+ }
+
+ return PBegin.in(pipeline).apply("read", kafkaRead.withoutMetadata())
+        .apply("in_format", getPTransformForInput());
}
@Override
[2/2] beam git commit: This closes #4115
Posted by xu...@apache.org.
This closes #4115
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2b4a6b5d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2b4a6b5d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2b4a6b5d
Branch: refs/heads/master
Commit: 2b4a6b5d5cb455fe73971fde4caa7d1446cd55aa
Parents: acfab89 92b3a9a
Author: James Xu <xu...@gmail.com>
Authored: Mon Nov 13 14:42:08 2017 +0800
Committer: James Xu <xu...@gmail.com>
Committed: Mon Nov 13 14:42:08 2017 +0800
----------------------------------------------------------------------
.../sql/meta/provider/kafka/BeamKafkaTable.java | 39 +++++++++++++++-----
1 file changed, 30 insertions(+), 9 deletions(-)
----------------------------------------------------------------------