You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/10/10 18:08:32 UTC
[pulsar] branch master updated: Issue #2757: Kafka source connector
should just transfer bytes (#2761)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6e9b6c2 Issue #2757: Kafka source connector should just transfer bytes (#2761)
6e9b6c2 is described below
commit 6e9b6c2a8ffbeae1edabf88486f8432f23c29d74
Author: Sijie Guo <gu...@gmail.com>
AuthorDate: Wed Oct 10 11:08:27 2018 -0700
Issue #2757: Kafka source connector should just transfer bytes (#2761)
*Motivation*
The Kafka source connector is mostly used for transferring bytes from Kafka
to Pulsar. Since we haven't mapped schemas between Kafka and Pulsar yet, we
should just transfer bytes for now.
*Changes*
Add a bytes source and make it the default source for the connector.
Keep the string source for backward compatibility (BC).
---
.../pulsar/io/kafka/KafkaAbstractSource.java | 6 ++-
.../apache/pulsar/io/kafka/KafkaBytesSource.java | 48 ++++++++++++++++++++++
.../resources/META-INF/services/pulsar-io.yaml | 2 +-
3 files changed, 54 insertions(+), 2 deletions(-)
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 494d91b..b6c6840 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -78,6 +78,10 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
}
+ protected Properties beforeCreateConsumer(Properties props) {
+ return props;
+ }
+
@Override
public void close() throws InterruptedException {
LOG.info("Stopping kafka source");
@@ -96,7 +100,7 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> {
public void start() {
runnerThread = new Thread(() -> {
LOG.info("Starting kafka source");
- consumer = new KafkaConsumer<>(props);
+ consumer = new KafkaConsumer<>(beforeCreateConsumer(props));
consumer.subscribe(Arrays.asList(kafkaSourceConfig.getTopic()));
LOG.info("Kafka source started.");
ConsumerRecords<String, byte[]> consumerRecords;
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
new file mode 100644
index 0000000..1e99208
--- /dev/null
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSource.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.kafka;
+
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+
+/**
+ * Simple Kafka Source that just transfers the value part of the kafka records
+ * as raw bytes
+ */
+@Slf4j
+public class KafkaBytesSource extends KafkaAbstractSource<byte[]> {
+
+ @Override
+ protected Properties beforeCreateConsumer(Properties props) {
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ log.info("Created kafka consumer config : {}", props);
+ return props;
+ }
+
+ @Override
+ public byte[] extractValue(ConsumerRecord<String, byte[]> record) {
+ return record.value();
+ }
+}
diff --git a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
index 7afc154..c3fd86d 100644
--- a/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/kafka/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -19,5 +19,5 @@
name: kafka
description: Kafka source and sink connector
-sourceClass: org.apache.pulsar.io.kafka.KafkaStringSource
+sourceClass: org.apache.pulsar.io.kafka.KafkaBytesSource
sinkClass: org.apache.pulsar.io.kafka.KafkaBytesSink