You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/10 18:08:32 UTC

[pulsar] branch master updated: Issue #2757: Kafka source connector should just transfer bytes (#2761)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e9b6c2  Issue #2757: Kafka source connector should just transfer bytes (#2761)
6e9b6c2 is described below

commit 6e9b6c2a8ffbeae1edabf88486f8432f23c29d74
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Wed Oct 10 11:08:27 2018 -0700

    Issue #2757: Kafka source connector should just transfer bytes (#2761)
    
    *Motivation*
    
    The Kafka source connector is mostly used for transferring bytes from Kafka
    to Pulsar. Since we haven't mapped schemas between Kafka and Pulsar, we
    should just transfer bytes for now.
    
    *Changes*
    
    Add a bytes source and make it the default source.
    Keep the string source for backward compatibility (BC).
---
 .../pulsar/io/kafka/KafkaAbstractSource.java       |  6 ++-
 .../apache/pulsar/io/kafka/KafkaBytesSource.java   | 48 ++++++++++++++++++++++
 .../resources/META-INF/services/pulsar-io.yaml     |  2 +-
 3 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 494d91b..b6c6840 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -78,6 +78,10 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
 
     }
 
+    protected Properties beforeCreateConsumer(Properties props) {
+        return props;
+    }
+
     @Override
     public void close() throws InterruptedException {
         LOG.info("Stopping kafka source");
@@ -96,7 +100,7 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
     public void start() {
         runnerThread = new Thread(() -> {
             LOG.info("Starting kafka source");
-            consumer = new KafkaConsumer<>(props);
+            consumer = new KafkaConsumer<>(beforeCreateConsumer(props));
             consumer.subscribe(Arrays.asList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
             ConsumerRecords<String, byte[]> consumerRecords;
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
new file mode 100644
index 0000000..1e99208
--- /dev/null
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka;
+
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+/**
+ * Simple Kafka Source that just transfers the value part of the kafka records
+ * as raw bytes
+ */
+@Slf4j
+public class KafkaBytesSource extends KafkaAbstractSource<byte[]> {
+
+    @Override
+    protected Properties beforeCreateConsumer(Properties props) {
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        log.info("Created kafka consumer config : {}", props);
+        return props;
+    }
+
+    @Override
+    public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
+        return record.value();
+    }
+}
diff --git a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
index 7afc154..c3fd86d 100644
--- a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -19,5 +19,5 @@
 
 name: kafka
 description: Kafka source and sink connector
-sourceClass: org.apache.pulsar.io.kafka.KafkaStringSource
+sourceClass: org.apache.pulsar.io.kafka.KafkaBytesSource
 sinkClass: org.apache.pulsar.io.kafka.KafkaBytesSink