You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/02 21:43:12 UTC

[GitHub] sijie closed pull request #1708: Refactor functions to use Sink interface

sijie closed pull request #1708: Refactor functions to use Sink interface
URL: https://github.com/apache/incubator-pulsar/pull/1708
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
index ca569e794c..cd2d63d242 100644
--- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
+++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
@@ -46,4 +46,4 @@
      * @return Completable future fo async publish request
      */
     CompletableFuture<Void> write(T value);
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ca6414dc7a..2e8037b814 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -22,15 +22,13 @@
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 
+import com.google.gson.Gson;
 import io.netty.buffer.ByteBuf;
 
 import java.util.Arrays;
-import java.util.Base64;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import lombok.AccessLevel;
 import lombok.Getter;
@@ -49,18 +47,23 @@
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configuration;
 import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.pulsar.client.api.MessageBuilder;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.connect.core.Record;
+import org.apache.pulsar.connect.core.Source;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-import org.apache.pulsar.functions.instance.processors.MessageProcessor;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.sink.PulsarSinkConfig;
+import org.apache.pulsar.functions.sink.RuntimeSink;
 import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.functions.source.PulsarSource;
+import org.apache.pulsar.functions.source.PulsarSourceConfig;
+import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
-import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Reflections;
@@ -93,18 +96,14 @@
     @Getter
     private Exception deathException;
 
-    @Getter(AccessLevel.PACKAGE)
-    private SerDe outputSerDe;
-
-    @Getter(AccessLevel.PACKAGE)
-    // processor
-    private final MessageProcessor processor;
-
     // function stats
     private final FunctionStats stats;
 
     private Record currentRecord;
 
+    private Source source;
+    private RuntimeSink sink;
+
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 FunctionCacheManager fnCache,
                                 String jarFile,
@@ -116,9 +115,6 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
         this.client = (PulsarClientImpl) pulsarClient;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.stats = new FunctionStats();
