You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/17 17:25:27 UTC

[incubator-pulsar] branch master updated: Modifying sink interface to be generic (#1792)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e4f2bb  Modifying sink interface to be generic (#1792)
5e4f2bb is described below

commit 5e4f2bbbf791e216e2eb6c50088c1c5910f0ec20
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu May 17 10:25:24 2018 -0700

    Modifying sink interface to be generic (#1792)
---
 .../functions/instance/JavaInstanceRunnable.java   | 10 +--
 .../apache/pulsar/functions/sink/PulsarSink.java   |  8 +-
 .../apache/pulsar/functions/sink/RuntimeSink.java  | 51 ------------
 .../functions/sink/DefaultRuntimeSinkTest.java     | 96 ----------------------
 .../apache/pulsar/io/aerospike/AerospikeSink.java  |  3 +-
 .../apache/pulsar/io/cassandra/CassandraSink.java  |  3 +-
 .../java/org/apache/pulsar/io/core/SimpleSink.java | 46 +++--------
 .../main/java/org/apache/pulsar/io/core/Sink.java  | 21 ++---
 .../java/org/apache/pulsar/io/kafka/KafkaSink.java |  3 +-
 9 files changed, 34 insertions(+), 207 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 4aaed5b..5b5e943 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -56,10 +56,8 @@ import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
-import org.apache.pulsar.functions.sink.DefaultRuntimeSink;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.sink.PulsarSinkConfig;
-import org.apache.pulsar.functions.sink.RuntimeSink;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
@@ -106,7 +104,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     private Record currentRecord;
 
     private Source source;
-    private RuntimeSink sink;
+    private Sink sink;
 
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 FunctionCacheManager fnCache,
@@ -524,10 +522,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                     Thread.currentThread().getContextClassLoader());
         }
 
