You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/21 07:51:31 UTC

[incubator-pulsar] branch master updated: Make the current reference sinks usable (#1814)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 481f69e  Make the current reference sinks usable (#1814)
481f69e is described below

commit 481f69eae0ca74f99f358389058b439986bf24e6
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon May 21 00:51:28 2018 -0700

    Make the current reference sinks usable (#1814)
    
    * Make the current reference sinks usable
    
    * removed unused imports
---
 .../org/apache/pulsar/io/aerospike/AerospikeSink.java     | 15 +++++++++------
 .../org/apache/pulsar/io/cassandra/CassandraSink.java     | 14 ++++++++------
 .../main/java/org/apache/pulsar/io/kafka/KafkaSink.java   | 13 ++++++++-----
 3 files changed, 25 insertions(+), 17 deletions(-)

diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
index f1390c1..18e8519 100644
--- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
+++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
@@ -33,7 +33,6 @@ import com.aerospike.client.policy.ClientPolicy;
 import com.aerospike.client.policy.WritePolicy;
 import org.apache.pulsar.common.util.KeyValue;
 import org.apache.pulsar.io.core.SimpleSink;
-import org.apache.pulsar.io.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,9 +42,10 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingDeque;
 
 /**
- * Simple AeroSpike sink
+ * A Simple abstract class for Aerospike sink
+ * Users need to implement extractKeyValue function to use this sink
  */
-public class AerospikeSink<K, V> extends SimpleSink<KeyValue<K, V>> {
+public abstract class AerospikeSink<K, V> extends SimpleSink<byte[]> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AerospikeSink.class);
 
@@ -83,10 +83,11 @@ public class AerospikeSink<K, V> extends SimpleSink<KeyValue<K, V>> {
     }
 
     @Override
-    public CompletableFuture<Void> write(KeyValue<K, V> record) {
+    public CompletableFuture<Void> write(byte[] record) {
         CompletableFuture<Void> future = new CompletableFuture<>();
-        Key key = new Key(aerospikeSinkConfig.getKeyspace(), aerospikeSinkConfig.getKeySet(), record.getKey().toString());
-        Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), Value.getAsBlob(record.getValue()));
+        KeyValue<K, V> keyValue = extractKeyValue(record);
+        Key key = new Key(aerospikeSinkConfig.getKeyspace(), aerospikeSinkConfig.getKeySet(), keyValue.getKey().toString());
+        Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), Value.getAsBlob(keyValue.getValue()));
         AWriteListener listener = null;
         try {
             listener = queue.take();
@@ -154,4 +155,6 @@ public class AerospikeSink<K, V> extends SimpleSink<KeyValue<K, V>> {
             }
         }
     }
+
+    public abstract KeyValue<K, V> extractKeyValue(byte[] message);
 }
\ No newline at end of file
diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
index 9aa09e9..710feb7 100644
--- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
+++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
@@ -29,7 +29,6 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import org.apache.pulsar.common.util.KeyValue;
 import org.apache.pulsar.io.core.SimpleSink;
-import org.apache.pulsar.io.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,10 +36,10 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
- * Simple Cassandra sink
- * Takes in a KeyValue and writes it to a predefined keyspace/columnfamily/columnname.
+ * A Simple abstract class for Cassandra sink
+ * Users need to implement extractKeyValue function to use this sink
  */
-public class CassandraSink<K, V> extends SimpleSink<KeyValue<K, V>> {
+public abstract class CassandraSink<K, V> extends SimpleSink<byte[]> {
 
     private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);
 
@@ -72,8 +71,9 @@ public class CassandraSink<K, V> extends SimpleSink<KeyValue<K, V>> {
     }
 
     @Override
-    public CompletableFuture<Void> write(KeyValue<K, V> record) {
-        BoundStatement bound = statement.bind(record.getKey(), record.getValue());
+    public CompletableFuture<Void> write(byte[] record) {
+        KeyValue<K, V> keyValue = extractKeyValue(record);
+        BoundStatement bound = statement.bind(keyValue.getKey(), keyValue.getValue());
         ResultSetFuture future = session.executeAsync(bound);
         CompletableFuture<Void> completable = new CompletableFuture<Void>();
         Futures.addCallback(future,
@@ -108,4 +108,6 @@ public class CassandraSink<K, V> extends SimpleSink<KeyValue<K, V>> {
         session = cluster.connect();
         session.execute("USE " + cassandraSinkConfig.getKeyspace());
     }
+
+    public abstract KeyValue<K, V> extractKeyValue(byte[] message);
 }
\ No newline at end of file
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
index 08ca652..5aa9894 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
@@ -25,7 +25,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.pulsar.common.util.KeyValue;
 import org.apache.pulsar.io.core.SimpleSink;
-import org.apache.pulsar.io.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,9 +36,10 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
 /**
- * Simple Kafka Sink to publish messages to a Kafka topic
+ * A Simple abstract class for Kafka sink
+ * Users need to implement extractKeyValue function to use this sink
  */
-public class KafkaSink<K, V> extends SimpleSink<KeyValue<K, V>> {
+public abstract class KafkaSink<K, V> extends SimpleSink<byte[]> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
 
@@ -48,8 +48,9 @@ public class KafkaSink<K, V> extends SimpleSink<KeyValue<K, V>> {
     private KafkaSinkConfig kafkaSinkConfig;
 
     @Override
-    public CompletableFuture<Void> write(KeyValue<K, V> message) {
-        ProducerRecord<K, V> record = new ProducerRecord<>(kafkaSinkConfig.getTopic(), message.getKey(), message.getValue());
+    public CompletableFuture<Void> write(byte[] message) {
+        KeyValue<K, V> keyValue = extractKeyValue(message);
+        ProducerRecord<K, V> record = new ProducerRecord<>(kafkaSinkConfig.getTopic(), keyValue.getKey(), keyValue.getValue());
         LOG.debug("Record sending to kafka, record={}.", record);
         Future f = producer.send(record);
         return CompletableFuture.supplyAsync(() -> {
@@ -91,4 +92,6 @@ public class KafkaSink<K, V> extends SimpleSink<KeyValue<K, V>> {
 
         LOG.info("Kafka sink started.");
     }
+
+    public abstract KeyValue<K, V> extractKeyValue(byte[] message);
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.