You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/24 16:33:28 UTC

[GitHub] sijie closed pull request #1842: Make the KakfaSource abstract by allowing users to implement method to extract value

sijie closed pull request #1842: Make the KakfaSource abstract by allowing users to implement method to extract value
URL: https://github.com/apache/incubator-pulsar/pull/1842
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java
index 6618c1ce96..fa734184b1 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java
@@ -39,11 +39,11 @@
 /**
  * Simple Kafka Source to transfer messages from a Kafka topic
  */
-public class KafkaSource<V> extends PushSource<V> {
+public abstract class KafkaSource<V> extends PushSource<V> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
 
-    private Consumer<String, V> consumer;
+    private Consumer<byte[], byte[]> consumer;
     private Properties props;
     private KafkaSourceConfig kafkaSourceConfig;
     Thread runnerThread;
@@ -103,14 +103,14 @@ public void start() {
             consumer = new KafkaConsumer<>(props);
             consumer.subscribe(Arrays.asList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
-            ConsumerRecords<String, V> consumerRecords;
+            ConsumerRecords<byte[], byte[]> consumerRecords;
             while(true){
                 consumerRecords = consumer.poll(1000);
                 CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
-                for (ConsumerRecord<String, V> consumerRecord : consumerRecords) {
+                for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
                     LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
-                    KafkaRecord<V> record = new KafkaRecord<>(consumerRecord);
+                    KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
                     consumeFunction.accept(record);
                     futures[index] = record.getCompletableFuture();
                     index++;
@@ -130,13 +130,18 @@ public void start() {
         runnerThread.start();
     }
 
+    public abstract V extractValue(ConsumerRecord<byte[], byte[]> record);
+
     static private class KafkaRecord<V> implements Record<V> {
-        private final ConsumerRecord<String, V> record;
+        private final ConsumerRecord<byte[], byte[]> record;
+        private final V value;
         @Getter
         private final CompletableFuture<Void> completableFuture = new CompletableFuture();
 
-        public KafkaRecord(ConsumerRecord<String, V> record) {
+        public KafkaRecord(ConsumerRecord<byte[], byte[]> record,
+                           V value) {
             this.record = record;
+            this.value = value;
         }
         @Override
         public String getPartitionId() {
@@ -150,7 +155,7 @@ public long getRecordSequence() {
 
         @Override
         public V getValue() {
-            return record.value();
+            return value;
         }
 
         @Override


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services