You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/17 17:25:27 UTC
[incubator-pulsar] branch master updated: Modifying sink interface
to be generic (#1792)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5e4f2bb Modifying sink interface to be generic (#1792)
5e4f2bb is described below
commit 5e4f2bbbf791e216e2eb6c50088c1c5910f0ec20
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu May 17 10:25:24 2018 -0700
Modifying sink interface to be generic (#1792)
---
.../functions/instance/JavaInstanceRunnable.java | 10 +--
.../apache/pulsar/functions/sink/PulsarSink.java | 8 +-
.../apache/pulsar/functions/sink/RuntimeSink.java | 51 ------------
.../functions/sink/DefaultRuntimeSinkTest.java | 96 ----------------------
.../apache/pulsar/io/aerospike/AerospikeSink.java | 3 +-
.../apache/pulsar/io/cassandra/CassandraSink.java | 3 +-
.../java/org/apache/pulsar/io/core/SimpleSink.java | 46 +++--------
.../main/java/org/apache/pulsar/io/core/Sink.java | 21 ++---
.../java/org/apache/pulsar/io/kafka/KafkaSink.java | 3 +-
9 files changed, 34 insertions(+), 207 deletions(-)
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 4aaed5b..5b5e943 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -56,10 +56,8 @@ import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
-import org.apache.pulsar.functions.sink.DefaultRuntimeSink;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.sink.PulsarSinkConfig;
-import org.apache.pulsar.functions.sink.RuntimeSink;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
@@ -106,7 +104,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private Record currentRecord;
private Source source;
- private RuntimeSink sink;
+ private Sink sink;
public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
@@ -524,10 +522,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
Thread.currentThread().getContextClassLoader());
}
- if (object instanceof RuntimeSink) {
- this.sink = (RuntimeSink) object;
- } else if (object instanceof Sink) {
- this.sink = DefaultRuntimeSink.of((Sink) object);
+ if (object instanceof Sink) {
+ this.sink = (Sink) object;
} else {
throw new RuntimeException("Sink does not implement correct interface");
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 61deeff..4fccb54 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -39,13 +39,14 @@ import org.apache.pulsar.functions.instance.producers.Producers;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Sink;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@Slf4j
-public class PulsarSink<T> implements RuntimeSink<T> {
+public class PulsarSink<T> implements Sink<T> {
private PulsarClient client;
private PulsarSinkConfig pulsarSinkConfig;
@@ -207,11 +208,6 @@ public class PulsarSink<T> implements RuntimeSink<T> {
}
@Override
- public CompletableFuture<Void> write(T value) {
- return null;
- }
-
- @Override
public void write(RecordContext recordContext, T value) throws Exception {
byte[] output;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
deleted file mode 100644
index e9c8dc5..0000000
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.sink;
-
-import org.apache.pulsar.io.core.RecordContext;
-import org.apache.pulsar.io.core.Sink;
-
-/**
- * This class extends connect sink.
- *
- * <p>Runtime should interact sink rather than interact directly to the public {@link Sink} interface.
- *
- * <p>There is a default implementation provided for wrapping up the user provided {@link Sink}. Pulsar sink
- * should be implemented using this interface to ensure supporting effective-once.
- */
-public interface RuntimeSink<T> extends Sink<T>{
-
- /**
- * Write the <tt>value</tt>value.
- *
- * <p>The implementation of this class is responsible for notifying the runtime whether the input record
- * for generating this value is done with processing by {@link RecordContext#ack} and {@link RecordContext#fail}.
- *
- * @param inputRecordContext input record context
- * @param value output value computed from the runtime.
- */
- default void write(RecordContext inputRecordContext, T value) throws Exception {
- write(value)
- .thenAccept(ignored -> inputRecordContext.ack())
- .exceptionally(cause -> {
- inputRecordContext.fail();
- return null;
- });
- }
-}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
deleted file mode 100644
index 018a968..0000000
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.sink;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.io.core.RecordContext;
-import org.apache.pulsar.io.core.Sink;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * Unit test {@link DefaultRuntimeSink}.
- */
-public class DefaultRuntimeSinkTest {
-
- private Sink<String> mockSink;
- private RuntimeSink<String> runtimeSink;
-
- @BeforeMethod
- public void setup() {
- this.mockSink = mock(Sink.class);
- this.runtimeSink = DefaultRuntimeSink.of(mockSink);
- }
-
- @Test
- public void testOpen() throws Exception {
- this.runtimeSink.open(Collections.emptyMap());
-
- verify(mockSink, times(1)).open(any(Map.class));
- }
-
- @Test
- public void testClose() throws Exception {
- this.runtimeSink.close();
-
- verify(mockSink, times(1)).close();
- }
-
- @Test
- public void testWrite() throws Exception {
- this.runtimeSink.write("test-record");
- verify(mockSink, times(1)).write(eq("test-record"));
- }
-
- @Test
- public void testWriteAck() throws Exception {
- RecordContext context = mock(RecordContext.class);
-
- CompletableFuture<Void> writeFuture = new CompletableFuture<>();
- writeFuture.complete(null);
- when(mockSink.write(anyString())).thenReturn(writeFuture);
-
- runtimeSink.write(context, "test-record");
-
- verify(context, times(1)).ack();
- }
-
- @Test
- public void testWriteFail() throws Exception {
- RecordContext context = mock(RecordContext.class);
-
- CompletableFuture<Void> writeFuture = new CompletableFuture<>();
- writeFuture.completeExceptionally(new Exception("test-exception"));
- when(mockSink.write(anyString())).thenReturn(writeFuture);
-
- runtimeSink.write(context, "test-record");
-
- verify(context, times(1)).fail();
- }
-}
diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
index 34df2aa..f1390c1 100644
--- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
+++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
@@ -32,6 +32,7 @@ import com.aerospike.client.listener.WriteListener;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.WritePolicy;
import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.io.core.SimpleSink;
import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +45,7 @@ import java.util.concurrent.LinkedBlockingDeque;
/**
* Simple AeroSpike sink
*/
-public class AerospikeSink<K, V> implements Sink<KeyValue<K, V>> {
+public class AerospikeSink<K, V> extends SimpleSink<KeyValue<K, V>> {
private static final Logger LOG = LoggerFactory.getLogger(AerospikeSink.class);
diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
index 14abc9b..9aa09e9 100644
--- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
+++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
@@ -28,6 +28,7 @@ import com.datastax.driver.core.Session;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.io.core.SimpleSink;
import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +40,7 @@ import java.util.concurrent.CompletableFuture;
* Simple Cassandra sink
* Takes in a KeyValue and writes it to a predefined keyspace/columnfamily/columnname.
*/
-public class CassandraSink<K, V> implements Sink<KeyValue<K, V>> {
+public class CassandraSink<K, V> extends SimpleSink<KeyValue<K, V>> {
private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
similarity index 53%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
rename to pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
index 86cd5b5..2c29bc8 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
@@ -16,38 +16,24 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.sink;
+package org.apache.pulsar.io.core;
-import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.io.core.Sink;
/**
- * The default implementation of runtime sink.
- *
- * @param <T>
+ * A simpler version of the Sink interface that users can extend for use cases that
+ * don't require fine-grained delivery control.
*/
-public class DefaultRuntimeSink<T> implements RuntimeSink<T> {
-
- public static <T> DefaultRuntimeSink<T> of(Sink<T> sink) {
- return new DefaultRuntimeSink<>(sink);
- }
-
- private final Sink<T> sink;
+public abstract class SimpleSink<T> implements Sink<T> {
- private DefaultRuntimeSink(Sink<T> sink) {
- this.sink = sink;
- }
-
- /**
- * Open connector with configuration
- *
- * @param config initialization config
- * @throws Exception IO type exceptions when opening a connector
- */
@Override
- public void open(final Map<String, Object> config) throws Exception {
- sink.open(config);
+ public void write(RecordContext inputRecordContext, T value) throws Exception {
+ write(value)
+ .thenAccept(ignored -> inputRecordContext.ack())
+ .exceptionally(cause -> {
+ inputRecordContext.fail();
+ return null;
+ });
}
/**
@@ -56,13 +42,5 @@ public class DefaultRuntimeSink<T> implements RuntimeSink<T> {
* @param value output value
* @return Completable future fo async publish request
*/
- @Override
- public CompletableFuture<Void> write(T value) {
- return sink.write(value);
- }
-
- @Override
- public void close() throws Exception {
- sink.close();
- }
+ public abstract CompletableFuture<Void> write(T value);
}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
index 48a58e7..0265e77 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
@@ -19,18 +19,11 @@
package org.apache.pulsar.io.core;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
/**
- * Pulsar's Sink interface. Sink read data from
- * a Pulsar topic and write it to external sinks(kv store, database, filesystem ,etc)
- * The lifcycle of a Sink is to open it passing any config needed
- * by it to initialize(like open network connection, authenticate, etc).
- * On every message from the designated PulsarTopic, the write method is
- * invoked which writes the message to the external sink. One can use close
- * at the end of the session to do any cleanup
+ * Generic sink interface users can implement to run Sink on top of Pulsar Functions
*/
-public interface Sink<T> extends AutoCloseable {
+public interface Sink<T> extends AutoCloseable{
/**
* Open connector with configuration
*
@@ -45,5 +38,13 @@ public interface Sink<T> extends AutoCloseable {
* @param value output value
* @return Completable future fo async publish request
*/
- CompletableFuture<Void> write(T value);
+
+
+ /**
+ * Write a message to Sink
+ * @param inputRecordContext Context of value
+ * @param value value to write to sink
+ * @throws Exception
+ */
+ void write(RecordContext inputRecordContext, T value) throws Exception;
}
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
index 13d65ab..08ca652 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.io.core.SimpleSink;
import org.apache.pulsar.io.core.Sink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +39,7 @@ import java.util.concurrent.Future;
/**
* Simple Kafka Sink to publish messages to a Kafka topic
*/
-public class KafkaSink<K, V> implements Sink<KeyValue<K, V>> {
+public class KafkaSink<K, V> extends SimpleSink<KeyValue<K, V>> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.