You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/01/07 22:01:50 UTC

[pulsar] branch master updated: Clean up and correct properties to producer and consumers created by Functions/Sinks/Sources (#3315)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b239ec  Clean up and correct properties to producer and consumers created by Functions/Sinks/Sources (#3315)
7b239ec is described below

commit 7b239ec838f36ef2d61a817f858d0b4711b6c219
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon Jan 7 14:01:46 2019 -0800

    Clean up and correct properties to producer and consumers created by Functions/Sinks/Sources (#3315)
    
    * clean up and correct properties to producer and consumers created by functions
    
    * fix test
    
    * cleaning up comment
    
    * refactoring
---
 .../pulsar/functions/instance/ContextImpl.java     | 13 +++++++++++-
 .../pulsar/functions/instance/InstanceUtils.java   | 22 ++++++++++++++++++++
 .../functions/instance/JavaInstanceRunnable.java   | 20 ++++++++++++------
 .../apache/pulsar/functions/sink/PulsarSink.java   | 19 ++++++++---------
 .../pulsar/functions/source/PulsarSource.java      | 13 +++++-------
 .../instance/src/main/python/contextimpl.py        |  7 ++++++-
 .../instance/src/main/python/python_instance.py    | 24 ++++++++++++++++++----
 pulsar-functions/instance/src/main/python/util.py  |  6 ++++++
 .../pulsar/functions/sink/PulsarSinkTest.java      | 18 ++++++++--------
 .../pulsar/functions/source/PulsarSourceTest.java  | 11 +++++-----
 10 files changed, 110 insertions(+), 43 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 460ba75..97956b8 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.source.TopicSchema;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
@@ -95,6 +96,7 @@ class ContextImpl implements Context, SinkContext, SourceContext {
         userMetricsLabelNames = Arrays.copyOf(ComponentStatsManager.metricsLabelNames, ComponentStatsManager.metricsLabelNames.length + 1);
         userMetricsLabelNames[ComponentStatsManager.metricsLabelNames.length] = "metric";
     }
+    private final Utils.ComponentType componentType;
 
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics,
                        SecretsProvider secretsProvider, CollectorRegistry collectorRegistry, String[] metricsLabels,
@@ -148,6 +150,7 @@ class ContextImpl implements Context, SinkContext, SourceContext {
                 .quantile(0.99, 0.01)
                 .quantile(0.999, 0.01)
                 .register(collectorRegistry);
+        this.componentType = componentType;
     }
 
     public void setCurrentMessageContext(Record<?> record) {
@@ -307,7 +310,15 @@ class ContextImpl implements Context, SinkContext, SourceContext {
         if (producer == null) {
             try {
                 Producer<O> newProducer = ((ProducerBuilderImpl<O>) producerBuilder.clone())
-                        .schema(schema).topic(topicName).create();
+                        .schema(schema)
+                        .topic(topicName)
+                        .properties(InstanceUtils.getProperties(componentType,
+                                FunctionDetailsUtils.getFullyQualifiedName(
+                                        this.config.getFunctionDetails().getTenant(),
+                                        this.config.getFunctionDetails().getNamespace(),
+                                        this.config.getFunctionDetails().getName()),
+                                this.config.getInstanceId()))
+                        .create();
 
                 Producer<O> existingProducer = (Producer<O>) publishProducers.putIfAbsent(topicName, newProducer);
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 9e32736..88a9df3 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -36,6 +36,9 @@ import org.apache.pulsar.functions.utils.Reflections;
 import net.jodah.typetools.TypeResolver;
 import org.apache.pulsar.functions.utils.Utils;
 
+import java.util.HashMap;
+import java.util.Map;
+
 @UtilityClass
 public class InstanceUtils {
     public static SerDe<?> initializeSerDe(String serdeClassName, ClassLoader clsLoader, Class<?> typeArg,
@@ -103,4 +106,23 @@ public class InstanceUtils {
         }
         return SINK;
     }
+
+    public static Map<String, String> getProperties(Utils.ComponentType componentType,
+                                                    String fullyQualifiedName, int instanceId) {
+        Map<String, String> properties = new HashMap<>();
+        switch (componentType) {
+            case FUNCTION:
+                properties.put("application", "pulsar-function");
+                break;
+            case SOURCE:
+                properties.put("application", "pulsar-source");
+                break;
+            case SINK:
+                properties.put("application", "pulsar-sink");
+                break;
+        }
+        properties.put("id", fullyQualifiedName);
+        properties.put("instance_id", String.valueOf(instanceId));
+        return properties;
+    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 7c36b58..fde9628 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -132,6 +132,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     private InstanceCache instanceCache;
 
+    private final Utils.ComponentType componentType;
+
+    private final Map<String, String> properties;
+
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 FunctionCacheManager fnCache,
                                 String jarFile,
@@ -156,6 +160,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                 FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())
         };
 
+        this.componentType = InstanceUtils.calculateSubjectType(instanceConfig.getFunctionDetails());
+
+        this.properties = InstanceUtils.getProperties(this.componentType,
+                FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()),
+                this.instanceConfig.getInstanceId());
+
         // Declare function local collector registry so that it will not clash with other function instances'
         // metrics collection especially in threaded mode
         // In process mode the JavaInstanceMain will declare a CollectorRegistry and pass it down
@@ -205,7 +215,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         Logger instanceLog = LoggerFactory.getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
         return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider,
-                collectorRegistry, metricsLabels, InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails()));
+                collectorRegistry, metricsLabels, this.componentType);
     }
 
     /**
@@ -221,7 +231,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
             }
             this.stats = ComponentStatsManager.getStatsManager(this.collectorRegistry, this.metricsLabels,
                     this.instanceCache.getScheduledExecutorService(),
-                    InstanceUtils.calculateSubjectType(this.instanceConfig.getFunctionDetails()));
+                    this.componentType);
 
             ContextImpl contextImpl = setupContext();
             javaInstance = setupJavaInstance(contextImpl);
@@ -648,8 +658,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                 pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
                 pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
             }
-            object = new PulsarSource(this.client, pulsarSourceConfig,
-                    FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
+            object = new PulsarSource(this.client, pulsarSourceConfig, this.properties);
         } else {
             object = Reflections.createInstance(
                     sourceSpec.getClassName(),
@@ -695,8 +704,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
                 pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
 
-                object = new PulsarSink(this.client, pulsarSinkConfig,
-                        FunctionDetailsUtils.getFullyQualifiedName(this.instanceConfig.getFunctionDetails()));
+                object = new PulsarSink(this.client, pulsarSinkConfig, this.properties);
             }
         } else {
             object = Reflections.createInstance(
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 1c61b45..3195a82 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -32,11 +32,13 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.FunctionResultRouter;
+import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.instance.SinkRecord;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 
@@ -54,12 +56,12 @@ public class PulsarSink<T> implements Sink<T> {
 
     private final PulsarClient client;
     private final PulsarSinkConfig pulsarSinkConfig;
+    private final Map<String, String> properties;
 
     @VisibleForTesting
     PulsarSinkProcessor<T> pulsarSinkProcessor;
 
     private final TopicSchema topicSchema;
-    private final String fqfn;
 
     private interface PulsarSinkProcessor<T> {
 
@@ -78,7 +80,7 @@ public class PulsarSink<T> implements Sink<T> {
             this.schema = schema;
         }
 
-        public <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema, String fqfn)
+        public <T> Producer<T> createProducer(PulsarClient client, String topic, String producerName, Schema<T> schema)
                 throws PulsarClientException {
             ProducerBuilder<T> builder = client.newProducer(schema)
                     .blockIfQueueFull(true)
@@ -96,9 +98,7 @@ public class PulsarSink<T> implements Sink<T> {
                 builder.producerName(producerName);
             }
 
-            return builder
-                    .property("application", "pulsarfunction")
-                    .property("fqfn", fqfn).create();
+            return builder.properties(properties).create();
         }
 
         protected Producer<T> getProducer(String destinationTopic) {
@@ -112,8 +112,7 @@ public class PulsarSink<T> implements Sink<T> {
                             client,
                             topicName,
                             producerName,
-                            schema,
-                            fqfn);
+                            schema);
                 } catch (PulsarClientException e) {
                     log.error("Failed to create Producer while doing user publish", e);
                     throw new RuntimeException(e);
@@ -143,7 +142,7 @@ public class PulsarSink<T> implements Sink<T> {
             // initialize default topic
             try {
                 publishProducers.put(pulsarSinkConfig.getTopic(),
-                        createProducer(client, pulsarSinkConfig.getTopic(), null, schema, fqfn));
+                        createProducer(client, pulsarSinkConfig.getTopic(), null, schema));
             } catch (PulsarClientException e) {
                 log.error("Failed to create Producer while doing user publish", e);
                 throw new RuntimeException(e);            }
@@ -209,11 +208,11 @@ public class PulsarSink<T> implements Sink<T> {
         }
     }
 
-    public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, String fqfn) {
+    public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties) {
         this.client = client;
         this.pulsarSinkConfig = pulsarSinkConfig;
         this.topicSchema = new TopicSchema(client);
-        this.fqfn = fqfn;
+        this.properties = properties;
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index ff41dc8..869c706 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -36,7 +36,9 @@ import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.client.impl.TopicMessageImpl;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
 
@@ -45,17 +47,16 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
 
     private final PulsarClient pulsarClient;
     private final PulsarSourceConfig pulsarSourceConfig;
+    private final Map<String, String> properties;
     private List<String> inputTopics;
     private List<Consumer<T>> inputConsumers;
     private final TopicSchema topicSchema;
-    private final String fqfn;
 
-    public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig,
-                        String fqfn) {
+    public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties) {
         this.pulsarClient = pulsarClient;
         this.pulsarSourceConfig = pulsarConfig;
         this.topicSchema = new TopicSchema(pulsarClient);
-        this.fqfn = fqfn;
+        this.properties = properties;
     }
 
     @Override
@@ -64,10 +65,6 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
         log.info("Opening pulsar source with config: {}", pulsarSourceConfig);
         Map<String, ConsumerConfig<T>> configs = setupConsumerConfigs();
 
-        Map<String, String> properties = new HashMap<>();
-        properties.put("application", "pulsarfunction");
-        properties.put("fqfn", fqfn);
-
         inputConsumers = configs.entrySet().stream().map(e -> {
             String topic = e.getKey();
             ConsumerConfig<T> conf = e.getValue();
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index 63332d0..6b56163 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -147,7 +147,12 @@ class ContextImpl(pulsar.Context):
         batching_enabled=True,
         batching_max_publish_delay_ms=1,
         max_pending_messages=100000,
-        compression_type=pulsar_compression_type
+        compression_type=pulsar_compression_type,
+        properties=util.get_properties(util.getFullyQualifiedFunctionName(
+          self.instance_config.function_details.tenant,
+          self.instance_config.function_details.namespace,
+          self.instance_config.function_details.name),
+          self.instance_config.instance_id)
       )
 
     if serde_class_name not in self.publish_serializers:
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index d4c3da5..cfd9eae 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -122,6 +122,13 @@ class PythonInstance(object):
     subscription_name = str(self.instance_config.function_details.tenant) + "/" + \
                         str(self.instance_config.function_details.namespace) + "/" + \
                         str(self.instance_config.function_details.name)
+
+    properties = util.get_properties(util.getFullyQualifiedFunctionName(
+                        self.instance_config.function_details.tenant,
+                        self.instance_config.function_details.namespace,
+                        self.instance_config.function_details.name),
+                        self.instance_config.instance_id)
+
     for topic, serde in self.instance_config.function_details.source.topicsToSerDeClassName.items():
       if not serde:
         serde_kclass = util.import_class(os.path.dirname(self.user_code), DEFAULT_SERIALIZER)
@@ -133,7 +140,8 @@ class PythonInstance(object):
         str(topic), subscription_name,
         consumer_type=mode,
         message_listener=partial(self.message_listener, self.input_serdes[topic]),
-        unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
+        unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None,
+        properties=properties
       )
 
     for topic, consumer_conf in self.instance_config.function_details.source.inputSpecs.items():
@@ -148,14 +156,16 @@ class PythonInstance(object):
           re.compile(str(topic)), subscription_name,
           consumer_type=mode,
           message_listener=partial(self.message_listener, self.input_serdes[topic]),
-          unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
+          unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None,
+          properties=properties
         )
       else:
         self.consumers[topic] = self.pulsar_client.subscribe(
           str(topic), subscription_name,
           consumer_type=mode,
           message_listener=partial(self.message_listener, self.input_serdes[topic]),
-          unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None
+          unacked_messages_timeout_ms=int(self.timeout_ms) if self.timeout_ms else None,
+          properties=properties
         )
 
     function_kclass = util.import_class(os.path.dirname(self.user_code), self.instance_config.function_details.className)
@@ -271,7 +281,13 @@ class PythonInstance(object):
         # set send timeout to be infinity to prevent potential deadlock with consumer
         # that might happen when consumer is blocked due to unacked messages
         send_timeout_millis=0,
-        max_pending_messages=100000)
+        max_pending_messages=100000,
+        properties=util.get_properties(util.getFullyQualifiedFunctionName(
+                        self.instance_config.function_details.tenant,
+                        self.instance_config.function_details.namespace,
+                        self.instance_config.function_details.name),
+                        self.instance_config.instance_id)
+      )
 
   def message_listener(self, serde, consumer, message):
     # increment number of received records from source
diff --git a/pulsar-functions/instance/src/main/python/util.py b/pulsar-functions/instance/src/main/python/util.py
index 76f75bd..0978f39 100644
--- a/pulsar-functions/instance/src/main/python/util.py
+++ b/pulsar-functions/instance/src/main/python/util.py
@@ -68,6 +68,12 @@ def import_class_from_path(from_path, full_class_name):
 def getFullyQualifiedFunctionName(tenant, namespace, name):
   return "%s/%s/%s" % (tenant, namespace, name)
 
+def getFullyQualifiedInstanceId(tenant, namespace, name, instance_id):
+    return "%s/%s/%s:%s" % (tenant, namespace, name, instance_id)
+
+def get_properties(fullyQualifiedName, instanceId):
+    return {"application": "pulsar-function", "id": str(fullyQualifiedName), "instance_id": str(instanceId)}
+
 class FixedTimer():
 
     def __init__(self, t, hFunction):
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index fb3d2c2..53cfeb6 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.instance.SinkRecord;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.SinkContext;
 import org.mockito.ArgumentMatcher;
 import org.testng.Assert;
@@ -100,6 +101,7 @@ public class PulsarSinkTest {
         doReturn(producerBuilder).when(producerBuilder).topic(anyString());
         doReturn(producerBuilder).when(producerBuilder).producerName(anyString());
         doReturn(producerBuilder).when(producerBuilder).property(anyString(), anyString());
+        doReturn(producerBuilder).when(producerBuilder).properties(any());
         doReturn(producerBuilder).when(producerBuilder).sendTimeout(anyInt(), any());
         
         CompletableFuture completableFuture = new CompletableFuture<>();
@@ -158,7 +160,7 @@ public class PulsarSinkTest {
         PulsarSinkConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(Void.class.getName());
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>());
 
         try {
             Schema schema = pulsarSink.initializeSchema();
@@ -176,7 +178,7 @@ public class PulsarSinkTest {
         // set type to be inconsistent to that of SerDe
         pulsarConfig.setTypeClassName(Integer.class.getName());
         pulsarConfig.setSerdeClassName(TestSerDe.class.getName());
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>());
         try {
             pulsarSink.initializeSchema();
             fail("Should fail constructing java instance if function type is inconsistent with serde type");
@@ -198,7 +200,7 @@ public class PulsarSinkTest {
         PulsarSinkConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(String.class.getName());
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>());
 
         try {
             pulsarSink.initializeSchema();
@@ -217,7 +219,7 @@ public class PulsarSinkTest {
         // set type to void
         pulsarConfig.setTypeClassName(String.class.getName());
         pulsarConfig.setSerdeClassName(TopicSchema.DEFAULT_SERDE);
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>());
 
         try {
             pulsarSink.initializeSchema();
@@ -233,7 +235,7 @@ public class PulsarSinkTest {
         // set type to void
         pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName());
         pulsarConfig.setSerdeClassName(ComplexSerDe.class.getName());
-        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, "test");
+        PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>());
 
         try {
             pulsarSink.initializeSchema();
@@ -257,7 +259,7 @@ public class PulsarSinkTest {
         /** test At-least-once **/
         pulsarClient = getPulsarClient();
         pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
-        PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, "test");
+        PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>());
 
         pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
 
