You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/21 07:51:31 UTC
[incubator-pulsar] branch master updated: Make the current reference sinks usable (#1814)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 481f69e Make the current reference sinks usable (#1814)
481f69e is described below
commit 481f69eae0ca74f99f358389058b439986bf24e6
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Mon May 21 00:51:28 2018 -0700
Make the current reference sinks usable (#1814)
* Make the current reference sinks usable
* removed unused imports
---
.../org/apache/pulsar/io/aerospike/AerospikeSink.java | 15 +++++++++------
.../org/apache/pulsar/io/cassandra/CassandraSink.java | 14 ++++++++------
.../main/java/org/apache/pulsar/io/kafka/KafkaSink.java | 13 ++++++++-----
3 files changed, 25 insertions(+), 17 deletions(-)
diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
index f1390c1..18e8519 100644
--- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
+++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
@@ -33,7 +33,6 @@ import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.WritePolicy;
import org.apache.pulsar.common.util.KeyValue;
import org.apache.pulsar.io.core.SimpleSink;
-import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,9 +42,10 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
/**
- * Simple AeroSpike sink
+ * A simple abstract base class for the Aerospike sink.
+ * Users need to implement the extractKeyValue function to use this sink.
*/
-public class AerospikeSink<K, V> extends SimpleSink<KeyValue<K, V>> {
+public abstract class AerospikeSink<K, V> extends SimpleSink<byte[]> {
private static final Logger LOG = LoggerFactory.getLogger(AerospikeSink.class);
@@ -83,10 +83,11 @@ public class AerospikeSink<K, V> extends SimpleSink<KeyValue<K, V>> {
}
@Override
- public CompletableFuture<Void> write(KeyValue<K, V> record) {
+ public CompletableFuture<Void> write(byte[] record) {
CompletableFuture<Void> future = new CompletableFuture<>();
- Key key = new Key(aerospikeSinkConfig.getKeyspace(), aerospikeSinkConfig.getKeySet(), record.getKey().toString());
- Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), Value.getAsBlob(record.getValue()));
+ KeyValue<K, V> keyValue = extractKeyValue(record);
+ Key key = new Key(aerospikeSinkConfig.getKeyspace(), aerospikeSinkConfig.getKeySet(), keyValue.getKey().toString());
+ Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), Value.getAsBlob(keyValue.getValue()));
AWriteListener listener = null;
try {
listener = queue.take();
@@ -154,4 +155,6 @@ public class AerospikeSink<K, V> extends SimpleSink<KeyValue<K, V>> {
}
}
}
+
+ public abstract KeyValue<K, V> extractKeyValue(byte[] message);
}
\ No newline at end of file
diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
index 9aa09e9..710feb7 100644
--- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
+++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
@@ -29,7 +29,6 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.apache.pulsar.common.util.KeyValue;
import org.apache.pulsar.io.core.SimpleSink;
-import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,10 +36,10 @@ import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
- * Simple Cassandra sink
- * Takes in a KeyValue and writes it to a predefined keyspace/columnfamily/columnname.
+ * A simple abstract base class for the Cassandra sink.
+ * Users need to implement the extractKeyValue function to use this sink.
*/
-public class CassandraSink<K, V> extends SimpleSink<KeyValue<K, V>> {
+public abstract class CassandraSink<K, V> extends SimpleSink<byte[]> {
private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);
@@ -72,8 +71,9 @@ public class CassandraSink<K, V> extends SimpleSink<KeyValue<K, V>> {
}
@Override
- public CompletableFuture<Void> write(KeyValue<K, V> record) {
- BoundStatement bound = statement.bind(record.getKey(), record.getValue());
+ public CompletableFuture<Void> write(byte[] record) {
+ KeyValue<K, V> keyValue = extractKeyValue(record);
+ BoundStatement bound = statement.bind(keyValue.getKey(), keyValue.getValue());
ResultSetFuture future = session.executeAsync(bound);
CompletableFuture<Void> completable = new CompletableFuture<Void>();
Futures.addCallback(future,
@@ -108,4 +108,6 @@ public class CassandraSink<K, V> extends SimpleSink<KeyValue<K, V>> {
session = cluster.connect();
session.execute("USE " + cassandraSinkConfig.getKeyspace());
}
+
+ public abstract KeyValue<K, V> extractKeyValue(byte[] message);
}
\ No newline at end of file
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
index 08ca652..5aa9894 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
@@ -25,7 +25,6 @@ import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pulsar.common.util.KeyValue;
import org.apache.pulsar.io.core.SimpleSink;
-import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,9 +36,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
- * Simple Kafka Sink to publish messages to a Kafka topic
+ * A simple abstract base class for the Kafka sink.
+ * Users need to implement the extractKeyValue function to use this sink.
*/
-public class KafkaSink<K, V> extends SimpleSink<KeyValue<K, V>> {
+public abstract class KafkaSink<K, V> extends SimpleSink<byte[]> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
@@ -48,8 +48,9 @@ public class KafkaSink<K, V> extends SimpleSink<KeyValue<K, V>> {
private KafkaSinkConfig kafkaSinkConfig;
@Override
- public CompletableFuture<Void> write(KeyValue<K, V> message) {
- ProducerRecord<K, V> record = new ProducerRecord<>(kafkaSinkConfig.getTopic(), message.getKey(), message.getValue());
+ public CompletableFuture<Void> write(byte[] message) {
+ KeyValue<K, V> keyValue = extractKeyValue(message);
+ ProducerRecord<K, V> record = new ProducerRecord<>(kafkaSinkConfig.getTopic(), keyValue.getKey(), keyValue.getValue());
LOG.debug("Record sending to kafka, record={}.", record);
Future f = producer.send(record);
return CompletableFuture.supplyAsync(() -> {
@@ -91,4 +92,6 @@ public class KafkaSink<K, V> extends SimpleSink<KeyValue<K, V>> {
LOG.info("Kafka sink started.");
}
+
+ public abstract KeyValue<K, V> extractKeyValue(byte[] message);
}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.