-        if (object instanceof RuntimeSink) {
-            this.sink = (RuntimeSink) object;
-        } else if (object instanceof Sink) {
-            this.sink = DefaultRuntimeSink.of((Sink) object);
+        if (object instanceof Sink) {
+            this.sink = (Sink) object;
         } else {
             throw new RuntimeException("Sink does not implement correct interface");
         }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 61deeff..4fccb54 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -39,13 +39,14 @@ import org.apache.pulsar.functions.instance.producers.Producers;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Sink;
 
 import java.util.Base64;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 @Slf4j
-public class PulsarSink<T> implements RuntimeSink<T> {
+public class PulsarSink<T> implements Sink<T> {
 
     private PulsarClient client;
     private PulsarSinkConfig pulsarSinkConfig;
@@ -207,11 +208,6 @@ public class PulsarSink<T> implements RuntimeSink<T> {
     }
 
     @Override
-    public CompletableFuture<Void> write(T value) {
-        return null;
-    }
-
-    @Override
     public void write(RecordContext recordContext, T value) throws Exception {
 
         byte[] output;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
deleted file mode 100644
index e9c8dc5..0000000
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.sink;
-
-import org.apache.pulsar.io.core.RecordContext;
-import org.apache.pulsar.io.core.Sink;
-
-/**
- * This class extends connect sink.
- *
- * <p>Runtime should interact sink rather than interact directly to the public {@link Sink} interface.
- *
- * <p>There is a default implementation provided for wrapping up the user provided {@link Sink}. Pulsar sink
- * should be implemented using this interface to ensure supporting effective-once.
- */
-public interface RuntimeSink<T> extends Sink<T>{
-
-    /**
-     * Write the <tt>value</tt>value.
-     *
-     * <p>The implementation of this class is responsible for notifying the runtime whether the input record
-     * for generating this value is done with processing by {@link RecordContext#ack} and {@link RecordContext#fail}.
-     *
-     * @param inputRecordContext input record context
-     * @param value output value computed from the runtime.
-     */
-    default void write(RecordContext inputRecordContext, T value) throws Exception {
-        write(value)
-            .thenAccept(ignored -> inputRecordContext.ack())
-            .exceptionally(cause -> {
-                inputRecordContext.fail();
-                return null;
-            });
-    }
-}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
deleted file mode 100644
index 018a968..0000000
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.sink;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.io.core.RecordContext;
-import org.apache.pulsar.io.core.Sink;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * Unit test {@link DefaultRuntimeSink}.
- */
-public class DefaultRuntimeSinkTest {
-
-    private Sink<String> mockSink;
-    private RuntimeSink<String> runtimeSink;
-
-    @BeforeMethod
-    public void setup() {
-        this.mockSink = mock(Sink.class);
-        this.runtimeSink = DefaultRuntimeSink.of(mockSink);
-    }
-
-    @Test
-    public void testOpen() throws Exception {
-        this.runtimeSink.open(Collections.emptyMap());
-
-        verify(mockSink, times(1)).open(any(Map.class));
-    }
-
-    @Test
-    public void testClose() throws Exception {
-        this.runtimeSink.close();
-
-        verify(mockSink, times(1)).close();
-    }
-
-    @Test
-    public void testWrite() throws Exception {
-        this.runtimeSink.write("test-record");
-        verify(mockSink, times(1)).write(eq("test-record"));
-    }
-
-    @Test
-    public void testWriteAck() throws Exception {
-        RecordContext context = mock(RecordContext.class);
-
-        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
-        writeFuture.complete(null);
-        when(mockSink.write(anyString())).thenReturn(writeFuture);
-
-        runtimeSink.write(context, "test-record");
-
-        verify(context, times(1)).ack();
-    }
-
-    @Test
-    public void testWriteFail() throws Exception {
-        RecordContext context = mock(RecordContext.class);
-
-        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
-        writeFuture.completeExceptionally(new Exception("test-exception"));
-        when(mockSink.write(anyString())).thenReturn(writeFuture);
-
-        runtimeSink.write(context, "test-record");
-
-        verify(context, times(1)).fail();
-    }
-}
diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
index 34df2aa..f1390c1 100644
--- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
+++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
@@ -32,6 +32,7 @@ import com.aerospike.client.listener.WriteListener;
 import com.aerospike.client.policy.ClientPolicy;
 import com.aerospike.client.policy.WritePolicy;
 import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.io.core.SimpleSink;
 import org.apache.pulsar.io.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +45,7 @@ import java.util.concurrent.LinkedBlockingDeque;
 /**
  * Simple AeroSpike sink
  */
-public class AerospikeSink<K, V> implements Sink<KeyValue<K, V>> {
+public class AerospikeSink<K, V> extends SimpleSink<KeyValue<K, V>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AerospikeSink.class);
 
diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
index 14abc9b..9aa09e9 100644
--- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
+++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
@@ -28,6 +28,7 @@ import com.datastax.driver.core.Session;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.io.core.SimpleSink;
 import org.apache.pulsar.io.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +40,7 @@ import java.util.concurrent.CompletableFuture;
  * Simple Cassandra sink
  * Takes in a KeyValue and writes it to a predefined keyspace/columnfamily/columnname.
  */
-public class CassandraSink<K, V> implements Sink<KeyValue<K, V>> {
+public class CassandraSink<K, V> extends SimpleSink<KeyValue<K, V>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class);
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
similarity index 53%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
rename to pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
index 86cd5b5..2c29bc8 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
@@ -16,38 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.sink;
+package org.apache.pulsar.io.core;
 
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.io.core.Sink;
 
 /**
- * The default implementation of runtime sink.
- *
- * @param <T>
+ * A simpler version of the Sink interface that users can extend for use cases
+ * that don't require fine-grained delivery control.
  */
-public class DefaultRuntimeSink<T> implements RuntimeSink<T> {
-
-    public static <T> DefaultRuntimeSink<T> of(Sink<T> sink) {
-        return new DefaultRuntimeSink<>(sink);
-    }
-
-    private final Sink<T> sink;
+public abstract class SimpleSink<T> implements Sink<T> {
 
-    private DefaultRuntimeSink(Sink<T> sink) {
-        this.sink = sink;
-    }
-
-    /**
-     * Open connector with configuration
-     *
-     * @param config initialization config
-     * @throws Exception IO type exceptions when opening a connector
-     */
     @Override
-    public void open(final Map<String, Object> config) throws Exception {
-        sink.open(config);
+    public void write(RecordContext inputRecordContext, T value) throws Exception {
+        write(value)
+                .thenAccept(ignored -> inputRecordContext.ack())
+                .exceptionally(cause -> {
+                    inputRecordContext.fail();
+                    return null;
+                });
     }
 
     /**
@@ -56,13 +42,5 @@ public class DefaultRuntimeSink<T> implements RuntimeSink<T> {
      * @param value output value
      * @return Completable future fo async publish request
      */
-    @Override
-    public CompletableFuture<Void> write(T value) {
-        return sink.write(value);
-    }
-
-    @Override
-    public void close() throws Exception {
-        sink.close();
-    }
+    public abstract CompletableFuture<Void> write(T value);
 }
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
index 48a58e7..0265e77 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
@@ -19,18 +19,11 @@
 package org.apache.pulsar.io.core;
 
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 /**
- * Pulsar's Sink interface. Sink read data from
- * a Pulsar topic and write it to external sinks(kv store, database, filesystem ,etc)
- * The lifcycle of a Sink is to open it passing any config needed
- * by it to initialize(like open network connection, authenticate, etc).
- * On every message from the designated PulsarTopic, the write method is
- * invoked which writes the message to the external sink. One can use close
- * at the end of the session to do any cleanup
+ * Generic sink interface users can implement to run Sink on top of Pulsar Functions
  */
-public interface Sink<T> extends AutoCloseable {
+public interface Sink<T> extends AutoCloseable{
     /**
      * Open connector with configuration
      *
@@ -45,5 +38,13 @@ public interface Sink<T> extends AutoCloseable {
      * @param value output value
      * @return Completable future fo async publish request
      */
-    CompletableFuture<Void> write(T value);
+
+
+    /**
+     * Write a message to Sink
+     * @param inputRecordContext Context of value
+     * @param value value to write to sink
+     * @throws Exception
+     */
+    void write(RecordContext inputRecordContext, T value) throws Exception;
 }
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
index 13d65ab..08ca652 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.io.core.SimpleSink;
 import org.apache.pulsar.io.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +39,7 @@ import java.util.concurrent.Future;
 /**
  * Simple Kafka Sink to publish messages to a Kafka topic
  */
-public class KafkaSink<K, V> implements Sink<KeyValue<K, V>> {
+public class KafkaSink<K, V> extends SimpleSink<KeyValue<K, V>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
 

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.