-        this.processor = MessageProcessor.create(
-                client,
-                instanceConfig.getFunctionDetails());
     }
 
     /**
@@ -151,19 +147,16 @@ JavaInstance setupJavaInstance() throws Exception {
             typeArgs = TypeResolver.resolveRawArguments(java.util.function.Function.class, function.getClass());
         }
 
-        // setup serde
-        setupSerDe(typeArgs, clsLoader);
-
         // start the state table
         setupStateTable();
         // start the output producer
-        processor.setupOutput(outputSerDe);
+        setupOutput(typeArgs[1]);
         // start the input consumer
-        processor.setupInput(typeArgs[0]);
+        setupInput(typeArgs[0]);
         // start any log topic handler
         setupLogHandler();
 
-        return new JavaInstance(instanceConfig, object, clsLoader, client, processor.getSource());
+        return new JavaInstance(instanceConfig, object, clsLoader, client, this.source);
     }
 
     /**
@@ -175,9 +168,14 @@ public void run() {
             javaInstance = setupJavaInstance();
             while (true) {
 
-                currentRecord = processor.recieveMessage();
+                currentRecord = readInput();
 
-                processor.postReceiveMessage(currentRecord);
+                if (instanceConfig.getFunctionDetails().getProcessingGuarantees() == org.apache.pulsar.functions
+                        .proto.Function.ProcessingGuarantees.ATMOST_ONCE) {
+                    if (instanceConfig.getFunctionDetails().getAutoAck()) {
+                        currentRecord.ack();
+                    }
+                }
 
                 // state object is per function, because we need to have the ability to know what updates
                 // are made in this function and ensure we only acknowledge after the state is persisted.
@@ -310,44 +308,47 @@ private void processResult(Record srcRecord,
             throw result.getSystemException();
         } else {
             stats.incrementSuccessfullyProcessed(endTime - startTime);
-            if (result.getResult() != null && instanceConfig.getFunctionDetails().getSink().getTopic() != null) {
-                byte[] output;
-                try {
-                    output = outputSerDe.serialize(result.getResult());
-                } catch (Exception ex) {
-                    stats.incrementSerializationExceptions();
-                    throw ex;
-                }
-                if (output != null) {
-                    sendOutputMessage(srcRecord, output);
-                } else {
-                    processor.sendOutputMessage(srcRecord, null);
-                }
+            if (result.getResult() != null) {
+                sendOutputMessage(srcRecord, result.getResult());
             } else {
                 // the function doesn't produce any result or the user doesn't want the result.
-                processor.sendOutputMessage(srcRecord, null);
+                srcRecord.ack();
             }
         }
     }
 
-    private void sendOutputMessage(Record srcRecord,
-                                   byte[] output) throws Exception {
-
-        MessageBuilder msgBuilder = MessageBuilder.create();
-        if (srcRecord instanceof PulsarRecord) {
-            PulsarRecord pulsarMessage = (PulsarRecord) srcRecord;
-            msgBuilder
-                    .setContent(output)
-                    .setProperty("__pfn_input_topic__", pulsarMessage.getTopicName())
-                    .setProperty("__pfn_input_msg_id__", new String(Base64.getEncoder().encode(pulsarMessage.getMessageId().toByteArray())));
+    private void sendOutputMessage(Record srcRecord, Object output) {
+        try {
+            this.sink.write(srcRecord, output);
+        } catch (Exception e) {
+            log.info("Encountered exception in sink write: ", e);
+            throw new RuntimeException(e);
         }
+    }
 
-        processor.sendOutputMessage(srcRecord, msgBuilder);
+    private Record readInput() {
+        try {
+            return this.source.read();
+        } catch (Exception e) {
+            log.info("Encountered exception in source write: ", e);
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     public void close() {
-        processor.close();
+        try {
+            source.close();
+        } catch (Exception e) {
+            log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+        }
+
+        try {
+            sink.close();
+        } catch (Exception e) {
+            log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+        }
+
         if (null != javaInstance) {
             javaInstance.close();
         }
@@ -415,27 +416,6 @@ private static void addSystemMetrics(String metricName, double value, InstanceCo
         bldr.putMetrics(metricName, digest);
     }
 
-    private void setupSerDe(Class<?>[] typeArgs, ClassLoader clsLoader) {
-        if (!Void.class.equals(typeArgs[1])) { // return type is not `Void.class`
-            if (instanceConfig.getFunctionDetails().getSink().getSerDeClassName() == null
-                    || instanceConfig.getFunctionDetails().getSink().getSerDeClassName().isEmpty()
-                    || instanceConfig.getFunctionDetails().getSink().getSerDeClassName().equals(DefaultSerDe.class.getName())) {
-                outputSerDe = InstanceUtils.initializeDefaultSerDe(typeArgs[1]);
-            } else {
-                this.outputSerDe = InstanceUtils.initializeSerDe(instanceConfig.getFunctionDetails().getSink().getSerDeClassName(), clsLoader, typeArgs[1]);
-            }
-            Class<?>[] outputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass());
-            if (outputSerDe.getClass().getName().equals(DefaultSerDe.class.getName())) {
-                if (!DefaultSerDe.IsSupportedType(typeArgs[1])) {
-                    throw new RuntimeException("Default Serde does not support type " + typeArgs[1]);
-                }
-            } else if (!outputSerdeTypeArgs[0].isAssignableFrom(typeArgs[1])) {
-                throw new RuntimeException("Inconsistent types found between function output type and output serde type: "
-                        + " function type = " + typeArgs[1] + "should be assignable from " + outputSerdeTypeArgs[0]);
-            }
-        }
-    }
-
     private void setupLogHandler() {
         if (instanceConfig.getFunctionDetails().getLogTopic() != null &&
                 !instanceConfig.getFunctionDetails().getLogTopic().isEmpty()) {
@@ -465,4 +445,92 @@ private void removeLogTopicHandler() {
         }
         config.getRootLogger().removeAppender(logAppender.getName());
     }
+
+    public void setupInput(Class<?> inputType) throws Exception {
+
+        SourceSpec sourceSpec = this.instanceConfig.getFunctionDetails().getSource();
+        Object object;
+        if (sourceSpec.getClassName().equals(PulsarSource.class.getName())) {
+
+            PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig();
+            pulsarSourceConfig.setTopicSerdeClassNameMap(sourceSpec.getTopicsToSerDeClassNameMap());
+            pulsarSourceConfig.setSubscriptionName(
+                    FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
+            pulsarSourceConfig.setProcessingGuarantees(
+                    FunctionConfig.ProcessingGuarantees.valueOf(
+                            this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
+            pulsarSourceConfig.setSubscriptionType(
+                    FunctionConfig.SubscriptionType.valueOf(sourceSpec.getSubscriptionType().name()));
+            pulsarSourceConfig.setTypeClassName(inputType.getName());
+
+            Object[] params = {this.client, pulsarSourceConfig};
+            Class[] paramTypes = {PulsarClient.class, PulsarSourceConfig.class};
+
+            object = Reflections.createInstance(
+                    sourceSpec.getClassName(),
+                    PulsarSource.class.getClassLoader(), params, paramTypes);
+
+        } else {
+            object = Reflections.createInstance(
+                    sourceSpec.getClassName(),
+                    Thread.currentThread().getContextClassLoader());
+        }
+
+        Class<?>[] typeArgs;
+        if (object instanceof Source) {
+            typeArgs = TypeResolver.resolveRawArguments(Source.class, object.getClass());
+            assert typeArgs.length > 0;
+        } else {
+            throw new RuntimeException("Source does not implement correct interface");
+        }
+        this.source = (Source) object;
+
+        try {
+            this.source.open(new Gson().fromJson(sourceSpec.getConfigs(), Map.class));
+        } catch (Exception e) {
+            log.info("Error occurred executing open for source: {}",
+                    sourceSpec.getClassName(), e);
+        }
+    }
+
+    public void setupOutput(Class<?> outputType) throws Exception {
+
+        SinkSpec sinkSpec = this.instanceConfig.getFunctionDetails().getSink();
+        Object object;
+        if (sinkSpec.getClassName().equals(PulsarSink.class.getName())) {
+            PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig();
+            pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf(
+                    this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
+            pulsarSinkConfig.setTopic(sinkSpec.getTopic());
+            pulsarSinkConfig.setSerDeClassName(sinkSpec.getSerDeClassName());
+            pulsarSinkConfig.setTypeClassName(outputType.getName());
+
+            Object[] params = {this.client, pulsarSinkConfig};
+            Class[] paramTypes = {PulsarClient.class, PulsarSinkConfig.class};
+
+            object = Reflections.createInstance(
+                    sinkSpec.getClassName(),
+                    PulsarSink.class.getClassLoader(), params, paramTypes);
+        } else {
+            object = Reflections.createInstance(
+                    sinkSpec.getClassName(),
+                    Thread.currentThread().getContextClassLoader());
+        }
+
+        Class<?>[] typeArgs;
+        if (object instanceof RuntimeSink) {
+            typeArgs = TypeResolver.resolveRawArguments(RuntimeSink.class, object.getClass());
+            assert typeArgs.length > 0;
+        } else {
+            throw new RuntimeException("Sink does not implement correct interface");
+        }
+        this.sink = (RuntimeSink) object;
+
+        try {
+            this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(), Map.class));
+        } catch (Exception e) {
+            log.info("Error occurred executing open for sink: {}",
+                    sinkSpec.getClassName(), e);
+        }
+    }
 }
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
deleted file mode 100644
index 8e149b0388..0000000000
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.processors;
-
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageBuilder;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.connect.core.Record;
-import org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-
-/**
- * A message processor that process messages at-most-once.
- */
-@Slf4j
-public class AtLeastOnceProcessor extends MessageProcessorBase {
-
-    @Getter
-    private Producer<byte[]> producer;
-
-    AtLeastOnceProcessor(PulsarClient client,
-                         FunctionDetails functionDetails) {
-        super(client, functionDetails);
-    }
-
-    @Override
-    protected void initializeOutputProducer(String outputTopic) throws Exception {
-        producer = AbstractOneOuputTopicProducers.createProducer(client, outputTopic);
-    }
-
-    @Override
-    public void sendOutputMessage(Record srcRecord, MessageBuilder outputMsgBuilder) {
-        if (null == outputMsgBuilder || null == producer) {
-            srcRecord.ack();
-            return;
-        }
-
-        Message<byte[]> outputMsg = outputMsgBuilder.build();
-        producer.sendAsync(outputMsg)
-            .thenAccept(msgId -> {
-                srcRecord.ack();
-            });
-    }
-
-    @Override
-    public void close() {
-        super.close();
-        if (null != producer) {
-            try {
-                producer.close();
-            } catch (PulsarClientException e) {
-                log.warn("Fail to close producer for processor {}", functionDetails.getSink().getTopic(), e);
-            }
-        }
-    }
-}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
deleted file mode 100644
index 930161ed99..0000000000
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.processors;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageBuilder;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.connect.core.Record;
-import org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-
-/**
- * A message processor that process messages at-most-once.
- */
-@Slf4j
-class AtMostOnceProcessor extends MessageProcessorBase {
-
-    private Producer<byte[]> producer;
-
-    AtMostOnceProcessor(PulsarClient client,
-                        FunctionDetails functionDetails) {
-        super(client, functionDetails);
-    }
-
-    @Override
-    public void postReceiveMessage(Record record) {
-        super.postReceiveMessage(record);
-        if (functionDetails.getAutoAck()) {
-            record.ack();
-        }
-    }
-
-    @Override
-    protected void initializeOutputProducer(String outputTopic) throws Exception {
-        producer = AbstractOneOuputTopicProducers.createProducer(client, outputTopic);
-    }
-
-    @Override
-    public void sendOutputMessage(Record srcRecord, MessageBuilder outputMsgBuilder) {
-        if (null == outputMsgBuilder) {
-            return;
-        }
-
-        Message<byte[]> outputMsg = outputMsgBuilder.build();
-        producer.sendAsync(outputMsg);
-    }
-
-    @Override
-    public void close() {
-        super.close();
-        if (null != producer) {
-            try {
-                producer.close();
-            } catch (PulsarClientException e) {
-                log.warn("Fail to close producer for processor {}", functionDetails.getSink().getTopic(), e);
-            }
-        }
-    }
-}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
deleted file mode 100644
index 06a463b9cb..0000000000
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.processors;
-
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerEventListener;
-import org.apache.pulsar.client.api.MessageBuilder;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.connect.core.Record;
-import org.apache.pulsar.functions.source.PulsarRecord;
-import org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
-import org.apache.pulsar.functions.instance.producers.Producers;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-
-/**
- * A message processor that process messages effectively-once.
- */
-@Slf4j
-class EffectivelyOnceProcessor extends MessageProcessorBase implements ConsumerEventListener {
-
-    @Getter(AccessLevel.PACKAGE)
-    protected Producers outputProducer;
-
-    EffectivelyOnceProcessor(PulsarClient client,
-                             FunctionDetails functionDetails) {
-        super(client, functionDetails);
-    }
-
-    @Override
-    public void becameActive(Consumer<?> consumer, int partitionId) {
-        // if the instance becomes active for a given topic partition,
-        // open a producer for the results computed from this topic partition.
-        if (null != outputProducer) {
-            try {
-                this.outputProducer.getProducer(consumer.getTopic(), partitionId);
-            } catch (PulsarClientException e) {
-                // this can be ignored, because producer can be lazily created when accessing it.
-                log.warn("Fail to create a producer for results computed from messages of topic: {}, partition: {}",
-                    consumer.getTopic(), partitionId);
-            }
-        }
-    }
-
-    @Override
-    public void becameInactive(Consumer<?> consumer, int partitionId) {
-        if (null != outputProducer) {
-            // if I lost the ownership of a partition, close its corresponding topic partition.
-            // this is to allow the new active consumer be able to produce to the result topic.
-            this.outputProducer.closeProducer(consumer.getTopic(), partitionId);
-        }
-    }
-
-    @Override
-    protected void initializeOutputProducer(String outputTopic) throws Exception {
-        outputProducer = new MultiConsumersOneOuputTopicProducers(client, outputTopic);
-        outputProducer.initialize();
-    }
-
-    //
-    // Methods to process messages
-    //
-
-    @Override
-    public void sendOutputMessage(Record srcRecord,
-                                  MessageBuilder outputMsgBuilder) throws Exception {
-        if (null == outputMsgBuilder) {
-            srcRecord.ack();
-            return;
-        }
-
-        // assign sequence id to output message for idempotent producing
-        outputMsgBuilder = outputMsgBuilder
-            .setSequenceId(srcRecord.getRecordSequence());
-
-        // currently on PulsarRecord
-        if (srcRecord instanceof PulsarRecord) {
-            PulsarRecord pulsarMessage = (PulsarRecord) srcRecord;
-            Producer producer = outputProducer.getProducer(pulsarMessage.getTopicName(),
-                    Integer.parseInt(srcRecord.getPartitionId()));
-
-            org.apache.pulsar.client.api.Message outputMsg = outputMsgBuilder.build();
-            producer.sendAsync(outputMsg)
-                    .thenAccept(messageId -> srcRecord.ack())
-                    .join();
-        }
-    }
-
-    @Override
-    public void close() {
-        super.close();
-        // kill the result producer
-        if (null != outputProducer) {
-            outputProducer.close();
-            outputProducer = null;
-        }
-    }
-}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
deleted file mode 100644
index 0dcf12c556..0000000000
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.processors;
-
-import java.util.Map;
-
-import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
-import org.apache.pulsar.client.api.MessageBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.connect.core.Record;
-import org.apache.pulsar.connect.core.Source;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
-
-/**
- * A processor that processes messages, used by {@link org.apache.pulsar.functions.instance.JavaInstanceRunnable}.
- */
-@Evolving
-public interface MessageProcessor extends AutoCloseable {
-
-    static MessageProcessor create(PulsarClient client,
-                                   FunctionDetails functionDetails) {
-        ProcessingGuarantees processingGuarantees = functionDetails.getProcessingGuarantees();
-
-        if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) {
-            return new EffectivelyOnceProcessor(
-                client,
-                functionDetails);
-        } else if (processingGuarantees == ProcessingGuarantees.ATMOST_ONCE) {
-            return new AtMostOnceProcessor(
-                client,
-                functionDetails);
-        } else {
-            return new AtLeastOnceProcessor(
-                client,
-                functionDetails);
-        }
-    }
-
-    void postReceiveMessage(Record record);
-
-    /**
-     * Setup the source. Implementation is responsible for initializing the source
-     * and for calling open method for source
-     * @param inputType the input type of the function
-     * @throws Exception
-     */
-    void setupInput(Class<?> inputType)
-        throws Exception;
-
-    /**
-     * Return the source.
-     *
-     * @return the source.
-     */
-    Source getSource();
-
-    /**
-     * Setup the output with a provided <i>outputSerDe</i>. The implementation of this processor is responsible for
-     * setting up the output
-     *
-     * @param outputSerDe output serde.
-     * @throws Exception
-     */
-    void setupOutput(SerDe outputSerDe) throws Exception;
-
-    /**
-     * Send the output message to the output topic. The output message is computed from <i>inputMsg</i>.
-     *
-     * <p>If the <i>outputMsgBuilder</i> is null, the implementation doesn't have to send any messages to the output.
-     * The implementation can decide to acknowledge the input message based on its process guarantees.
-     *
-     * @param srcRecord record from source
-     * @param outputMsgBuilder output message builder. it can be null.
-     */
-    void sendOutputMessage(Record srcRecord,
-                           MessageBuilder outputMsgBuilder) throws PulsarClientException, Exception;
-
-    /**
-     * Get the next message to process
-     * @return the next input message
-     * @throws Exception
-     */
-    Record recieveMessage() throws Exception;
-
-    @Override
-    void close();
-
-}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
deleted file mode 100644
index 33b699a63c..0000000000
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.processors;
-
-import java.util.Map;
-
-import com.google.gson.Gson;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-import net.jodah.typetools.TypeResolver;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.connect.core.Record;
-import org.apache.pulsar.connect.core.Source;
-import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.source.PulsarConfig;
-import org.apache.pulsar.functions.source.PulsarSource;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-import org.apache.pulsar.functions.utils.Reflections;
-
-/**
- * The base implementation of {@link MessageProcessor}.
- */
-@Slf4j
-abstract class MessageProcessorBase implements MessageProcessor {
-
-    protected final PulsarClient client;
-    protected final FunctionDetails functionDetails;
-
-    @Getter
-    protected Source source;
-
-
-    protected MessageProcessorBase(PulsarClient client,
-                                   FunctionDetails functionDetails) {
-        this.client = client;
-        this.functionDetails = functionDetails;
-    }
-
-    //
-    // Input
-    //
-
-    @Override
-    public void setupInput(Class<?> inputType) throws Exception {
-
-        org.apache.pulsar.functions.proto.Function.SourceSpec sourceSpec = this.functionDetails.getSource();
-        Object object;
-        if (sourceSpec.getClassName().equals(PulsarSource.class.getName())) {
-
-            PulsarConfig pulsarConfig = new PulsarConfig();
-            pulsarConfig.setTopicSerdeClassNameMap(this.functionDetails.getSource().getTopicsToSerDeClassNameMap());
-            pulsarConfig.setSubscriptionName(FunctionDetailsUtils.getFullyQualifiedName(this.functionDetails));
-            pulsarConfig.setProcessingGuarantees(
-                    FunctionConfig.ProcessingGuarantees.valueOf(this.functionDetails.getProcessingGuarantees().name()));
-            pulsarConfig.setSubscriptionType(
-                    FunctionConfig.SubscriptionType.valueOf(this.functionDetails.getSource().getSubscriptionType().name()));
-            pulsarConfig.setTypeClassName(inputType.getName());
-
-            Object[] params = {this.client, pulsarConfig};
-            Class[] paramTypes = {PulsarClient.class, PulsarConfig.class};
-
-            object = Reflections.createInstance(
-                    sourceSpec.getClassName(),
-                    PulsarSource.class.getClassLoader(), params, paramTypes);
-
-        } else {
-            object = Reflections.createInstance(
-                    sourceSpec.getClassName(),
-                    Thread.currentThread().getContextClassLoader());
-        }
-
-        Class<?>[] typeArgs;
-        if (object instanceof Source) {
-            typeArgs = TypeResolver.resolveRawArguments(Source.class, object.getClass());
-            assert typeArgs.length > 0;
-        } else {
-            throw new RuntimeException("Source does not implement correct interface");
-        }
-        this.source = (Source) object;
-
-        try {
-            this.source.open(new Gson().fromJson(sourceSpec.getConfigs(), Map.class));
-        } catch (Exception e) {
-            log.info("Error occurred executing open for source: {}",
-                    this.functionDetails.getSource().getClassName(), e);
-        }
-
-    }
-
-    public Record recieveMessage() throws Exception {
-        return this.source.read();
-    }
-
-    /**
-     * Method called when a message is received from input after being put into the process queue.
-     *
-     * <p>The processor implementation can make a decision to process the message based on its processing guarantees.
-     * for example, an at-most-once processor can ack the message immediately.
-     *
-     * @param record input message.
-     */
-    @Override
-    public void postReceiveMessage(Record record) {}
-
-    //
-    // Output
-    //
-
-    @Override
-    public void setupOutput(SerDe outputSerDe) throws Exception {
-        String outputTopic = functionDetails.getSink().getTopic();
-        if (outputTopic != null
-                && !outputTopic.isEmpty()
-                && outputSerDe != null) {
-            log.info("Starting producer for output topic {}", outputTopic);
-            initializeOutputProducer(outputTopic);
-        }
-    }
-
-    protected abstract void initializeOutputProducer(String outputTopic) throws Exception;
-
-    //
-    // Process
-    //
-
-    @Override
-    public void close() {
-
-        try {
-            this.source.close();
-        } catch (Exception e) {
-            log.warn("Failed to close source {}", this.source, e);
-        }
-    }
-}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
index 3668311ed7..0359f7d190 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
@@ -18,9 +18,8 @@
  */
 package org.apache.pulsar.functions.instance.producers;
 
