You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/09/26 17:44:52 UTC

[GitHub] sijie closed pull request #2605: implement topic routing on a per record basis

sijie closed pull request #2605: implement topic routing on a per record basis
URL: https://github.com/apache/pulsar/pull/2605
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a pull request from a fork, the diff is reproduced
below (GitHub would not otherwise display it after the merge):

diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
index 59cc104b05..5840278479 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
@@ -93,4 +93,13 @@ default void ack() {
      */
     default void fail() {
     }
+
+    /**
+     * To support message routing on a per message basis
+     *
+     * @return The topic this message should be written to
+     */
+    default Optional<String> getDestinationTopic() {
+        return Optional.empty();
+    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 32e878db05..9af68f0d23 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -19,25 +19,13 @@
 
 package org.apache.pulsar.functions.instance;
 
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
-
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-
 import io.netty.buffer.ByteBuf;
-
-import java.io.FileNotFoundException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
+import net.jodah.typetools.TypeResolver;
 import org.apache.bookkeeper.api.StorageClient;
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
@@ -51,7 +39,6 @@
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configuration;
 import org.apache.logging.log4j.core.config.LoggerConfig;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -65,7 +52,6 @@
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.sink.PulsarSinkConfig;
 import org.apache.pulsar.functions.sink.PulsarSinkDisable;
-import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
 import org.apache.pulsar.functions.utils.ConsumerConfig;
@@ -78,7 +64,15 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import net.jodah.typetools.TypeResolver;
+import java.io.FileNotFoundException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
+import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
 
 /**
  * A function container implemented using java thread.
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index 05c2114edc..a8180204e8 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -72,4 +72,8 @@ public void fail() {
         sourceRecord.fail();
     }
 
+    @Override
+    public Optional<String> getDestinationTopic() {
+        return sourceRecord.getDestinationTopic();
+    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
deleted file mode 100644
index 8160a1a190..0000000000
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.producers;
-
-import java.util.concurrent.TimeUnit;
-
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.HashingScheme;
-import org.apache.pulsar.client.api.MessageRoutingMode;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.functions.instance.FunctionResultRouter;
-
-public abstract class AbstractOneOuputTopicProducers<T> implements Producers<T> {
-
-    protected final PulsarClient client;
-    protected final String outputTopic;
-
-    AbstractOneOuputTopicProducers(PulsarClient client,
-                                   String outputTopic)
-            throws PulsarClientException {
-        this.client = client;
-        this.outputTopic = outputTopic;
-    }
-
-    static <T> ProducerBuilder<T> newProducerBuilder(PulsarClient client, Schema<T> schema) {
-        // use function result router to deal with different processing guarantees.
-        return client.newProducer(schema) //
-                .blockIfQueueFull(true) //
-                .enableBatching(true) //
-                .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS) //
-                .compressionType(CompressionType.LZ4) //
-                .hashingScheme(HashingScheme.Murmur3_32Hash) //
-                .messageRoutingMode(MessageRoutingMode.CustomPartition) //
-                .messageRouter(FunctionResultRouter.of());
-    }
-
-    protected Producer<T> createProducerWithProducerName(String topic, String producerName, Schema<T> schema, String fqfn)
-            throws PulsarClientException {
-        return createProducer(client, topic, producerName, schema, fqfn);
-    }
-
-    public static <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema, String fqfn)
-            throws PulsarClientException {
-        ProducerBuilder<T> builder = newProducerBuilder(client, schema).topic(topic);
-        if (producerName != null) {
-            builder.producerName(producerName);
-        }
-
-        return builder
-                .property("application", "pulsarfunction")
-                .property("fqfn", fqfn).create();
-    }
-}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
deleted file mode 100644
index 3994dd1436..0000000000
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.producers;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-
-import lombok.AccessLevel;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-
-@Slf4j
-public class MultiConsumersOneOuputTopicProducers<T> extends AbstractOneOuputTopicProducers<T> {
-
-    @Getter(AccessLevel.PACKAGE)
-    // PartitionId -> producer
-    private final Map<String, Producer<T>> producers;
-
-    private final Schema<T> schema;
-    private final String fqfn;
-
-
-    public MultiConsumersOneOuputTopicProducers(PulsarClient client,
-                                                String outputTopic, Schema<T> schema,
-                                                String fqfn)
-            throws PulsarClientException {
-        super(client, outputTopic);
-        this.producers = new ConcurrentHashMap<>();
-        this.schema = schema;
-        this.fqfn = fqfn;
-    }
-
-    @Override
-    public void initialize() throws PulsarClientException {
-        // no-op
-    }
-
-    static String makeProducerName(String srcTopicName, String srcTopicPartition) {
-        return String.format("%s-%s", srcTopicName, srcTopicPartition);
-    }
-
-    @Override
-    public synchronized Producer<T> getProducer(String srcPartitionId) throws PulsarClientException {
-        Producer<T> producer = producers.get(srcPartitionId);
-        if (null == producer) {
-            producer = createProducerWithProducerName(outputTopic, srcPartitionId, schema, fqfn);
-            producers.put(srcPartitionId, producer);
-        }
-        return producer;
-    }
-
-    @Override
-    public synchronized void closeProducer(String srcPartitionId) {
-        Producer<T> producer = producers.get(srcPartitionId);
-        if (null != producer) {
-            producer.closeAsync();
-            producers.remove(srcPartitionId);
-        }
-    }
-
-    @Override
-    public synchronized void close() {
-        List<CompletableFuture<Void>> closeFutures = new ArrayList<>(producers.size());
-        for (Producer<T> producer: producers.values()) {
-            closeFutures.add(producer.closeAsync());
-        }
-        try {
-            FutureUtils.result(FutureUtils.collect(closeFutures));
-        } catch (Exception e) {
-            log.warn("Fail to close all the producers for output topic {}", outputTopic, e);
-        }
-    }
-}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
deleted file mode 100644
index 7892876c1b..0000000000
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/Producers.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.producers;
-
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClientException;
-
-/**
- * An interface for managing publishers within a java instance.
- */
-public interface Producers<T> extends AutoCloseable {
-
-    /**
-     * Initialize all the producers.
-     *
-     * @throws PulsarClientException
-     */
-    void initialize() throws PulsarClientException;
-
-    /**
-     * Get the producer specified by <tt>srcPartitionId</tt>.
-     *
-     * @param srcPartitionId
-     *          src partition Id
-     * @return the producer instance to produce messages
-     */
-    Producer<T> getProducer(String srcPartitionId) throws PulsarClientException;
-
-    /**
-     * Close a producer specified by <tt>srcPartitionId</tt>.
-     *
-     * @param srcPartitionId src partition id
-     */
-    void closeProducer(String srcPartitionId);
-
-
-        @Override
-    void close();
-}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 44490454c3..024638b6b9 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -19,28 +19,20 @@
 package org.apache.pulsar.functions.sink;
 
 import com.google.common.annotations.VisibleForTesting;
-
-import java.util.Base64;
-import java.util.Map;
-import java.util.Optional;
-
-import lombok.AccessLevel;
-import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.instance.FunctionResultRouter;
 import org.apache.pulsar.functions.instance.SinkRecord;
-import org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
-import org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
-import org.apache.pulsar.functions.instance.producers.Producers;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.FunctionConfig;
@@ -48,152 +40,170 @@
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
 @Slf4j
 public class PulsarSink<T> implements Sink<T> {
 
     private final PulsarClient client;
     private final PulsarSinkConfig pulsarSinkConfig;
 
-    private PulsarSinkProcessor<T> pulsarSinkProcessor;
+    @VisibleForTesting
+    PulsarSinkProcessor<T> pulsarSinkProcessor;
 
     private final TopicSchema topicSchema;
     private final String fqfn;
 
     private interface PulsarSinkProcessor<T> {
-        void initializeOutputProducer(String outputTopic, Schema<T> schema, String fqfn) throws Exception;
 
         TypedMessageBuilder<T> newMessage(Record<T> record) throws Exception;
 
         void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) throws Exception;
 
-        abstract void close() throws Exception;
+        void close() throws Exception;
     }
 
-    private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor<T> {
-        private Producer<T> producer;
+    private abstract class PulsarSinkProcessorBase implements PulsarSinkProcessor<T> {
+        protected Map<String, Producer<T>> publishProducers = new ConcurrentHashMap<>();
+        protected Schema schema;
 
-        @Override
-        public void initializeOutputProducer(String outputTopic, Schema<T> schema, String fqfn) throws Exception {
-            this.producer = AbstractOneOuputTopicProducers.createProducer(
-                    client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+        protected PulsarSinkProcessorBase(Schema schema) {
+            this.schema = schema;
         }
 
-        @Override
-        public TypedMessageBuilder<T> newMessage(Record<T> record) {
-            return producer.newMessage();
+        public <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema, String fqfn)
+                throws PulsarClientException {
+            ProducerBuilder<T> builder = client.newProducer(schema)
+                    .blockIfQueueFull(true)
+                    .enableBatching(true)
+                    .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS)
+                    .compressionType(CompressionType.LZ4)
+                    .hashingScheme(HashingScheme.Murmur3_32Hash) //
+                    .messageRoutingMode(MessageRoutingMode.CustomPartition)
+                    .messageRouter(FunctionResultRouter.of())
+                    .topic(topic);
+            if (producerName != null) {
+                builder.producerName(producerName);
+            }
+
+            return builder
+                    .property("application", "pulsarfunction")
+                    .property("fqfn", fqfn).create();
         }
 
-        @Override
-        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) throws Exception {
-            msg.sendAsync();
+        protected Producer<T> getProducer(String destinationTopic) {
+            return getProducer(destinationTopic, null, destinationTopic);
         }
 
-        @Override
-        public void close() throws Exception {
-            if (null != producer) {
+        protected Producer<T> getProducer(String producerId, String producerName, String topicName) {
+            return publishProducers.computeIfAbsent(producerId, s -> {
                 try {
-                    producer.close();
+                    return createProducer(
+                            client,
+                            topicName,
+                            producerName,
+                            schema,
+                            fqfn);
                 } catch (PulsarClientException e) {
-                    log.warn("Fail to close producer for processor {}", pulsarSinkConfig.getTopic(), e);
+                    log.error("Failed to create Producer while doing user publish", e);
+                    throw new RuntimeException(e);
                 }
+            });
+        }
+
+        @Override
+        public void close() throws Exception {
+            List<CompletableFuture<Void>> closeFutures = new ArrayList<>(publishProducers.size());
+            for (Map.Entry<String, Producer<T>> entry: publishProducers.entrySet()) {
+                Producer<T> producer = entry.getValue();
+                closeFutures.add(producer.closeAsync());
+            }
+            try {
+                org.apache.pulsar.common.util.FutureUtil.waitForAll(closeFutures);
+            } catch (Exception e) {
+                log.warn("Failed to close all the producers", e);
             }
         }
     }
 
-    private class PulsarSinkAtLeastOnceProcessor implements PulsarSinkProcessor<T> {
-        private Producer<T> producer;
-
-        @Override
-        public void initializeOutputProducer(String outputTopic, Schema<T> schema, String fqfn) throws Exception {
-            this.producer = AbstractOneOuputTopicProducers.createProducer(
-                    client, pulsarSinkConfig.getTopic(), null, schema, fqfn);
+    @VisibleForTesting
+    class PulsarSinkAtMostOnceProcessor extends PulsarSinkProcessorBase {
+        public PulsarSinkAtMostOnceProcessor(Schema schema) {
+            super(schema);
+            // initialize default topic
+            try {
+                publishProducers.put(pulsarSinkConfig.getTopic(),
+                        createProducer(client, pulsarSinkConfig.getTopic(), null, schema, fqfn));
+            } catch (PulsarClientException e) {
+                log.error("Failed to create Producer while doing user publish", e);
+                throw new RuntimeException(e);            }
         }
 
         @Override
         public TypedMessageBuilder<T> newMessage(Record<T> record) {
-            return producer.newMessage();
+            return getProducer(record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic())).newMessage();
         }
 
         @Override
         public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) throws Exception {
-            msg.sendAsync().thenAccept(messageId -> record.ack());
+            msg.sendAsync();
+        }
+    }
+
+    @VisibleForTesting
+    class PulsarSinkAtLeastOnceProcessor extends PulsarSinkAtMostOnceProcessor {
+        public PulsarSinkAtLeastOnceProcessor(Schema schema) {
+            super(schema);
         }
 
         @Override
-        public void close() throws Exception {
-            if (null != producer) {
-                try {
-                    producer.close();
-                } catch (PulsarClientException e) {
-                    log.warn("Fail to close producer for processor {}", pulsarSinkConfig.getTopic(), e);
-                }
-            }
+        public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record) throws Exception {
+            msg.sendAsync().thenAccept(messageId -> record.ack());
         }
     }
 
-    private class PulsarSinkEffectivelyOnceProcessor implements PulsarSinkProcessor<T>, ConsumerEventListener {
+    @VisibleForTesting
+    class PulsarSinkEffectivelyOnceProcessor extends PulsarSinkProcessorBase {
 
-        @Getter(AccessLevel.PACKAGE)
-        protected Producers<T> outputProducer;
 
-        @Override
-        public void initializeOutputProducer(String outputTopic, Schema<T> schema, String fqfn) throws Exception {
-            outputProducer = new MultiConsumersOneOuputTopicProducers<T>(client, outputTopic, schema, fqfn);
-            outputProducer.initialize();
+        public PulsarSinkEffectivelyOnceProcessor(Schema schema) {
+            super(schema);
         }
 
         @Override
         public TypedMessageBuilder<T> newMessage(Record<T> record) throws Exception {
-            // Route message to appropriate partition producer
-            return outputProducer.getProducer(record.getPartitionId().get()).newMessage();
+            if (!record.getPartitionId().isPresent()) {
+                throw new RuntimeException("PartitionId needs to be specified for every record while in Effectively-once mode");
+            }
+
+            return getProducer(
+                    String.format("%s-%s",record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic()), record.getPartitionId().get()),
+                    record.getPartitionId().get(),
+                    record.getDestinationTopic().orElse(pulsarSinkConfig.getTopic())
+            ).newMessage();
         }
 
         @Override
         public void sendOutputMessage(TypedMessageBuilder<T> msg, Record<T> record)
                 throws Exception {
 
-            // assign sequence id to output message for idempotent producing
-            if (record.getRecordSequence().isPresent()) {
-                msg.sequenceId(record.getRecordSequence().get());
+            if (!record.getRecordSequence().isPresent()) {
+                throw new RuntimeException("RecordSequence needs to be specified for every record while in Effectively-once mode");
             }
 
+            // assign sequence id to output message for idempotent producing
+            msg.sequenceId(record.getRecordSequence().get());
             msg.sendAsync()
                     .thenAccept(messageId -> record.ack())
                     .join();
         }
-
-        @Override
-        public void close() throws Exception {
-            // kill the result producer
-            if (null != outputProducer) {
-                outputProducer.close();
-                outputProducer = null;
-            }
-        }
-
-        @Override
-        public void becameActive(Consumer<?> consumer, int partitionId) {
-            // if the instance becomes active for a given topic partition,
-            // open a producer for the results computed from this topic partition.
-            if (null != outputProducer) {
-                try {
-                    this.outputProducer.getProducer(String.format("%s-%d", consumer.getTopic(), partitionId));
-                } catch (PulsarClientException e) {
-                    // this can be ignored, because producer can be lazily created when accessing it.
-                    log.warn("Fail to create a producer for results computed from messages of topic: {}, partition: {}",
-                            consumer.getTopic(), partitionId);
-                }
-            }
-        }
-
-        @Override
-        public void becameInactive(Consumer<?> consumer, int partitionId) {
-            if (null != outputProducer) {
-                // if I lost the ownership of a partition, close its corresponding topic partition.
-                // this is to allow the new active consumer be able to produce to the result topic.
-                this.outputProducer.closeProducer(String.format("%s-%d", consumer.getTopic(), partitionId));
-            }
-        }
     }
 
     public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, String fqfn) {
@@ -212,16 +222,15 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc
         FunctionConfig.ProcessingGuarantees processingGuarantees = this.pulsarSinkConfig.getProcessingGuarantees();
         switch (processingGuarantees) {
             case ATMOST_ONCE:
-                this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor();
+                this.pulsarSinkProcessor = new PulsarSinkAtMostOnceProcessor(schema);
                 break;
             case ATLEAST_ONCE:
-                this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor();
+                this.pulsarSinkProcessor = new PulsarSinkAtLeastOnceProcessor(schema);
                 break;
             case EFFECTIVELY_ONCE:
-                this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor();
+                this.pulsarSinkProcessor = new PulsarSinkEffectivelyOnceProcessor(schema);
                 break;
         }
-        this.pulsarSinkProcessor.initializeOutputProducer(this.pulsarSinkConfig.getTopic(), schema, fqfn);
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
deleted file mode 100644
index 399a4c701b..0000000000
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.instance.producers;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.any;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertSame;
-import static org.testng.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.bookkeeper.common.concurrent.FutureUtils;
-import org.apache.pulsar.client.api.CompressionType;
-import org.apache.pulsar.client.api.CryptoKeyReader;
-import org.apache.pulsar.client.api.HashingScheme;
-import org.apache.pulsar.client.api.MessageRouter;
-import org.apache.pulsar.client.api.MessageRoutingMode;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
-import org.apache.pulsar.client.api.ProducerInterceptor;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * Unit test of {@link MultiConsumersOneOuputTopicProducers}.
- */
-public class MultiConsumersOneOutputTopicProducersTest {
-
-    private static final String TEST_OUTPUT_TOPIC = "test-output-topic";
-
-    private PulsarClient mockClient;
-    private final Map<String, Producer<byte[]>> mockProducers = new HashMap<>();
-    private MultiConsumersOneOuputTopicProducers<byte[]> producers;
-
-    private class MockProducerBuilder implements ProducerBuilder<byte[]> {
-
-        String producerName = "";
-
-        @Override
-        public Producer<byte[]> create() throws PulsarClientException {
-            Producer<byte[]> producer;
-            synchronized (mockProducers) {
-                producer = mockProducers.get(producerName);
-                if (null == producer) {
-                    producer = createMockProducer(producerName);
-                    mockProducers.put(producerName, producer);
-                }
-            }
-            return producer;
-        }
-
-        @Override
-        public CompletableFuture<Producer<byte[]>> createAsync() {
-            try {
-                return CompletableFuture.completedFuture(create());
-            } catch (PulsarClientException e) {
-                CompletableFuture<Producer<byte[]>> future = new CompletableFuture<>();
-                future.completeExceptionally(e);
-                return future;
-            }
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> loadConf(Map<String, Object> config) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> clone() {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> topic(String topicName) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> producerName(String producerName) {
-            this.producerName = producerName;
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> sendTimeout(int sendTimeout, TimeUnit unit) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> maxPendingMessages(int maxPendingMessages) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> maxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> blockIfQueueFull(boolean blockIfQueueFull) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> messageRoutingMode(MessageRoutingMode messageRoutingMode) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> hashingScheme(HashingScheme hashingScheme) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> compressionType(CompressionType compressionType) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> messageRouter(MessageRouter messageRouter) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> enableBatching(boolean enableBatching) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> cryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> addEncryptionKey(String key) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> cryptoFailureAction(ProducerCryptoFailureAction action) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> batchingMaxMessages(int batchMessagesMaxMessagesPerBatch) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> initialSequenceId(long initialSequenceId) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> property(String key, String value) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> properties(Map<String, String> properties) {
-            return this;
-        }
-
-        @Override
-        public ProducerBuilder<byte[]> intercept(ProducerInterceptor<byte[]>... interceptors) {
-            return null;
-        }
-    }
-
-    @BeforeMethod
-    public void setup() throws Exception {
-        this.mockClient = mock(PulsarClient.class);
-
-        when(mockClient.newProducer(any(Schema.class)))
-                .thenReturn(new MockProducerBuilder());
-
-        producers = new MultiConsumersOneOuputTopicProducers<byte[]>(mockClient, TEST_OUTPUT_TOPIC, Schema.BYTES, "test");
-        producers.initialize();
-    }
-
-    private Producer<byte[]> createMockProducer(String topic) {
-        Producer<byte[]> producer = mock(Producer.class);
-        when(producer.closeAsync())
-                .thenAnswer(invocationOnMock -> {
-                    synchronized (mockProducers) {
-                        mockProducers.remove(topic);
-                    }
-                    return FutureUtils.Void();
-                });
-        return producer;
-    }
-
-    @Test
-    public void testGetCloseProducer() throws Exception {
-        String srcTopic = "test-src-topic";
-        String ptnIdx = "1234";
-        String producerName = String.format("%s-%s", srcTopic, ptnIdx);
-        Producer<byte[]> producer = producers.getProducer(producerName);
-
-        assertSame(mockProducers.get(producerName), producer);
-        verify(mockClient, times(1))
-                .newProducer(Schema.BYTES);
-        assertTrue(producers.getProducers().containsKey(producerName));
-
-        // second get will not create a new producer
-        assertSame(mockProducers.get(producerName), producer);
-        verify(mockClient, times(1))
-                .newProducer(Schema.BYTES);
-        assertTrue(producers.getProducers().containsKey(producerName));
-
-        // close
-        producers.closeProducer(producerName);
-        verify(producer, times(1)).closeAsync();
-        assertFalse(producers.getProducers().containsKey(srcTopic));
-    }
-
-}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 72e3c56a1c..4722c6ff6d 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -19,30 +19,49 @@
 package org.apache.pulsar.functions.sink;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyList;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
 import static org.testng.AssertJUnit.fail;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import com.google.common.collect.ImmutableMap;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.instance.SinkRecord;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.apache.pulsar.io.core.SinkContext;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.mockito.ArgumentMatcher;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Slf4j
@@ -78,9 +97,39 @@ private static PulsarClientImpl getPulsarClient() throws PulsarClientException {
         doReturn(consumer).when(consumerBuilder).subscribe();
         doReturn(consumerBuilder).when(pulsarClient).newConsumer(any());
         doReturn(CompletableFuture.completedFuture(Optional.empty())).when(pulsarClient).getSchema(anyString());
+
+        ProducerBuilder producerBuilder = mock(ProducerBuilder.class);
+        doReturn(producerBuilder).when(producerBuilder).blockIfQueueFull(anyBoolean());
+        doReturn(producerBuilder).when(producerBuilder).enableBatching(anyBoolean());
+        doReturn(producerBuilder).when(producerBuilder).batchingMaxPublishDelay(anyLong(), any());
+        doReturn(producerBuilder).when(producerBuilder).compressionType(any());
+        doReturn(producerBuilder).when(producerBuilder).hashingScheme(any());
+        doReturn(producerBuilder).when(producerBuilder).messageRoutingMode(any());
+        doReturn(producerBuilder).when(producerBuilder).messageRouter(any());
+        doReturn(producerBuilder).when(producerBuilder).topic(anyString());
+        doReturn(producerBuilder).when(producerBuilder).producerName(anyString());
+        doReturn(producerBuilder).when(producerBuilder).property(anyString(), anyString());
+
+        CompletableFuture completableFuture = new CompletableFuture<>();
+        completableFuture.complete(mock(MessageId.class));
+        TypedMessageBuilder typedMessageBuilder = mock(TypedMessageBuilder.class);
+        doReturn(completableFuture).when(typedMessageBuilder).sendAsync();
+
+        Producer producer = mock(Producer.class);
+        doReturn(producer).when(producerBuilder).create();
+        doReturn(typedMessageBuilder).when(producer).newMessage();
+
+        doReturn(producerBuilder).when(pulsarClient).newProducer();
+        doReturn(producerBuilder).when(pulsarClient).newProducer(any());
+
         return pulsarClient;
     }
 
+    @BeforeMethod
+    public void setup() {
+
+    }
+
     private static PulsarSinkConfig getPulsarConfigs() {
         PulsarSinkConfig pulsarConfig = new PulsarSinkConfig();
         pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
@@ -200,4 +249,215 @@ public void testComplexOuputType() throws PulsarClientException {
             fail();
         }
     }
+
+
+
+    @Test
+    public void testSinkAndMessageRouting() throws Exception {
+
+        String[] topics = {"topic-1", "topic-2", "topic-3", null};
+        String defaultTopic = "default";
+        PulsarSinkConfig pulsarConfig = getPulsarConfigs();
+        pulsarConfig.setTopic(defaultTopic);
+        PulsarClient pulsarClient;
+
+        /** test At-least-once **/
+        pulsarClient = getPulsarClient();
+        pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, "test");
+
+        pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
+
+        for (String topic : topics) {
+
+            SinkRecord<String> record = new SinkRecord<>(new Record<String>() {
+                @Override
+                public Optional<String> getKey() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public String getValue() {
+                    return "in1";
+                }
+
+                @Override
+                public Optional<String> getDestinationTopic() {
+                    if (topic != null) {
+                        return Optional.of(topic);
+                    } else {
+                        return Optional.empty();
+                    }
+                }
+            }, "out1");
+
+
+            pulsarSink.write(record);
+
+            Assert.assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkAtLeastOnceProcessor);
+            PulsarSink.PulsarSinkAtLeastOnceProcessor pulsarSinkAtLeastOnceProcessor
+                    = (PulsarSink.PulsarSinkAtLeastOnceProcessor) pulsarSink.pulsarSinkProcessor;
+            if (topic != null) {
+                Assert.assertTrue(pulsarSinkAtLeastOnceProcessor.publishProducers.containsKey(topic));
+            } else {
+                Assert.assertTrue(pulsarSinkAtLeastOnceProcessor.publishProducers.containsKey(defaultTopic));
+            }
+            verify(pulsarClient.newProducer(), times(1)).topic(argThat(new ArgumentMatcher<String>() {
+
+                @Override
+                public boolean matches(Object o) {
+                    if (o instanceof String) {
+                        if (topic != null) {
+                            return topic.equals(o);
+                        } else {
+                            return defaultTopic.equals(o);
+                        }
+                    }
+                    return false;
+                }
+            }));
+        }
+
+        /** test At-most-once **/
+        pulsarClient = getPulsarClient();
+        pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE);
+        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, "test");
+
+        pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
+
+        for (String topic : topics) {
+
+            SinkRecord<String> record = new SinkRecord<>(new Record<String>() {
+                @Override
+                public Optional<String> getKey() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public String getValue() {
+                    return "in1";
+                }
+
+                @Override
+                public Optional<String> getDestinationTopic() {
+                    if (topic != null) {
+                        return Optional.of(topic);
+                    } else {
+                        return Optional.empty();
+                    }
+                }
+            }, "out1");
+
+
+            pulsarSink.write(record);
+
+            Assert.assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkAtMostOnceProcessor);
+            PulsarSink.PulsarSinkAtMostOnceProcessor pulsarSinkAtLeastOnceProcessor
+                    = (PulsarSink.PulsarSinkAtMostOnceProcessor) pulsarSink.pulsarSinkProcessor;
+            if (topic != null) {
+                Assert.assertTrue(pulsarSinkAtLeastOnceProcessor.publishProducers.containsKey(topic));
+            } else {
+                Assert.assertTrue(pulsarSinkAtLeastOnceProcessor.publishProducers.containsKey(defaultTopic));
+            }
+            verify(pulsarClient.newProducer(), times(1)).topic(argThat(new ArgumentMatcher<String>() {
+
+                @Override
+                public boolean matches(Object o) {
+                    if (o instanceof String) {
+                        if (topic != null) {
+                            return topic.equals(o);
+                        } else {
+                            return defaultTopic.equals(o);
+                        }
+                    }
+                    return false;
+                }
+            }));
+        }
+
+        /** test Effectively-once **/
+        pulsarClient = getPulsarClient();
+        pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, "test");
+
+        pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
+
+        for (String topic : topics) {
+
+            SinkRecord<String> record = new SinkRecord<>(new Record<String>() {
+                @Override
+                public Optional<String> getKey() {
+                    return Optional.empty();
+                }
+
+                @Override
+                public String getValue() {
+                    return "in1";
+                }
+
+                @Override
+                public Optional<String> getDestinationTopic() {
+                    if (topic != null) {
+                        return Optional.of(topic);
+                    } else {
+                        return Optional.empty();
+                    }
+                }
+                @Override
+                public Optional<String> getPartitionId() {
+                    if (topic != null) {
+                        return Optional.of(topic + "-id-1");
+                    } else {
+                        return Optional.of(defaultTopic + "-id-1");
+                    }
+                }
+
+                @Override
+                public Optional<Long> getRecordSequence() {
+                    return Optional.of(1L);
+                }
+            }, "out1");
+
+
+            pulsarSink.write(record);
+
+            Assert.assertTrue(pulsarSink.pulsarSinkProcessor instanceof PulsarSink.PulsarSinkEffectivelyOnceProcessor);
+            PulsarSink.PulsarSinkEffectivelyOnceProcessor pulsarSinkEffectivelyOnceProcessor
+                    = (PulsarSink.PulsarSinkEffectivelyOnceProcessor) pulsarSink.pulsarSinkProcessor;
+            if (topic != null) {
+                Assert.assertTrue(pulsarSinkEffectivelyOnceProcessor.publishProducers.containsKey(String.format("%s-%s-id-1", topic, topic)));
+            } else {
+                Assert.assertTrue(pulsarSinkEffectivelyOnceProcessor.publishProducers.containsKey(String.format("%s-%s-id-1", defaultTopic, defaultTopic)));
+            }
+            verify(pulsarClient.newProducer(), times(1)).topic(argThat(new ArgumentMatcher<String>() {
+
+                @Override
+                public boolean matches(Object o) {
+                    if (o instanceof String) {
+                        if (topic != null) {
+                            return topic.equals(o);
+                        } else {
+                            return defaultTopic.equals(o);
+                        }
+                    }
+                    return false;
+                }
+            }));
+            verify(pulsarClient.newProducer(), times(1)).producerName(argThat(new ArgumentMatcher<String>() {
+
+                @Override
+                public boolean matches(Object o) {
+                    if (o instanceof String) {
+                        if (topic != null) {
+                            return String.format("%s-id-1", topic).equals(o);
+                        } else {
+                            return String.format("%s-id-1", defaultTopic).equals(o);
+                        }
+                    }
+                    return false;
+                }
+            }));
+        }
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services