@@ -314,7 +316,7 @@ public class PulsarSinkTest {
         /** test At-most-once **/
         pulsarClient = getPulsarClient();
         pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE);
-        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, "test");
+        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>());
 
         pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
 
@@ -371,7 +373,7 @@ public class PulsarSinkTest {
         /** test Effectively-once **/
         pulsarClient = getPulsarClient();
         pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
-        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, "test");
+        pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>());
 
         pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
 
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 8e59e00..88c9637 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -46,6 +46,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.SourceContext;
 import org.testng.annotations.Test;
 
@@ -125,7 +126,7 @@ public class PulsarSourceTest {
         PulsarSourceConfig pulsarConfig = getPulsarConfigs();
         // set type to void
         pulsarConfig.setTypeClassName(Void.class.getName());
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test");
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
 
         try {
             pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
@@ -151,7 +152,7 @@ public class PulsarSourceTest {
         topicSerdeClassNameMap.put("persistent://sample/standalone/ns1/test_result",
                 ConsumerConfig.builder().serdeClassName(TestSerDe.class.getName()).build());
         pulsarConfig.setTopicSchema(topicSerdeClassNameMap);
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test");
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
         try {
             pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
             fail("Should fail constructing java instance if function type is inconsistent with serde type");
@@ -176,7 +177,7 @@ public class PulsarSourceTest {
         consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
                 ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build());
         pulsarConfig.setTopicSchema(consumerConfigs);
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test");
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
 
         pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
     }
@@ -192,7 +193,7 @@ public class PulsarSourceTest {
         consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
                 ConsumerConfig.builder().serdeClassName(TopicSchema.DEFAULT_SERDE).build());
         pulsarConfig.setTopicSchema(consumerConfigs);
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test");
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
 
         pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
     }
@@ -205,7 +206,7 @@ public class PulsarSourceTest {
         consumerConfigs.put("persistent://sample/standalone/ns1/test_result",
                 ConsumerConfig.builder().serdeClassName(ComplexSerDe.class.getName()).build());
         pulsarConfig.setTopicSchema(consumerConfigs);
-        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, "test");
+        PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig, new HashMap<>());
 
         pulsarSource.setupConsumerConfigs();
     }