-import io.netty.util.collection.IntObjectHashMap;
-import io.netty.util.collection.IntObjectMap;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -37,7 +36,8 @@
 public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicProducers {
 
     @Getter(AccessLevel.PACKAGE)
-    private final Map<String, IntObjectMap<Producer<byte[]>>> producers;
+    private final Map<String, Map<String, Producer<byte[]>>> producers;
+
 
     public MultiConsumersOneOuputTopicProducers(PulsarClient client,
                                                 String outputTopic)
@@ -51,15 +51,15 @@ public void initialize() throws PulsarClientException {
         // no-op
     }
 
-    static String makeProducerName(String srcTopicName, int srcTopicPartition) {
+    static String makeProducerName(String srcTopicName, String srcTopicPartition) {
         return String.format("%s-%s", srcTopicName, srcTopicPartition);
     }
 
     @Override
-    public synchronized Producer<byte[]> getProducer(String srcTopicName, int srcTopicPartition) throws PulsarClientException {
-        IntObjectMap<Producer<byte[]>> producerMap = producers.get(srcTopicName);
+    public synchronized Producer<byte[]> getProducer(String srcTopicName, String srcTopicPartition) throws PulsarClientException {
+        Map<String, Producer<byte[]>> producerMap = producers.get(srcTopicName);
         if (null == producerMap) {
-            producerMap = new IntObjectHashMap<>();
+            producerMap = new HashMap<>();
             producers.put(srcTopicName, producerMap);
         }
 
@@ -72,8 +72,8 @@ static String makeProducerName(String srcTopicName, int srcTopicPartition) {
     }
 
     @Override
-    public synchronized void closeProducer(String srcTopicName, int srcTopicPartition) {
-        IntObjectMap<Producer<byte[]>> producerMap = producers.get(srcTopicName);
+    public synchronized void closeProducer(String srcTopicName, String srcTopicPartition) {
+        Map<String, Producer<byte[]>> producerMap = producers.get(srcTopicName);
 
         if (null != producerMap) {
             Producer<byte[]> producer = producerMap.remove(srcTopicPartition);
@@ -89,7 +89,7 @@ public synchronized void closeProducer(String srcTopicName, int srcTopicPartitio
     @Override
     public synchronized void close() {
         List<CompletableFuture<Void>> closeFutures = new ArrayList<>(producers.size());
-        for (IntObjectMap<Producer<byte[]>> producerMap: producers.values()) {
+        for (Map<String, Producer<byte[]>> producerMap: producers.values()) {
             for (Producer<byte[]> producer : producerMap.values()) {
                 closeFutures.add(producer.closeAsync());
             }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
index 29cd96a1ee..b9d6a08799 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
@@ -42,7 +42,7 @@
      *          src topic partition
      * @return the producer instance to produce messages
      */
-    Producer<byte[]> getProducer(String srcTopicName, int srcTopicPartition) throws PulsarClientException;
+    Producer<byte[]> getProducer(String srcTopicName, String srcTopicPartition) throws PulsarClientException;
 
     /**
      * Close a producer specified by <tt>srcTopicName</tt> and <tt>srcTopicPartition</tt>
@@ -51,7 +51,7 @@
      * @param srcTopicPartition src topic partition
      */
     void closeProducer(String srcTopicName,
-                       int srcTopicPartition);
+                       String srcTopicPartition);
 
     @Override
     void close();
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
index 8e0d37aa3d..54e34c3227 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
@@ -20,7 +20,6 @@
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.connect.core.RecordContext;
 import org.apache.pulsar.connect.core.Sink;
 
 /**
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 7c89d92836..79c0b35162 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -18,5 +18,250 @@
  */
 package org.apache.pulsar.functions.sink;
 
-public class PulsarSink {
+import com.google.common.annotations.VisibleForTesting;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.connect.core.RecordContext;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.instance.InstanceUtils;
+import org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
+import org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
+import org.apache.pulsar.functions.instance.producers.Producers;
+import org.apache.pulsar.functions.source.PulsarRecord;
+import org.apache.pulsar.functions.utils.FunctionConfig;
+
+import java.util.Base64;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+@Slf4j
+public class PulsarSink<T> implements RuntimeSink<T> {
+
+    private PulsarClient client;
+    private PulsarSinkConfig pulsarSinkConfig;
+    private SerDe<T> outputSerDe;
+
+    private PulsarSinkProcessor pulsarSinkProcessor;
+
+    private interface PulsarSinkProcessor {
+        void initializeOutputProducer(String outputTopic) throws Exception;
+
+        void sendOutputMessage(MessageBuilder outputMsgBuilder,
+                               PulsarRecord pulsarRecord) throws Exception;
+
+        void close() throws Exception;
+    }
+
+    private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor {
+        private Producer<byte[]> producer;
+
+        @Override
+        public void initializeOutputProducer(String outputTopic) throws Exception {
+            this.producer = AbstractOneOuputTopicProducers.createProducer(
+                    client, pulsarSinkConfig.getTopic());
+        }
+
+        @Override
+        public void sendOutputMessage(MessageBuilder outputMsgBuilder,
+                                      PulsarRecord pulsarRecord) throws Exception {
+            Message<byte[]> outputMsg = outputMsgBuilder.build();
+            this.producer.sendAsync(outputMsg);
+        }
+
+        @Override
+        public void close() throws Exception {
+            if (null != producer) {
+                try {
+                    producer.close();
+                } catch (PulsarClientException e) {
+                    log.warn("Fail to close producer for processor {}", pulsarSinkConfig.getTopic(), e);
+                }
+            }
+        }
+    }
+
+    private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor {
+        private Producer<byte[]> producer;
+
+        @Override
+        public void initializeOutputProducer(String outputTopic) throws Exception {
+            this.producer = AbstractOneOuputTopicProducers.createProducer(
+                    client, pulsarSinkConfig.getTopic());
+        }
+
+        @Override
+        public void sendOutputMessage(MessageBuilder outputMsgBuilder,
+                                      PulsarRecord pulsarRecord) throws Exception {
+            Message<byte[]> outputMsg = outputMsgBuilder.build();
+            this.producer.sendAsync(outputMsg).thenAccept(messageId -> pulsarRecord.ack());
+        }
+
+        @Override
+        public void close() throws Exception {
+            if (null != producer) {
+                try {
+                    producer.close();
+                } catch (PulsarClientException e) {
+                    log.warn("Fail to close producer for processor {}", pulsarSinkConfig.getTopic(), e);
+                }
+            }
+        }
+    }
+
+    private class PulsarSinkEffectivelyOnceProcessor implements PulsarSinkProcessor, ConsumerEventListener {
+
+        @Getter(AccessLevel.PACKAGE)
+        protected Producers outputProducer;
+
+        @Override
+        public void initializeOutputProducer(String outputTopic) throws Exception {
+            outputProducer = new MultiConsumersOneOuputTopicProducers(client, outputTopic);
+            outputProducer.initialize();
+        }
+
+        @Override
+        public void sendOutputMessage(MessageBuilder outputMsgBuilder, PulsarRecord pulsarRecord)
+                throws Exception {
+
+            // assign sequence id to output message for idempotent producing
+            outputMsgBuilder = outputMsgBuilder
+                    .setSequenceId(pulsarRecord.getRecordSequence());
+
+            // currently on PulsarRecord
+            Producer producer = outputProducer.getProducer(pulsarRecord.getTopicName(),
+                    pulsarRecord.getPartitionId());
+
+            org.apache.pulsar.client.api.Message outputMsg = outputMsgBuilder.build();
+            producer.sendAsync(outputMsg)
+                    .thenAccept(messageId -> pulsarRecord.ack())
+                    .join();
+        }
+
+        @Override
+        public void close() throws Exception {
+            // kill the result producer
+            if (null != outputProducer) {
+                outputProducer.close();
+                outputProducer = null;
+            }
+        }
+
+        @Override
+        public void becameActive(Consumer<?> consumer, int partitionId) {
+            // if the instance becomes active for a given topic partition,
+            // open a producer for the results computed from this topic partition.
+            if (null != outputProducer) {
+                try {
+                    this.outputProducer.getProducer(consumer.getTopic(), Integer.toString(partitionId));
+                } catch (PulsarClientException e) {
+                    // this can be ignored, because producer can be lazily created when accessing it.
+                    log.warn("Fail to create a producer for results computed from messages of topic: {}, partition: {}",
+                            consumer.getTopic(), partitionId);
+                }
+            }
+        }
+
+        @Override
+        public void becameInactive(Consumer<?> consumer, int partitionId) {
+            if (null != outputProducer) {
+                // if I lost the ownership of a partition, close its corresponding topic partition.
+                // this is to allow the new active consumer be able to produce to the result topic.
+                this.outputProducer.closeProducer(consumer.getTopic(), Integer.toString(partitionId));
+            }
+        }
+    }
+
+    public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig) {
+        this.client = client;
+        this.pulsarSinkConfig = pulsarSinkConfig;
+    }
+
+    @Override
+    public void open(Map<String, Object> config) throws Exception {
+
+        // Setup Serialization/Deserialization
+        setupSerDe();
+
+        FunctionConfig.ProcessingGuarantees processingGuarantees = this.pulsarSinkConfig.getProcessingGuarantees();
+        switch (processingGuarantees) {
+            case ATMOST_ONCE:
+                this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor();
+                break;
+            case ATLEAST_ONCE:
+                this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor();
+                break;
+            case EFFECTIVELY_ONCE:
+                this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor();
+                break;
+        }
+        this.pulsarSinkProcessor.initializeOutputProducer(this.pulsarSinkConfig.getTopic());
+    }
+
+    @Override
+    public CompletableFuture<Void> write(T value) {
+        return null;
+    }
+
+    @Override
+    public void write(RecordContext recordContext, T value) throws Exception {
+
+        PulsarRecord pulsarRecord = (PulsarRecord) recordContext;
+
+        byte[] output;
+        try {
+            output = this.outputSerDe.serialize(value);
+        } catch (Exception e) {
+            //TODO Add serialization exception stats
+            throw new RuntimeException("Error occured when attempting to serialize output:", e);
+        }
+        MessageBuilder msgBuilder = MessageBuilder.create();
+        msgBuilder
+                .setContent(output)
+                .setProperty("__pfn_input_topic__", pulsarRecord.getTopicName())
+                .setProperty("__pfn_input_msg_id__", new String(
+                        Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
+        this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, pulsarRecord);
+    }
+
+    @Override
+    public void close() throws Exception {
+        this.pulsarSinkProcessor.close();
+
+    }
+
+    @VisibleForTesting
+    void setupSerDe() throws ClassNotFoundException {
+        Class<?> typeArg = Thread.currentThread().getContextClassLoader().loadClass(
+                this.pulsarSinkConfig.getTypeClassName());
+
+        if (!Void.class.equals(typeArg)) { // return type is not `Void.class`
+            if (this.pulsarSinkConfig.getSerDeClassName() == null
+                    || this.pulsarSinkConfig.getSerDeClassName().isEmpty()
+                    || this.pulsarSinkConfig.getSerDeClassName().equals(DefaultSerDe.class.getName())) {
+                this.outputSerDe = InstanceUtils.initializeDefaultSerDe(typeArg);
+            } else {
+                this.outputSerDe = InstanceUtils.initializeSerDe(this.pulsarSinkConfig.getSerDeClassName(),
+                        Thread.currentThread().getContextClassLoader(), typeArg);
+            }
+            Class<?>[] outputSerdeTypeArgs = TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass());
+            if (outputSerDe.getClass().getName().equals(DefaultSerDe.class.getName())) {
+                if (!DefaultSerDe.IsSupportedType(typeArg)) {
+                    throw new RuntimeException("Default Serde does not support type " + typeArg);
+                }
+            } else if (!outputSerdeTypeArgs[0].isAssignableFrom(typeArg)) {
+                throw new RuntimeException("Inconsistent types found between function output type and output serde type: "
+                        + " function type = " + typeArg + "should be assignable from " + outputSerdeTypeArgs[0]);
+            }
+        }
+    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
new file mode 100644
index 0000000000..1def3f1d2f
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkConfig.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.sink;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.apache.pulsar.functions.utils.FunctionConfig;
+
+import java.util.Map;
+
+@Getter
+@Setter
+@ToString
+public class PulsarSinkConfig {
+    private FunctionConfig.ProcessingGuarantees processingGuarantees;
+    private FunctionConfig.SubscriptionType subscriptionType;
+    private String topic;
+    private String serDeClassName;
+    private String typeClassName;
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
index 63a48ecb96..fe47705be0 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
@@ -29,7 +29,7 @@
  * <p>There is a default implementation provided for wrapping up the user provided {@link Sink}. Pulsar sink
  * should be implemented using this interface to ensure supporting effective-once.
  */
-public interface RuntimeSink<T> extends Sink<T> {
+public interface RuntimeSink<T> extends Sink<T>{
 
     /**
      * Write the <tt>value</tt>value.
@@ -40,7 +40,7 @@
      * @param inputRecordContext input record context
      * @param value output value computed from the runtime.
      */
-    default void write(RecordContext inputRecordContext, T value) {
+    default void write(RecordContext inputRecordContext, T value) throws Exception {
         write(value)
             .thenAccept(ignored -> inputRecordContext.ack())
             .exceptionally(cause -> {
@@ -48,5 +48,4 @@ default void write(RecordContext inputRecordContext, T value) {
                 return null;
             });
     }
-
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 9048544da6..dd0fb38a80 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.source;
 
+import com.google.common.annotations.VisibleForTesting;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
@@ -41,27 +42,27 @@
 public class PulsarSource<T> implements Source<T> {
 
     private PulsarClient pulsarClient;
-    private PulsarConfig pulsarConfig;
+    private PulsarSourceConfig pulsarSourceConfig;
     private Map<String, SerDe> topicToSerDeMap = new HashMap<>();
 
     @Getter
     private org.apache.pulsar.client.api.Consumer inputConsumer;
 
-    public PulsarSource(PulsarClient pulsarClient, PulsarConfig pulsarConfig) {
+    public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig) {
         this.pulsarClient = pulsarClient;
-        this.pulsarConfig = pulsarConfig;
+        this.pulsarSourceConfig = pulsarConfig;
     }
 
     @Override
     public void open(Map<String, Object> config) throws Exception {
         // Setup Serialization/Deserialization
-        setupSerde();
+        setupSerDe();
 
         // Setup pulsar consumer
         this.inputConsumer = this.pulsarClient.newConsumer()
-                .topics(new ArrayList<>(this.pulsarConfig.getTopicSerdeClassNameMap().keySet()))
-                .subscriptionName(this.pulsarConfig.getSubscriptionName())
-                .subscriptionType(this.pulsarConfig.getSubscriptionType().get())
+                .topics(new ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()))
+                .subscriptionName(this.pulsarSourceConfig.getSubscriptionName())
+                .subscriptionType(this.pulsarSourceConfig.getSubscriptionType().get())
                 .ackTimeout(1, TimeUnit.MINUTES)
                 .subscribe();
     }
@@ -81,7 +82,7 @@ public void open(Map<String, Object> config) throws Exception {
             MessageIdImpl messageId = (MessageIdImpl) topicMessageId.getInnerMessageId();
             partitionId = Long.toString(messageId.getPartitionIndex());
         } else {
-            topicName = this.pulsarConfig.getTopicSerdeClassNameMap().keySet().iterator().next();
+            topicName = this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet().iterator().next();
             partitionId = Long.toString(((MessageIdImpl) message.getMessageId()).getPartitionIndex());
         }
 
@@ -107,13 +108,13 @@ public void open(Map<String, Object> config) throws Exception {
                 .sequenceId(message.getSequenceId())
                 .topicName(topicName)
                 .ackFunction(() -> {
-                    if (pulsarConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                    if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                         inputConsumer.acknowledgeCumulativeAsync(message);
                     } else {
                         inputConsumer.acknowledgeAsync(message);
                     }
                 }).failFunction(() -> {
-                    if (pulsarConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+                    if (pulsarSourceConfig.getProcessingGuarantees() == FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                         throw new RuntimeException("Failed to process message: " + message.getMessageId());
                     }
                 })
@@ -126,17 +127,18 @@ public void close() throws Exception {
         this.inputConsumer.close();
     }
 
-    private void setupSerde() throws ClassNotFoundException {
+    @VisibleForTesting
+    void setupSerDe() throws ClassNotFoundException {
 
-        Class<?> typeArg = Thread.currentThread().getContextClassLoader().loadClass(this.pulsarConfig.getTypeClassName());
+        Class<?> typeArg = Thread.currentThread().getContextClassLoader().loadClass(this.pulsarSourceConfig.getTypeClassName());
         if (Void.class.equals(typeArg)) {
             throw new RuntimeException("Input type of Pulsar Function cannot be Void");
         }
 
-        for (Map.Entry<String, String> entry : this.pulsarConfig.getTopicSerdeClassNameMap().entrySet()) {
+        for (Map.Entry<String, String> entry : this.pulsarSourceConfig.getTopicSerdeClassNameMap().entrySet()) {
             String topic = entry.getKey();
             String serDeClassname = entry.getValue();
-            if (serDeClassname.isEmpty()) {
+            if (serDeClassname == null || serDeClassname.isEmpty()) {
                 serDeClassname = DefaultSerDe.class.getName();
             }
             SerDe serDe = InstanceUtils.initializeSerDe(serDeClassname,
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
similarity index 87%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
index 2a5dc44d32..4d5e540e1e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSourceConfig.java
@@ -19,10 +19,7 @@
 package org.apache.pulsar.functions.source;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.Builder;
-import lombok.Data;
 import lombok.Getter;
-import lombok.NoArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
 import org.apache.pulsar.functions.utils.FunctionConfig;
@@ -33,7 +30,7 @@
 @Getter
 @Setter
 @ToString
-public class PulsarConfig {
+public class PulsarSourceConfig {
 
     private FunctionConfig.ProcessingGuarantees processingGuarantees;
     private FunctionConfig.SubscriptionType subscriptionType;
@@ -41,8 +38,8 @@
     private Map<String, String> topicSerdeClassNameMap;
     private String typeClassName;
 
-    public static PulsarConfig load(Map<String, Object> map) throws IOException {
+    public static PulsarSourceConfig load(Map<String, Object> map) throws IOException {
         ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(new ObjectMapper().writeValueAsString(map), PulsarConfig.class);
+        return mapper.readValue(new ObjectMapper().writeValueAsString(map), PulsarSourceConfig.class);
     }
 }
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index f2763a9a48..15d264328a 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -58,14 +58,10 @@ def main():
   parser.add_argument('--name', required=True, help='Function Name')
   parser.add_argument('--tenant', required=True, help='Tenant Name')
   parser.add_argument('--namespace', required=True, help='Namespace name')
-  parser.add_argument('--source_topics_serde_classname', required=True, help='A mapping of Input topics to SerDe')
-  parser.add_argument('--output_topic', required=False, help='Output Topic')
-  parser.add_argument('--output_serde_classname', required=False, help='Output Serde Classnames')
   parser.add_argument('--instance_id', required=True, help='Instance Id')
   parser.add_argument('--function_id', required=True, help='Function Id')
   parser.add_argument('--function_version', required=True, help='Function Version')
   parser.add_argument('--processing_guarantees', required=True, help='Processing Guarantees')
-  parser.add_argument('--source_subscription_type', required=True, help='Subscription Type')
   parser.add_argument('--pulsar_serviceurl', required=True, help='Pulsar Service Url')
   parser.add_argument('--port', required=True, help='Instance Port', type=int)
   parser.add_argument('--max_buffered_tuples', required=True, help='Maximum number of Buffered tuples')
@@ -74,6 +70,10 @@ def main():
   parser.add_argument('--logging_file', required=True, help='Log file name')
   parser.add_argument('--auto_ack', required=True, help='Enable Autoacking?')
   parser.add_argument('--log_topic', required=False, help='Topic to send Log Messages')
+  parser.add_argument('--source_subscription_type', required=True, help='Subscription Type')
+  parser.add_argument('--source_topics_serde_classname', required=True, help='A mapping of Input topics to SerDe')
+  parser.add_argument('--sink_topic', required=False, help='Sink Topic')
+  parser.add_argument('--sink_serde_classname', required=False, help='Sink SerDe classname')
 
   args = parser.parse_args()
   log_file = os.path.join(args.logging_directory,
@@ -104,10 +104,10 @@ def main():
   function_details.source.MergeFrom(sourceSpec)
 
   sinkSpec = Function_pb2.SinkSpec()
-  if args.output_topic != None and len(args.output_topic) != 0:
-    sinkSpec.topic = args.output_topic
-  if args.output_serde_classname != None and len(args.output_serde_classname) != 0:
-    sinkSpec.serDeClassName = args.output_serde_classname
+  if args.sink_topic != None and len(args.sink_topic) != 0:
+    sinkSpec.topic = args.sink_topic
+  if args.sink_serde_classname != None and len(args.sink_serde_classname) != 0:
+    sinkSpec.serDeClassName = args.sink_serde_classname
   function_details.sink.MergeFrom(sinkSpec)
 
   function_details.processingGuarantees = Function_pb2.ProcessingGuarantees.Value(args.processing_guarantees)
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 289dbbaa72..12d4f19a04 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -20,20 +20,14 @@
 
 import lombok.Getter;
 import lombok.Setter;
-import net.jodah.typetools.TypeResolver;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
-import org.testng.annotations.Test;
 
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 
-import static org.testng.AssertJUnit.*;
-
 public class JavaInstanceRunnableTest {
 
     static class IntegerSerDe implements SerDe<Integer> {
@@ -111,79 +105,4 @@ public Void process(String input, Context context) throws Exception {
             return null;
         }
     }
-
-    /**
-     * Verify that JavaInstance does support functions that output Void type
-     */
-    @Test
-    public void testVoidOutputClasses() {
-        try {
-            JavaInstanceRunnable runnable = createRunnable(false, DefaultSerDe.class.getName());
-            Method method = makeAccessible(runnable);
-            ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
-            VoidOutputHandler pulsarFunction = new VoidOutputHandler();
-            Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, pulsarFunction.getClass());
-            method.invoke(runnable, typeArgs, clsLoader);
-        } catch (Exception ex) {
-            assertTrue(false);
-        }
-    }
-
-    /**
-     * Verify that Default Serializer works fine.
-     */
-    @Test
-    public void testDefaultSerDe() {
-        try {
-            JavaInstanceRunnable runnable = createRunnable(false, null);
-            Method method = makeAccessible(runnable);
-            ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
-            Function function = (Function<String, String>) (input, context) -> input + "-lambda";
-            Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass());
-            method.invoke(runnable, typeArgs, clsLoader);
-        } catch (Exception ex) {
-            ex.printStackTrace();
-            assertEquals(ex, null);
-            assertTrue(false);
-        }
-    }
-
-    /**
-     * Verify that Explicit setting of Default Serializer works fine.
-     */
-    @Test
-    public void testExplicitDefaultSerDe() {
-        try {
-            JavaInstanceRunnable runnable = createRunnable(false, DefaultSerDe.class.getName());
-            Method method = makeAccessible(runnable);
-            ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
-            Function function = (Function<String, String>) (input, context) -> input + "-lambda";
-            Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass());
-            method.invoke(runnable, typeArgs, clsLoader);
-        } catch (Exception ex) {
-            assertTrue(false);
-        }
-    }
-
-    /**
-     * Verify that function output type should be consistent with output serde type.
-     */
-    @Test
-    public void testInconsistentOutputType() {
-        try {
-            JavaInstanceRunnable runnable = createRunnable(false, IntegerSerDe.class.getName());
-            Method method = makeAccessible(runnable);
-            ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
-            Function function = (Function<String, String>) (input, context) -> input + "-lambda";
-            Class<?>[] typeArgs = TypeResolver.resolveRawArguments(Function.class, function.getClass());
-            method.invoke(runnable, typeArgs, clsLoader);
-            fail("Should fail constructing java instance if function type is inconsistent with serde type");
-        } catch (InvocationTargetException ex) {
-            assertTrue(ex.getCause().getMessage().startsWith("Inconsistent types found between function output type and output serde type:"));
-        } catch (Exception ex) {
-            assertTrue(false);
-        }
-    }
-
-
 }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index e6072c8d33..a22b3666ef 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -213,7 +213,7 @@ public void setup() throws Exception {
     @Test
     public void testGetCloseProducer() throws Exception {
         String srcTopic = "test-src-topic";
-        int ptnIdx = 1234;
+        String ptnIdx = "1234";
         Producer<byte[]> producer = producers.getProducer(srcTopic, ptnIdx);
 
         String producerName = makeProducerName(srcTopic, ptnIdx);
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
index 7c58c30985..2ba4e3f835 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
@@ -63,13 +63,13 @@ public void testClose() throws Exception {
     }
 
     @Test
-    public void testWrite() {
+    public void testWrite() throws Exception {
         this.runtimeSink.write("test-record");
         verify(mockSink, times(1)).write(eq("test-record"));
     }
 
     @Test
-    public void testWriteAck() {
+    public void testWriteAck() throws Exception {
         RecordContext context = mock(RecordContext.class);
 
         CompletableFuture<Void> writeFuture = new CompletableFuture<>();
@@ -82,7 +82,7 @@ public void testWriteAck() {
     }
 
     @Test
-    public void testWriteFail() {
+    public void testWriteFail() throws Exception {
         RecordContext context = mock(RecordContext.class);
 
         CompletableFuture<Void> writeFuture = new CompletableFuture<>();
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
new file mode 100644
index 0000000000..0f826aca55
--- /dev/null
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.sink;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;;
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.assertTrue;
+import static org.testng.AssertJUnit.fail;
+
+@Slf4j
+public class PulsarSinkTest {
+
+    private static final String TOPIC = "persistent://sample/standalone/ns1/test_result";
+    private static final String serDeClassName = DefaultSerDe.class.getName();
+
+    public static class TestSerDe implements SerDe<String> {
+
+        @Override
+        public String deserialize(byte[] input) {
+            return null;
+        }
+
+        @Override
+        public byte[] serialize(String input) {
+            return new byte[0];
+        }
+    }
+
+    /**
+     * Verify that JavaInstance does not support functions that take Void type as input
+     */
+
+    private static PulsarClient getPulsarClient() throws PulsarClientException {
+        PulsarClient pulsarClient = mock(PulsarClient.class);
+        ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class);
+        doReturn(consumerBuilder).when(consumerBuilder).topics(anyList());
+        doReturn(consumerBuilder).when(consumerBuilder).subscriptionName(anyString());
+        doReturn(consumerBuilder).when(consumerBuilder).subscriptionType(any());
+        doReturn(consumerBuilder).when(consumerBuilder).ackTimeout(anyLong(), any());
+        Consumer consumer = mock(Consumer.class);
+        doReturn(consumer).when(consumerBuilder).subscribe();
+        doReturn(consumerBuilder).when(pulsarClient).newConsumer();
+        return pulsarClient;
+    }
+
+    private static PulsarSinkConfig getPulsarConfigs() {
+        PulsarSinkConfig pulsarConfig = new PulsarSinkConfig();
+        pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        pulsarConfig.setSubscriptionType(FunctionConfig.SubscriptionType.FAILOVER);
+        pulsarConfig.setTopic(TOPIC);
+        pulsarConfig.setSerDeClassName(serDeClassName);
+        pulsarConfig.setTypeClassName(String.class.getName());
+        return pulsarConfig;
+    }
+
+    @Getter
+    @Setter
+    public static class ComplexUserDefinedType {
+        private String name;
+        private Integer age;
+    }
+
+    public static class ComplexSerDe implements SerDe<ComplexUserDefinedType> {
+        @Override
+        public ComplexUserDefinedType deserialize(byte[] input) {
+            return null;
+        }
+
+        @Override
+        public byte[] serialize(ComplexUserDefinedType input) {
+            return new byte[0];
+        }
+    }
+
+    /**
+     * Verify that JavaInstance does support functions that output Void type
+     */
+    @Test
+    public void testVoidOutputClasses() throws Exception {
+        PulsarSinkConfig pulsarConfig = getPulsarConfigs();
+        // set type to void
+        pulsarConfig.setTypeClassName(Void.class.getName());
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig);
+
+        try {
+            pulsarSink.setupSerDe();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            assertEquals(ex, null);
+            assertTrue(false);
+        }
+    }
+
+    @Test
+    public void testInconsistentOutputType() throws IOException {
+        PulsarSinkConfig pulsarConfig = getPulsarConfigs();
+        // set type to be inconsistent to that of SerDe
+        pulsarConfig.setTypeClassName(Integer.class.getName());
+        pulsarConfig.setSerDeClassName(TestSerDe.class.getName());
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig);
+        try {
+            pulsarSink.setupSerDe();
+            fail("Should fail constructing java instance if function type is inconsistent with serde type");
+        } catch (RuntimeException ex) {
+            log.error("RuntimeException: {}", ex, ex);
+            assertTrue(ex.getMessage().startsWith("Inconsistent types found between function output type and output serde type:"));
+        } catch (Exception ex) {
+            log.error("Exception: {}", ex, ex);
+            assertTrue(false);
+        }
+    }
+
+    /**
+     * Verify that Default Serializer works fine.
+     */
+    @Test
+    public void testDefaultSerDe() throws PulsarClientException {
+
+        PulsarSinkConfig pulsarConfig = getPulsarConfigs();
+        // set type to void
+        pulsarConfig.setTypeClassName(String.class.getName());
+        pulsarConfig.setSerDeClassName(null);
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig);
+
+        try {
+            pulsarSink.setupSerDe();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            fail();
+        }
+    }
+
+    /**
+     * Verify that Explicit setting of Default Serializer works fine.
+     */
+    @Test
+    public void testExplicitDefaultSerDe() throws PulsarClientException {
+        PulsarSinkConfig pulsarConfig = getPulsarConfigs();
+        // set type to void
+        pulsarConfig.setTypeClassName(String.class.getName());
+        pulsarConfig.setSerDeClassName(DefaultSerDe.class.getName());
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig);
+
+        try {
+            pulsarSink.setupSerDe();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            fail();
+        }
+    }
+
+    @Test
+    public void testComplexOuputType() throws PulsarClientException {
+        PulsarSinkConfig pulsarConfig = getPulsarConfigs();
+        // set type to void
+        pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName());
+        pulsarConfig.setSerDeClassName(ComplexSerDe.class.getName());
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig);
+
+        try {
+            pulsarSink.setupSerDe();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            fail();
+        }
+    }
+}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 558517ae4b..77d397cacd 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.functions.source;
 
+import lombok.Getter;
+import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -82,8 +84,8 @@ private static PulsarClient getPulsarClient() throws PulsarClientException {
         return pulsarClient;
     }
 
-    private static PulsarConfig getPulsarConfigs() {
-        PulsarConfig pulsarConfig = new PulsarConfig();
+    private static PulsarSourceConfig getPulsarConfigs() {
+        PulsarSourceConfig pulsarConfig = new PulsarSourceConfig();
         pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         pulsarConfig.setSubscriptionType(FunctionConfig.SubscriptionType.FAILOVER);
         pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
@@ -91,9 +93,29 @@ private static PulsarConfig getPulsarConfigs() {
         return pulsarConfig;
     }
 
+    @Getter
+    @Setter
+    public static class ComplexUserDefinedType {
+        private String name;
+        private Integer age;
+    }
+
+    public static class ComplexSerDe implements SerDe<ComplexUserDefinedType> {
+        @Override
+        public ComplexUserDefinedType deserialize(byte[] input) {
+            return null;
+        }
+
+        @Override
+        public byte[] serialize(ComplexUserDefinedType input) {
+            return new byte[0];
+        }
+    }
+
+
     @Test
     public void testVoidInputClasses() throws IOException {
-        PulsarConfig pulsarConfig = getPulsarConfigs();
+        PulsarSourceConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(Void.class.getName());
         PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig);
@@ -115,7 +137,7 @@ public void testVoidInputClasses() throws IOException {
      */
     @Test
     public void testInconsistentInputType() throws IOException {
-        PulsarConfig pulsarConfig = getPulsarConfigs();
+        PulsarSourceConfig pulsarConfig = getPulsarConfigs();
         // set type to be inconsistent to that of SerDe
         pulsarConfig.setTypeClassName(Integer.class.getName());
         Map<String, String> topicSerdeClassNameMap = new HashMap<>();
@@ -133,4 +155,64 @@ public void testInconsistentInputType() throws IOException {
             assertTrue(false);
         }
     }
+
+    /**
+     * Verify that Default Serializer works fine.
+     */
+    @Test
+    public void testDefaultSerDe() throws PulsarClientException {
+
+        PulsarSourceConfig pulsarConfig = getPulsarConfigs();
+        // set type to void
+        pulsarConfig.setTypeClassName(String.class.getName());
+        topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", null);
+        pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig);
+
+        try {
+            pulsarSource.open(new HashMap<>());
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            assertEquals(ex, null);
+            assertTrue(false);
+        }
+    }
+
+    /**
+     * Verify that Explicit setting of Default Serializer works fine.
+     */
+    @Test
+    public void testExplicitDefaultSerDe() throws PulsarClientException {
+        PulsarSourceConfig pulsarConfig = getPulsarConfigs();
+        // set type to void
+        pulsarConfig.setTypeClassName(String.class.getName());
+        topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result", DefaultSerDe.class.getName());
+        pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig);
+
+        try {
+            pulsarSource.open(new HashMap<>());
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            assertEquals(ex, null);
+            assertTrue(false);
+        }
+    }
+
+    @Test
+    public void testComplexOuputType() throws PulsarClientException {
+        PulsarSourceConfig pulsarConfig = getPulsarConfigs();
+        // set type to void
+        pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName());
+        topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result",ComplexSerDe.class.getName());
+        pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig);
+
+        try {
+            pulsarSource.setupSerDe();
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            fail();
+        }
+    }
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index e0330bb6fc..5bb5cace7f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -60,13 +60,6 @@
     protected String tenant;
     @Parameter(names = "--namespace", description = "Namespace Name\n", required = true)
     protected String namespace;
-
-    @Parameter(names = "--output_topic", description = "Output Topic Name\n")
-    protected String outputTopicName;
-
-    @Parameter(names = "--output_serde_classname", description = "Output SerDe\n")
-    protected String outputSerdeClassName;
-
     @Parameter(names = "--log_topic", description = "Log Topic")
     protected String logTopic;
 
@@ -112,6 +105,17 @@
     @Parameter(names = "--source_topics_serde_classname", description = "A map of topics to SerDe for the source", required = true)
     protected String sourceTopicsSerdeClassName;
 
+    @Parameter(names = "--sink_configs", description = "The sink configs\n")
+    protected String sinkConfigs;
+
+    @Parameter(names = "--sink_classname", description = "The sink classname\n", required = true)
+    protected String sinkClassname;
+
+    @Parameter(names = "--sink_topic", description = "The sink Topic Name\n", required = true)
+    protected String sinkTopic;
+
+    @Parameter(names = "--sink_serde_classname", description = "Sink SerDe\n")
+    protected String sinkSerdeClassName;
 
     private Server server;
 
@@ -130,15 +134,6 @@ public void start() throws Exception {
         functionDetailsBuilder.setName(functionName);
         functionDetailsBuilder.setClassName(className);
 
-        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-        if (outputSerdeClassName != null) {
-            sinkSpecBuilder.setSerDeClassName(outputSerdeClassName);
-        }
-        if (outputTopicName != null) {
-            sinkSpecBuilder.setTopic(outputTopicName);
-        }
-        functionDetailsBuilder.setSink(sinkSpecBuilder);
-
         if (logTopic != null) {
             functionDetailsBuilder.setLogTopic(logTopic);
         }
@@ -154,6 +149,7 @@ public void start() throws Exception {
             functionDetailsBuilder.putAllUserConfig(userConfigMap);
         }
 
+        // Setup source
         SourceSpec.Builder sourceDetailsBuilder = SourceSpec.newBuilder();
         sourceDetailsBuilder.setClassName(sourceClassname);
         if (sourceConfigs != null && !sourceConfigs.isEmpty()) {;
@@ -165,6 +161,18 @@ public void start() throws Exception {
 
         functionDetailsBuilder.setSource(sourceDetailsBuilder);
 
+        // Setup sink
+        SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
+        sinkSpecBuilder.setClassName(sinkClassname);
+        if (sinkConfigs != null) {
+            sinkSpecBuilder.setConfigs(sinkConfigs);
+        }
+        if (sinkSerdeClassName != null) {
+            sinkSpecBuilder.setSerDeClassName(sinkSerdeClassName);
+        }
+        sinkSpecBuilder.setTopic(sinkTopic);
+        functionDetailsBuilder.setSink(sinkSpecBuilder);
+
         FunctionDetails functionDetails = functionDetailsBuilder.build();
         instanceConfig.setFunctionDetails(functionDetails);
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 3aae26f8c3..72a5801752 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -118,16 +118,7 @@
         } else {
             args.add("false");
         }
-        if (instanceConfig.getFunctionDetails().getSink().getTopic() != null
-                && !instanceConfig.getFunctionDetails().getSink().getTopic().isEmpty()) {
-            args.add("--output_topic");
-            args.add(instanceConfig.getFunctionDetails().getSink().getTopic());
-        }
-        if (instanceConfig.getFunctionDetails().getSink().getSerDeClassName() != null
-                && !instanceConfig.getFunctionDetails().getSink().getSerDeClassName().isEmpty()) {
-            args.add("--output_serde_classname");
-            args.add(instanceConfig.getFunctionDetails().getSink().getSerDeClassName());
-        }
+
         args.add("--processing_guarantees");
         args.add(String.valueOf(instanceConfig.getFunctionDetails().getProcessingGuarantees()));
         args.add("--pulsar_serviceurl");
@@ -143,6 +134,7 @@
         args.add("--port");
         args.add(String.valueOf(instancePort));
 
+        // source related configs
         if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
             if (!instanceConfig.getFunctionDetails().getSource().getClassName().isEmpty()) {
                 args.add("--source_classname");
@@ -159,6 +151,29 @@
 
         args.add("--source_topics_serde_classname");
         args.add(new Gson().toJson(instanceConfig.getFunctionDetails().getSource().getTopicsToSerDeClassNameMap()));
+
+        // sink related configs
+        if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
+            if (!instanceConfig.getFunctionDetails().getSink().getClassName().isEmpty()) {
+                args.add("--sink_classname");
+                args.add(instanceConfig.getFunctionDetails().getSink().getClassName());
+            }
+            String sinkConfigs = instanceConfig.getFunctionDetails().getSink().getConfigs();
+            if (sinkConfigs != null && !sinkConfigs.isEmpty()) {
+                args.add("--sink_configs");
+                args.add(sinkConfigs);
+            }
+        }
+        if (instanceConfig.getFunctionDetails().getSink().getTopic() != null
+                && !instanceConfig.getFunctionDetails().getSink().getTopic().isEmpty()) {
+            args.add("--sink_topic");
+            args.add(instanceConfig.getFunctionDetails().getSink().getTopic());
+        }
+        if (instanceConfig.getFunctionDetails().getSink().getSerDeClassName() != null
+                && !instanceConfig.getFunctionDetails().getSink().getSerDeClassName().isEmpty()) {
+            args.add("--sink_serde_classname");
+            args.add(instanceConfig.getFunctionDetails().getSink().getSerDeClassName());
+        }
         return args;
     }
 
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index d0c6b3ce21..b87e6369d0 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -84,6 +84,7 @@ FunctionDetails createFunctionDetails(FunctionDetails.Runtime runtime) {
         functionDetailsBuilder.setSink(Function.SinkSpec.newBuilder()
                 .setTopic(TEST_NAME + "-output")
                 .setSerDeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer")
+                .setClassName("org.pulsar.pulsar.TestSink")
                 .build());
         functionDetailsBuilder.setLogTopic(TEST_NAME + "-log");
         functionDetailsBuilder.setSource(Function.SourceSpec.newBuilder()
@@ -111,7 +112,7 @@ public void testJavaConstructor() {
 
         ProcessRuntime container = factory.createContainer(config, userJarFile);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 45);
+        assertEquals(args.size(), 47);
         String expectedArgs = "java -cp " + javaInstanceJarFile + " -Dlog4j.configurationFile=java_instance_log4j2.yml "
                 + "-Dpulsar.log.dir=" + logDirectory + "/functions" + " -Dpulsar.log.file=" + config.getFunctionDetails().getName()
                 + " org.apache.pulsar.functions.runtime.JavaInstanceMain"
@@ -123,14 +124,15 @@ public void testJavaConstructor() {
                 + " --function_classname " + config.getFunctionDetails().getClassName()
                 + " --log_topic " + config.getFunctionDetails().getLogTopic()
                 + " --auto_ack false"
-                + " --output_topic " + config.getFunctionDetails().getSink().getTopic()
-                + " --output_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName()
                 + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(38)
+                + " --max_buffered_tuples 1024 --port " + args.get(34)
                 + " --source_classname " + config.getFunctionDetails().getSource().getClassName()
                 + " --source_subscription_type " + config.getFunctionDetails().getSource().getSubscriptionType().name()
-                + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName);
+                + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName)
+                + " --sink_classname " + config.getFunctionDetails().getSink().getClassName()
+                + " --sink_topic " + config.getFunctionDetails().getSink().getTopic()
+                + " --sink_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName();
         assertEquals(expectedArgs, String.join(" ", args));
     }
 
@@ -151,13 +153,13 @@ public void testPythonConstructor() {
                 + " --function_classname " + config.getFunctionDetails().getClassName()
                 + " --log_topic " + config.getFunctionDetails().getLogTopic()
                 + " --auto_ack false"
-                + " --output_topic " + config.getFunctionDetails().getSink().getTopic()
-                + " --output_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName()
                 + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
-                + " --max_buffered_tuples 1024 --port " + args.get(37)
+                + " --max_buffered_tuples 1024 --port " + args.get(33)
                 + " --source_subscription_type " + config.getFunctionDetails().getSource().getSubscriptionType().name()
-                + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName);
+                + " --source_topics_serde_classname " + new Gson().toJson(topicsToSerDeClassName)
+                + " --sink_topic " + config.getFunctionDetails().getSink().getTopic()
+                + " --sink_serde_classname " + config.getFunctionDetails().getSink().getSerDeClassName();
         assertEquals(expectedArgs, String.join(" ", args));
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services