You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/24 16:33:28 UTC
[incubator-pulsar] branch master updated: Make the KafkaSource
abstract by allowing users to implement method to extract value (#1842)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e249493 Make the KafkaSource abstract by allowing users to implement method to extract value (#1842)
e249493 is described below
commit e249493b7e3a5a5cae4e5a779b9499caccd6324a
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu May 24 09:33:22 2018 -0700
Make the KafkaSource abstract by allowing users to implement method to extract value (#1842)
---
.../org/apache/pulsar/io/kafka/KafkaSource.java | 21 +++++++++++++--------
1 file changed, 13 insertions(+), 8 deletions(-)
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java
index 6618c1c..fa73418 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java
@@ -39,11 +39,11 @@ import java.util.concurrent.ExecutionException;
/**
* Simple Kafka Source to transfer messages from a Kafka topic
*/
-public class KafkaSource<V> extends PushSource<V> {
+public abstract class KafkaSource<V> extends PushSource<V> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
- private Consumer<String, V> consumer;
+ private Consumer<byte[], byte[]> consumer;
private Properties props;
private KafkaSourceConfig kafkaSourceConfig;
Thread runnerThread;
@@ -103,14 +103,14 @@ public class KafkaSource<V> extends PushSource<V> {
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(kafkaSourceConfig.getTopic()));
LOG.info("Kafka source started.");
- ConsumerRecords<String, V> consumerRecords;
+ ConsumerRecords<byte[], byte[]> consumerRecords;
while(true){
consumerRecords = consumer.poll(1000);
CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
int index = 0;
- for (ConsumerRecord<String, V> consumerRecord : consumerRecords) {
+ for (ConsumerRecord<byte[], byte[]> consumerRecord : consumerRecords) {
LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
- KafkaRecord<V> record = new KafkaRecord<>(consumerRecord);
+ KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, extractValue(consumerRecord));
consumeFunction.accept(record);
futures[index] = record.getCompletableFuture();
index++;
@@ -130,13 +130,18 @@ public class KafkaSource<V> extends PushSource<V> {
runnerThread.start();
}
+ public abstract V extractValue(ConsumerRecord<byte[], byte[]> record);
+
static private class KafkaRecord<V> implements Record<V> {
- private final ConsumerRecord<String, V> record;
+ private final ConsumerRecord<byte[], byte[]> record;
+ private final V value;
@Getter
private final CompletableFuture<Void> completableFuture = new CompletableFuture();
- public KafkaRecord(ConsumerRecord<String, V> record) {
+ public KafkaRecord(ConsumerRecord<byte[], byte[]> record,
+ V value) {
this.record = record;
+ this.value = value;
}
@Override
public String getPartitionId() {
@@ -150,7 +155,7 @@ public class KafkaSource<V> extends PushSource<V> {
@Override
public V getValue() {
- return record.value();
+ return value;
}
@Override
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.