You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/24 16:33:28 UTC

[incubator-pulsar] branch master updated: Make the KafkaSource abstract by allowing users to implement method to extract value (#1842)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e249493  Make the KafkaSource abstract by allowing users to implement method to extract value (#1842)
e249493 is described below

commit e249493b7e3a5a5cae4e5a779b9499caccd6324a
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu May 24 09:33:22 2018 -0700

    Make the KafkaSource abstract by allowing users to implement method to extract value (#1842)
---
 .../org/apache/pulsar/io/kafka/KafkaSource.java     | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java
index 6618c1c..fa73418 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java
@@ -39,11 +39,11 @@ import java.util.concurrent.ExecutionException;
 /**
  * Simple Kafka Source to transfer messages from a Kafka topic
  */
-public class KafkaSource<V> extends PushSource<V> {
+public abstract class KafkaSource<V> extends PushSource<V> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
 
-    private Consumer<String, V> consumer;
+    private Consumer<byte[], byte[]> consumer;
     private Properties props;
     private KafkaSourceConfig kafkaSourceConfig;
     Thread runnerThread;
@@ -103,14 +103,14 @@ public class KafkaSource<V> extends PushSource<V> {
             consumer = new KafkaConsumer<>(props);
             consumer.subscribe(Arrays.asList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
-            ConsumerRecords<String, V> consumerRecords;
+            ConsumerRecords<byte[], byte[]> consumerRecords;
             while(true){
                 consumerRecords = consumer.poll(1000);
                 CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
-                for (ConsumerRecord<String, V> consumerRecord : consumerRecords) {
+                for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
                     LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
-                    KafkaRecord<V> record = new KafkaRecord<>(consumerRecord);
+                    KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
                     consumeFunction.accept(record);
                     futures[index] = record.getCompletableFuture();
                     index++;
@@ -130,13 +130,18 @@ public class KafkaSource<V> extends PushSource<V> {
         runnerThread.start();
     }
 
+    public abstract V extractValue(ConsumerRecord<byte[], byte[]> record);
+
     static private class KafkaRecord<V> implements Record<V> {
-        private final ConsumerRecord<String, V> record;
+        private final ConsumerRecord<byte[], byte[]> record;
+        private final V value;
         @Getter
         private final CompletableFuture<Void> completableFuture = new CompletableFuture();
 
-        public KafkaRecord(ConsumerRecord<String, V> record) {
+        public KafkaRecord(ConsumerRecord<byte[], byte[]> record,
+                           V value) {
             this.record = record;
+            this.value = value;
         }
         @Override
         public String getPartitionId() {
@@ -150,7 +155,7 @@ public class KafkaSource<V> extends PushSource<V> {
 
         @Override
         public V getValue() {
-            return record.value();
+            return value;
         }
 
         @Override

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.