You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/21 02:05:42 UTC

[pulsar] branch master updated: Enable pulsar function to send message to external pulsar cluster (#8434)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 66231e3  Enable pulsar function to send message to external pulsar cluster (#8434)
66231e3 is described below

commit 66231e313502bfe74fd28976560d4391c5cdefa5
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Fri Nov 20 21:05:25 2020 -0500

    Enable pulsar function to send message to external pulsar cluster (#8434)
    
    ### Motivation
    
    Enable pulsar function to send message to external pulsar cluster
    
    ### Modifications
    
    *Describe the modifications you've done.*
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |  8 ++
 .../common/functions}/AuthenticationConfig.java    |  2 +-
 .../common/functions/ExternalPulsarConfig.java     | 23 +++--
 .../pulsar/common/functions/FunctionConfig.java    |  1 +
 .../org/apache/pulsar/functions/api/Context.java   | 14 ++-
 .../pulsar/functions/instance/ContextImpl.java     | 99 ++++++++++++----------
 .../pulsar/functions/instance/InstanceUtils.java   | 26 ++++++
 .../functions/instance/JavaInstanceRunnable.java   |  1 +
 .../pulsar/functions/instance/PulsarCluster.java   | 79 +++++++++++++++++
 .../api/examples/PublishExternalFunction.java      | 45 ++++++++++
 .../org/apache/pulsar/functions/LocalRunner.java   |  3 +-
 .../proto/src/main/proto/Function.proto            |  1 +
 .../auth/ClearTextFunctionTokenAuthProvider.java   |  2 +-
 .../functions/auth/FunctionAuthProvider.java       |  2 +-
 .../auth/KubernetesSecretsTokenAuthProvider.java   |  3 +-
 .../functions/runtime/JavaInstanceStarter.java     |  2 +-
 .../pulsar/functions/runtime/RuntimeFactory.java   |  2 +-
 .../pulsar/functions/runtime/RuntimeUtils.java     |  2 +-
 .../runtime/kubernetes/KubernetesRuntime.java      |  2 +-
 .../kubernetes/KubernetesRuntimeFactory.java       |  2 +-
 .../functions/runtime/process/ProcessRuntime.java  |  5 +-
 .../runtime/process/ProcessRuntimeFactory.java     |  3 +-
 .../functions/runtime/thread/ThreadRuntime.java    |  1 -
 .../runtime/thread/ThreadRuntimeFactory.java       | 32 +------
 .../ClearTextFunctionTokenAuthProviderTest.java    |  2 +-
 .../KubernetesSecretsTokenAuthProviderTest.java    |  2 +-
 .../kubernetes/KubernetesRuntimeFactoryTest.java   |  2 +-
 .../functions/utils/FunctionConfigUtils.java       | 47 +++-------
 .../functions/utils/ProducerConfigUtils.java       | 58 +++++++++++++
 .../pulsar/functions/utils/SourceConfigUtils.java  | 38 +--------
 .../functions/worker/FunctionRuntimeManager.java   |  2 +-
 31 files changed, 335 insertions(+), 176 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index f9670e0..8df1fe4 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.common.functions.ConsumerConfig;
+import org.apache.pulsar.common.functions.ExternalPulsarConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.ProducerConfig;
 import org.apache.pulsar.common.functions.Resources;
@@ -318,6 +319,8 @@ public class CmdFunctions extends CmdBase {
         protected String customRuntimeOptions;
         @Parameter(names = "--dead-letter-topic", description = "The topic where messages that are not processed successfully are sent to")
         protected String deadLetterTopic;
+        @Parameter(names = "--external-pulsars", description = "The map of external pulsar cluster name to its configuration (as a JSON string)")
+        protected String externalPulsars;
         protected FunctionConfig functionConfig;
         protected String userCodeFile;
 
@@ -396,6 +399,11 @@ public class CmdFunctions extends CmdBase {
             if (null != output) {
                 functionConfig.setOutput(output);
             }
+            if (null != externalPulsars) {
+                Type type = new TypeToken<Map<String, ExternalPulsarConfig>>() {
+                }.getType();
+                functionConfig.setExternalPulsars(new Gson().fromJson(externalPulsars, type));
+            }
             if (null != producerConfig) {
                 Type type = new TypeToken<ProducerConfig>() {}.getType();
                 functionConfig.setProducerConfig(new Gson().fromJson(producerConfig, type));
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/AuthenticationConfig.java
similarity index 96%
copy from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/functions/AuthenticationConfig.java
index a3461cc..7543635 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/AuthenticationConfig.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.instance;
+package org.apache.pulsar.common.functions;
 
 import lombok.Builder;
 import lombok.Data;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ExternalPulsarConfig.java
similarity index 66%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/functions/ExternalPulsarConfig.java
index a3461cc..fc16d48 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ExternalPulsarConfig.java
@@ -16,21 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.instance;
 
+package org.apache.pulsar.common.functions;
+
+import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
 
 /**
- * Configuration to aggregate various authentication params.
+ * Configuration of extra pulsar clusters to sent output message.
  */
 @Data
 @Builder
-public class AuthenticationConfig {
-    private String clientAuthenticationPlugin;
-    private String clientAuthenticationParameters;
-    private String tlsTrustCertsFilePath;
-    private boolean useTls;
-    private boolean tlsAllowInsecureConnection;
-    private boolean tlsHostnameVerificationEnable;
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode
+public class ExternalPulsarConfig {
+    private String name;
+    private String serviceURL;
+    private AuthenticationConfig authConfig;
+    private ProducerConfig producerConfig;
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index c9f4645..951083d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -99,6 +99,7 @@ public class FunctionConfig {
     private String batchBuilder;
     private Boolean forwardSourceMessageProperty;
     private Map<String, Object> userConfig;
+    private Map<String, ExternalPulsarConfig> externalPulsars;
     // This is a map of secretName(aka how the secret is going to be
     // accessed in the function via context) to an object that
     // encapsulates how the secret is fetched by the underlying
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index db986a1..f697f63 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -304,6 +304,18 @@ public interface Context {
     <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException;
 
     /**
+     * New output message using schema for serializing to the topic in the cluster
+     *
+     * @parem clusterName the name of the cluster for topic
+     * @param topicName The name of the topic for output message
+     * @param schema provide a way to convert between serialized data and domain objects
+     * @param <O>
+     * @return the message builder instance
+     * @throws PulsarClientException
+     */
+    <O> TypedMessageBuilder<O> newOutputMessage(String clusterName, String topicName, Schema<O> schema) throws PulsarClientException;
+
+    /**
      * Create a ConsumerBuilder with the schema.
      *
      * @param schema provide a way to convert between serialized data and domain objects
@@ -312,4 +324,4 @@ public interface Context {
      * @throws PulsarClientException
      */
     <O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException;
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 1a7da8f..85051f5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -31,6 +31,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -46,6 +47,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.common.functions.ExternalPulsarConfig;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Record;
@@ -59,8 +61,8 @@ import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
-import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.ProducerConfigUtils;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
@@ -78,12 +80,8 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
     // Per Message related
     private Record<?> record;
 
-    private PulsarClient client;
-    private Map<String, Producer<?>> publishProducers;
-    private ThreadLocal<Map<String, Producer<?>>> tlPublishProducers;
-    private ProducerBuilderImpl<?> producerBuilder;
-
-    private final TopicSchema topicSchema;
+    private String defaultPulsarCluster;
+    private Map<String, PulsarCluster> externalPulsarClusters;
 
     private final SecretsProvider secretsProvider;
     private final Map<String, Object> secretsMap;
@@ -117,27 +115,25 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
                        StateManager stateManager) {
         this.config = config;
         this.logger = logger;
-        this.client = client;
-        this.topicSchema = new TopicSchema(client);
         this.statsManager = statsManager;
 
-        this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
-                .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
-        boolean useThreadLocalProducers = false;
-        if (config.getFunctionDetails().getSink().getProducerSpec() != null) {
-            if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages() != 0) {
-                this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages());
-            }
-            if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
-                this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
+        this.externalPulsarClusters = new HashMap<>();
+        if (!config.getFunctionDetails().getExternalPulsarsMap().isEmpty()) {
+            Map<String, ExternalPulsarConfig> externalPulsarConfig = new Gson().fromJson(config.getFunctionDetails().getExternalPulsarsMap(),
+                    new TypeToken<Map<String, ExternalPulsarConfig>>() {
+                    }.getType());
+            for (Map.Entry<String, ExternalPulsarConfig> entry : externalPulsarConfig.entrySet()) {
+                try {
+                    this.externalPulsarClusters.put(entry.getKey(),
+                            new PulsarCluster(InstanceUtils.createPulsarClient(entry.getValue().getServiceURL(), entry.getValue().getAuthConfig()),
+                                    ProducerConfigUtils.convert(entry.getValue().getProducerConfig())));
+                } catch (PulsarClientException ex) {
+                    throw new RuntimeException("failed to create pulsar client for external cluster: " + entry.getKey(), ex);
+                }
             }
-            useThreadLocalProducers = config.getFunctionDetails().getSink().getProducerSpec().getUseThreadLocalProducers();
-        }
-        if (useThreadLocalProducers) {
-            tlPublishProducers = new ThreadLocal<>();
-        } else {
-            publishProducers = new HashMap<>();
         }
+        this.defaultPulsarCluster = "default-" + UUID.randomUUID();
+        this.externalPulsarClusters.put(defaultPulsarCluster, new PulsarCluster(client, config.getFunctionDetails().getSink().getProducerSpec()));
 
         if (config.getFunctionDetails().getUserConfig().isEmpty()) {
             userConfigs = new HashMap<>();
@@ -392,25 +388,34 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
     @SuppressWarnings("unchecked")
     @Override
     public <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName) {
-        return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false));
+        return publish(topicName, object, (Schema<O>) externalPulsarClusters.get(defaultPulsarCluster).getTopicSchema().getSchema(topicName, object, schemaOrSerdeClassName, false));
     }
 
     @Override
     public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException {
+        return newOutputMessage(defaultPulsarCluster, topicName, schema);
+    }
+
+    @Override
+    public <O> TypedMessageBuilder<O> newOutputMessage(String pulsarName, String topicName, Schema<O> schema) throws PulsarClientException {
         MessageBuilderImpl<O> messageBuilder = new MessageBuilderImpl<>();
-        TypedMessageBuilder<O> typedMessageBuilder = getProducer(topicName, schema).newMessage();
+        TypedMessageBuilder<O> typedMessageBuilder = getProducer(pulsarName, topicName, schema).newMessage();
         messageBuilder.setUnderlyingBuilder(typedMessageBuilder);
         return messageBuilder;
     }
 
     @Override
     public <O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException {
-        return this.client.newConsumer(schema);
+        return this.externalPulsarClusters.get(defaultPulsarCluster).getClient().newConsumer(schema);
     }
 
     public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O> schema) {
+        return publish(defaultPulsarCluster, topicName, object, schema);
+    }
+
+    public <O> CompletableFuture<Void> publish(String pulsarName, String topicName, O object, Schema<O> schema) {
         try {
-            return newOutputMessage(topicName, schema).value(object).sendAsync().thenApply(msgId -> null);
+           return newOutputMessage(pulsarName, topicName, schema).value(object).sendAsync().thenApply(msgId -> null);
         } catch (PulsarClientException e) {
             logger.error("Failed to create Producer while doing user publish", e);
             return FutureUtil.failedFuture(e);
@@ -432,22 +437,23 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
         }
     }
 
-    private <O> Producer<O> getProducer(String topicName, Schema<O> schema) throws PulsarClientException {
+    private <O> Producer<O> getProducer(String pulsarName, String topicName, Schema<O> schema) throws PulsarClientException {
         Producer<O> producer;
-        if (tlPublishProducers != null) {
-            Map<String, Producer<?>> producerMap = tlPublishProducers.get();
+        PulsarCluster pulsar = externalPulsarClusters.get(pulsarName);
+        if (pulsar.getTlPublishProducers() != null) {
+            Map<String, Producer<?>> producerMap = pulsar.getTlPublishProducers().get();
             if (producerMap == null) {
                 producerMap = new HashMap<>();
-                tlPublishProducers.set(producerMap);
+                pulsar.getTlPublishProducers().set(producerMap);
             }
             producer = (Producer<O>) producerMap.get(topicName);
         } else {
-            producer = (Producer<O>) publishProducers.get(topicName);
+            producer = (Producer<O>) pulsar.getPublishProducers().get(topicName);
         }
 
         if (producer == null) {
 
-            Producer<O> newProducer = ((ProducerBuilderImpl<O>) producerBuilder.clone())
+            Producer<O> newProducer = ((ProducerBuilderImpl<O>) pulsar.getProducerBuilder().clone())
                     .schema(schema)
                     .blockIfQueueFull(true)
                     .enableBatching(true)
@@ -468,10 +474,10 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
                             this.config.getInstanceId()))
                     .create();
 
-            if (tlPublishProducers != null) {
-                tlPublishProducers.get().put(topicName, newProducer);
+            if (pulsar.getTlPublishProducers() != null) {
+                pulsar.getTlPublishProducers().get().put(topicName, newProducer);
             } else {
-                Producer<O> existingProducer = (Producer<O>) publishProducers.putIfAbsent(topicName, newProducer);
+                Producer<O> existingProducer = (Producer<O>) pulsar.getPublishProducers().putIfAbsent(topicName, newProducer);
 
                 if (existingProducer != null) {
                     // The value in the map was not updated after the concurrent put
@@ -621,15 +627,18 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
     public void close() {
         List<CompletableFuture> futures = new LinkedList<>();
 
-        if (publishProducers != null) {
-            for (Producer<?> producer : publishProducers.values()) {
-                futures.add(producer.closeAsync());
+        for (Map.Entry<String, PulsarCluster> pulsarEntry : externalPulsarClusters.entrySet()) {
+            PulsarCluster pulsar = pulsarEntry.getValue();
+            if (pulsar.getPublishProducers() != null) {
+                for (Producer<?> producer : pulsar.getPublishProducers().values()) {
+                    futures.add(producer.closeAsync());
+                }
             }
-        }
 
-        if (tlPublishProducers != null) {
-            for (Producer<?> producer : tlPublishProducers.get().values()) {
-                futures.add(producer.closeAsync());
+            if (pulsar.getTlPublishProducers() != null) {
+                for (Producer<?> producer : pulsar.getTlPublishProducers().get().values()) {
+                    futures.add(producer.closeAsync());
+                }
             }
         }
 
@@ -639,4 +648,4 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
             logger.warn("Failed to close producers", e);
         }
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 3d4aa10..024599c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -20,12 +20,17 @@ package org.apache.pulsar.functions.instance;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
 import lombok.experimental.UtilityClass;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.sink.PulsarSink;
@@ -145,4 +150,25 @@ public class InstanceUtils {
         }
         return properties;
     }
+
+    public static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig) throws PulsarClientException {
+        ClientBuilder clientBuilder = null;
+        if (isNotBlank(pulsarServiceUrl)) {
+            clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+            if (authConfig != null) {
+                if (isNotBlank(authConfig.getClientAuthenticationPlugin())
+                        && isNotBlank(authConfig.getClientAuthenticationParameters())) {
+                    clientBuilder.authentication(authConfig.getClientAuthenticationPlugin(),
+                            authConfig.getClientAuthenticationParameters());
+                }
+                clientBuilder.enableTls(authConfig.isUseTls());
+                clientBuilder.allowTlsInsecureConnection(authConfig.isTlsAllowInsecureConnection());
+                clientBuilder.enableTlsHostnameVerification(authConfig.isTlsHostnameVerificationEnable());
+                clientBuilder.tlsTrustCertsFilePath(authConfig.getTlsTrustCertsFilePath());
+            }
+            clientBuilder.ioThreads(Runtime.getRuntime().availableProcessors());
+            return clientBuilder.build();
+        }
+        return null;
+    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 316d964..fa68656 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -86,6 +86,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     // input topic consumer & output topic producer
     private final PulsarClientImpl client;
+    //private final Map<String, PulsarClient> pulsarClientMap;
 
     private LogAppender logAppender;
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarCluster.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarCluster.java
new file mode 100644
index 0000000..58d879f
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarCluster.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.instance;
+
+import lombok.Getter;
+import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.functions.proto.Function.ProducerSpec;
+import org.apache.pulsar.functions.source.TopicSchema;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+public class PulsarCluster {
+    @Getter
+    private PulsarClient client;
+
+    @Getter
+    private final TopicSchema topicSchema;
+
+    @Getter
+    private ProducerBuilderImpl<?> producerBuilder;
+
+    @Getter
+    private Map<String, Producer<?>> publishProducers;
+
+    @Getter
+    private ThreadLocal<Map<String, Producer<?>>> tlPublishProducers;
+
+    public PulsarCluster(PulsarClient client, ProducerSpec producerSpec) {
+        this.client = client;
+        this.topicSchema = new TopicSchema(client);
+        this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
+                .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
+        boolean useThreadLocalProducers = false;
+        if (producerSpec != null) {
+            if (producerSpec.getMaxPendingMessages() != 0) {
+                this.producerBuilder.maxPendingMessages(producerSpec.getMaxPendingMessages());
+            }
+            if (producerSpec.getMaxPendingMessagesAcrossPartitions() != 0) {
+                this.producerBuilder.maxPendingMessagesAcrossPartitions(producerSpec.getMaxPendingMessagesAcrossPartitions());
+            }
+            if (producerSpec.getBatchBuilder() != null) {
+                if (producerSpec.getBatchBuilder().equals("KEY_BASED")) {
+                    this.producerBuilder.batcherBuilder(BatcherBuilder.KEY_BASED);
+                } else {
+                    this.producerBuilder.batcherBuilder(BatcherBuilder.DEFAULT);
+                }
+            }
+            useThreadLocalProducers = producerSpec.getUseThreadLocalProducers();
+        }
+        if (useThreadLocalProducers) {
+            tlPublishProducers = new ThreadLocal<>();
+        } else {
+            publishProducers = new HashMap<>();
+        }
+    }
+}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishExternalFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishExternalFunction.java
new file mode 100644
index 0000000..962322c
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishExternalFunction.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+/**
+ * Example function that uses the built in publish function in the context
+ * to publish to a desired cluster & topic based on config.
+ */
+public class PublishExternalFunction implements Function<String, Void> {
+    @Override
+    public Void process(String input, Context context) {
+        // Ensure the desired cluster `external` config is provided when creating the function
+        String externalCluster = "external";
+        String publishTopic = (String) context.getUserConfigValueOrDefault("external-topic", "default-external-topic");
+        String output = String.format("%s!", input);
+        try {
+            context.newOutputMessage(externalCluster, publishTopic, Schema.STRING).value(output).sendAsync();
+        } catch (PulsarClientException e) {
+            context.getLogger().error(e.toString());
+        }
+        return null;
+    }
+}
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index cd24bd0..5b75c95 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -32,14 +32,13 @@ import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.Reflections;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
-import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import org.apache.pulsar.functions.secretsproviderconfigurator.NameAndConfigBasedSecretsProviderConfigurator;
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 4b840a3..77396eb 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -88,6 +88,7 @@ message FunctionDetails {
     bool retainOrdering = 21;
     bool retainKeyOrdering = 22;
     SubscriptionPosition subscriptionPosition = 23;
+    string externalPulsarsMap = 24;
 }
 
 message ConsumerSpec {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java
index 5a0bd79..929affd 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.functions.auth;
 
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function;
 
 import java.util.Optional;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java
index 00a8ee9..08ea6ab 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java
@@ -19,7 +19,7 @@
 package org.apache.pulsar.functions.auth;
 
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.common.util.Reflections;
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
index ae5fec1..457a26a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.functions.auth;
 import com.google.common.annotations.VisibleForTesting;
 import io.kubernetes.client.openapi.ApiException;
 import io.kubernetes.client.openapi.apis.CoreV1Api;
-import io.kubernetes.client.openapi.models.V1DeleteOptions;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
 import io.kubernetes.client.openapi.models.V1PodSpec;
 import io.kubernetes.client.openapi.models.V1Secret;
@@ -33,7 +32,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.utils.Actions;
 import org.apache.pulsar.functions.utils.FunctionCommon;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index a1b88a4..6b34221 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -42,7 +42,7 @@ import io.prometheus.jmx.JmxCollector;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 51e342e..56fb701 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -20,7 +20,7 @@
 package org.apache.pulsar.functions.runtime;
 
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 89827b9..242b46e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -35,7 +35,7 @@ import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.instance.go.GoInstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index a0bf855..8777b83 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -56,7 +56,7 @@ import lombok.extern.slf4j.Slf4j;
 import okhttp3.Response;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.proto.Function;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index e35a44d..f001af4 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -36,7 +36,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
 import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
index 876b91a..0b75f3d 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
@@ -30,7 +30,7 @@ import io.grpc.ManagedChannelBuilder;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -50,9 +50,6 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 
 /**
  * A function container implemented using java thread.
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
index 6892f8f..0606afc 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
@@ -27,12 +27,11 @@ import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeUtils;
-import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
 import org.apache.pulsar.functions.worker.WorkerConfig;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index 5cc74c5..d053b20 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -106,7 +106,6 @@ public class ThreadRuntime implements Runtime {
                 secretsProvider,
                 collectorRegistry,
                 narExtractionDirectory);
-
         log.info("ThreadContainer starting function with instance config {}", instanceConfig);
         this.fnThread = new Thread(threadGroup, javaInstanceRunnable,
                 String.format("%s-%s",
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index 5a60f64..7601dfa 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -24,13 +24,13 @@ import io.prometheus.client.CollectorRegistry;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceCache;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeUtils;
@@ -43,8 +43,6 @@ import org.apache.pulsar.functions.worker.WorkerConfig;
 
 import java.util.Optional;
 
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
 /**
  * Thread based function container factory implementation.
  */
@@ -72,7 +70,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
                                 AuthenticationConfig authConfig, SecretsProvider secretsProvider,
                                 CollectorRegistry collectorRegistry, String narExtractionDirectory,
                                 ClassLoader rootClassLoader) throws Exception {
-        initialize(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig),
+        initialize(threadGroupName, InstanceUtils.createPulsarClient(pulsarServiceUrl, authConfig),
                 storageServiceUrl, null, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
     }
 
@@ -85,28 +83,6 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
                 null, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
     }
 
-    private static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig)
-            throws PulsarClientException {
-        ClientBuilder clientBuilder = null;
-        if (isNotBlank(pulsarServiceUrl)) {
-            clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
-            if (authConfig != null) {
-                if (isNotBlank(authConfig.getClientAuthenticationPlugin())
-                        && isNotBlank(authConfig.getClientAuthenticationParameters())) {
-                    clientBuilder.authentication(authConfig.getClientAuthenticationPlugin(),
-                            authConfig.getClientAuthenticationParameters());
-                }
-                clientBuilder.enableTls(authConfig.isUseTls());
-                clientBuilder.allowTlsInsecureConnection(authConfig.isTlsAllowInsecureConnection());
-                clientBuilder.enableTlsHostnameVerification(authConfig.isTlsHostnameVerificationEnable());
-                clientBuilder.tlsTrustCertsFilePath(authConfig.getTlsTrustCertsFilePath());
-            }
-            clientBuilder.ioThreads(Runtime.getRuntime().availableProcessors());
-            return clientBuilder.build();
-        }
-        return null;
-    }
-
     private void initialize(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
                             SecretsProviderConfigurator secretsProviderConfigurator, SecretsProvider secretsProvider,
                             CollectorRegistry collectorRegistry,  String narExtractionDirectory, ClassLoader rootClassLoader) {
@@ -134,7 +110,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
                 workerConfig.getFunctionRuntimeFactoryConfigs(), ThreadRuntimeFactoryConfig.class);
 
         initialize(factoryConfig.getThreadGroupName(),
-                createPulsarClient(workerConfig.getPulsarServiceUrl(), authenticationConfig),
+                InstanceUtils.createPulsarClient(workerConfig.getPulsarServiceUrl(), authenticationConfig),
                 workerConfig.getStateStorageServiceUrl(), secretsProviderConfigurator, null,
                 null, workerConfig.getNarExtractionDirectory(), null);
     }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java
index 924653f..3f3afdd 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.functions.auth;
 
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.testng.Assert;
 import org.testng.annotations.Test;
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
index 38dd914..d7b2b45 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
@@ -38,7 +38,7 @@ import java.util.Optional;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.testng.Assert;
 import org.testng.annotations.Test;
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
index 8e207db..62d71c0 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
@@ -32,7 +32,7 @@ import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
 import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
 import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index b42292f..6b9b762 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -212,24 +212,7 @@ public class FunctionConfigUtils {
             sinkSpecBuilder.setTypeClassName(typeArgs[1].getName());
         }
         if (functionConfig.getProducerConfig() != null) {
-            ProducerConfig producerConf = functionConfig.getProducerConfig();
-            Function.ProducerSpec.Builder pbldr = Function.ProducerSpec.newBuilder();
-            if (producerConf.getMaxPendingMessages() != null) {
-                pbldr.setMaxPendingMessages(producerConf.getMaxPendingMessages());
-            }
-            if (producerConf.getMaxPendingMessagesAcrossPartitions() != null) {
-                pbldr.setMaxPendingMessagesAcrossPartitions(producerConf.getMaxPendingMessagesAcrossPartitions());
-            }
-            if (producerConf.getUseThreadLocalProducers() != null) {
-                pbldr.setUseThreadLocalProducers(producerConf.getUseThreadLocalProducers());
-            }
-            if (producerConf.getCryptoConfig() != null) {
-                pbldr.setCryptoSpec(CryptoUtils.convert(producerConf.getCryptoConfig()));
-            }
-            if (producerConf.getBatchBuilder() != null) {
-                pbldr.setBatchBuilder(producerConf.getBatchBuilder());
-            }
-            sinkSpecBuilder.setProducerSpec(pbldr.build());
+            sinkSpecBuilder.setProducerSpec(ProducerConfigUtils.convert(functionConfig.getProducerConfig()));
         }
         functionDetailsBuilder.setSink(sinkSpecBuilder);
 
@@ -294,6 +277,10 @@ public class FunctionConfigUtils {
             functionDetailsBuilder.setSecretsMap(new Gson().toJson(functionConfig.getSecrets()));
         }
 
+        if (functionConfig.getExternalPulsars() != null && !functionConfig.getExternalPulsars().isEmpty()) {
+            functionDetailsBuilder.setExternalPulsarsMap(new Gson().toJson(functionConfig.getExternalPulsars()));
+        }
+
         if (functionConfig.getAutoAck() != null) {
             functionDetailsBuilder.setAutoAck(functionConfig.getAutoAck());
         } else {
@@ -380,22 +367,7 @@ public class FunctionConfigUtils {
             functionConfig.setOutputSchemaType(functionDetails.getSink().getSchemaType());
         }
         if (functionDetails.getSink().getProducerSpec() != null) {
-            Function.ProducerSpec spec = functionDetails.getSink().getProducerSpec();
-            ProducerConfig producerConfig = new ProducerConfig();
-            if (spec.getMaxPendingMessages() != 0) {
-                producerConfig.setMaxPendingMessages(spec.getMaxPendingMessages());
-            }
-            if (spec.getMaxPendingMessagesAcrossPartitions() != 0) {
-                producerConfig.setMaxPendingMessagesAcrossPartitions(spec.getMaxPendingMessagesAcrossPartitions());
-            }
-            if (spec.hasCryptoSpec()) {
-                producerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(spec.getCryptoSpec()));
-            }
-            if (spec.getBatchBuilder() != null) {
-                producerConfig.setBatchBuilder(spec.getBatchBuilder());
-            }
-            producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers());
-            functionConfig.setProducerConfig(producerConfig);
+            functionConfig.setProducerConfig(ProducerConfigUtils.convertFromSpec(functionDetails.getSink().getProducerSpec()));
         }
         if (!isEmpty(functionDetails.getLogTopic())) {
             functionConfig.setLogTopic(functionDetails.getLogTopic());
@@ -437,6 +409,13 @@ public class FunctionConfigUtils {
             functionConfig.setSecrets(secretsMap);
         }
 
+        if (isNotEmpty(functionDetails.getExternalPulsarsMap())) {
+            Type type = new TypeToken<Map<String, ExternalPulsarConfig>>() {
+            }.getType();
+            Map<String, ExternalPulsarConfig> externalPulsarsMap = new Gson().fromJson(functionDetails.getExternalPulsarsMap(), type);
+            functionConfig.setExternalPulsars(externalPulsarsMap);
+        }
+
         if (functionDetails.hasResources()) {
             Resources resources = new Resources();
             resources.setCpu(functionDetails.getResources().getCpu());
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ProducerConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ProducerConfigUtils.java
new file mode 100644
index 0000000..8e73ff8
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ProducerConfigUtils.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.utils;
+
+import org.apache.pulsar.common.functions.ProducerConfig;
+import org.apache.pulsar.functions.proto.Function;
+
+public class ProducerConfigUtils {
+    public static Function.ProducerSpec convert(ProducerConfig conf) {
+        Function.ProducerSpec.Builder pbldr = Function.ProducerSpec.newBuilder();
+        if (conf.getMaxPendingMessages() != null) {
+            pbldr.setMaxPendingMessages(conf.getMaxPendingMessages());
+        }
+        if (conf.getMaxPendingMessagesAcrossPartitions() != null) {
+            pbldr.setMaxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions());
+        }
+        if (conf.getUseThreadLocalProducers() != null) {
+            pbldr.setUseThreadLocalProducers(conf.getUseThreadLocalProducers());
+        }
+        if (conf.getBatchBuilder() != null) {
+            pbldr.setBatchBuilder(conf.getBatchBuilder());
+        }
+
+        return pbldr.build();
+    }
+
+    public static ProducerConfig convertFromSpec(Function.ProducerSpec spec) {
+        ProducerConfig producerConfig = new ProducerConfig();
+        if (spec.getMaxPendingMessages() != 0) {
+            producerConfig.setMaxPendingMessages(spec.getMaxPendingMessages());
+        }
+        if (spec.getMaxPendingMessagesAcrossPartitions() != 0) {
+            producerConfig.setMaxPendingMessagesAcrossPartitions(spec.getMaxPendingMessagesAcrossPartitions());
+        }
+        if (spec.getBatchBuilder() != null) {
+            producerConfig.setBatchBuilder(spec.getBatchBuilder());
+        }
+        producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers());
+        return producerConfig;
+    }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 04f524a..08b561b 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -28,7 +28,6 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.common.functions.ProducerConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.BatchSourceConfig;
 import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -54,7 +53,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee;
 import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
 
@@ -149,24 +147,7 @@ public class SourceConfigUtils {
         }
 
         if (sourceConfig.getProducerConfig() != null) {
-            ProducerConfig conf = sourceConfig.getProducerConfig();
-            Function.ProducerSpec.Builder pbldr = Function.ProducerSpec.newBuilder();
-            if (conf.getMaxPendingMessages() != null) {
-                pbldr.setMaxPendingMessages(conf.getMaxPendingMessages());
-            }
-            if (conf.getMaxPendingMessagesAcrossPartitions() != null) {
-                pbldr.setMaxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions());
-            }
-            if (conf.getUseThreadLocalProducers() != null) {
-                pbldr.setUseThreadLocalProducers(conf.getUseThreadLocalProducers());
-            }
-            if (conf.getCryptoConfig() != null) {
-                pbldr.setCryptoSpec(CryptoUtils.convert(conf.getCryptoConfig()));
-            }
-            if (conf.getBatchBuilder() != null) {
-                pbldr.setBatchBuilder(conf.getBatchBuilder());
-            }
-            sinkSpecBuilder.setProducerSpec(pbldr.build());
+            sinkSpecBuilder.setProducerSpec(ProducerConfigUtils.convert(sourceConfig.getProducerConfig()));
         }
 
         functionDetailsBuilder.setSink(sinkSpecBuilder);
@@ -239,22 +220,7 @@ public class SourceConfigUtils {
             sourceConfig.setSerdeClassName(sinkSpec.getSerDeClassName());
         }
         if (sinkSpec.getProducerSpec() != null) {
-            Function.ProducerSpec spec = sinkSpec.getProducerSpec();
-            ProducerConfig producerConfig = new ProducerConfig();
-            if (spec.getMaxPendingMessages() != 0) {
-                producerConfig.setMaxPendingMessages(spec.getMaxPendingMessages());
-            }
-            if (spec.getMaxPendingMessagesAcrossPartitions() != 0) {
-                producerConfig.setMaxPendingMessagesAcrossPartitions(spec.getMaxPendingMessagesAcrossPartitions());
-            }
-            if (spec.hasCryptoSpec()) {
-                producerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(spec.getCryptoSpec()));
-            }
-            if (spec.getBatchBuilder() != null) {
-                producerConfig.setBatchBuilder(spec.getBatchBuilder());
-            }
-            producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers());
-            sourceConfig.setProducerConfig(producerConfig);
+            sourceConfig.setProducerConfig(ProducerConfigUtils.convertFromSpec(sinkSpec.getProducerSpec()));
         }
         if (functionDetails.hasResources()) {
             Resources resources = new Resources();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index e359e83..bb07115 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -35,7 +35,7 @@ import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.auth.FunctionAuthProvider;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.runtime.RuntimeCustomizer;