You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/21 02:05:42 UTC
[pulsar] branch master updated: Enable pulsar function to send
message to external pulsar cluster (#8434)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 66231e3 Enable pulsar function to send message to external pulsar cluster (#8434)
66231e3 is described below
commit 66231e313502bfe74fd28976560d4391c5cdefa5
Author: Neng Lu <nl...@streamnative.io>
AuthorDate: Fri Nov 20 21:05:25 2020 -0500
Enable pulsar function to send message to external pulsar cluster (#8434)
### Motivation
Enable pulsar function to send message to external pulsar cluster
### Modifications
*Describe the modifications you've done.*
---
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 8 ++
.../common/functions}/AuthenticationConfig.java | 2 +-
.../common/functions/ExternalPulsarConfig.java | 23 +++--
.../pulsar/common/functions/FunctionConfig.java | 1 +
.../org/apache/pulsar/functions/api/Context.java | 14 ++-
.../pulsar/functions/instance/ContextImpl.java | 99 ++++++++++++----------
.../pulsar/functions/instance/InstanceUtils.java | 26 ++++++
.../functions/instance/JavaInstanceRunnable.java | 1 +
.../pulsar/functions/instance/PulsarCluster.java | 79 +++++++++++++++++
.../api/examples/PublishExternalFunction.java | 45 ++++++++++
.../org/apache/pulsar/functions/LocalRunner.java | 3 +-
.../proto/src/main/proto/Function.proto | 1 +
.../auth/ClearTextFunctionTokenAuthProvider.java | 2 +-
.../functions/auth/FunctionAuthProvider.java | 2 +-
.../auth/KubernetesSecretsTokenAuthProvider.java | 3 +-
.../functions/runtime/JavaInstanceStarter.java | 2 +-
.../pulsar/functions/runtime/RuntimeFactory.java | 2 +-
.../pulsar/functions/runtime/RuntimeUtils.java | 2 +-
.../runtime/kubernetes/KubernetesRuntime.java | 2 +-
.../kubernetes/KubernetesRuntimeFactory.java | 2 +-
.../functions/runtime/process/ProcessRuntime.java | 5 +-
.../runtime/process/ProcessRuntimeFactory.java | 3 +-
.../functions/runtime/thread/ThreadRuntime.java | 1 -
.../runtime/thread/ThreadRuntimeFactory.java | 32 +------
.../ClearTextFunctionTokenAuthProviderTest.java | 2 +-
.../KubernetesSecretsTokenAuthProviderTest.java | 2 +-
.../kubernetes/KubernetesRuntimeFactoryTest.java | 2 +-
.../functions/utils/FunctionConfigUtils.java | 47 +++-------
.../functions/utils/ProducerConfigUtils.java | 58 +++++++++++++
.../pulsar/functions/utils/SourceConfigUtils.java | 38 +--------
.../functions/worker/FunctionRuntimeManager.java | 2 +-
31 files changed, 335 insertions(+), 176 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index f9670e0..8df1fe4 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -51,6 +51,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
+import org.apache.pulsar.common.functions.ExternalPulsarConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
@@ -318,6 +319,8 @@ public class CmdFunctions extends CmdBase {
protected String customRuntimeOptions;
@Parameter(names = "--dead-letter-topic", description = "The topic where messages that are not processed successfully are sent to")
protected String deadLetterTopic;
+ @Parameter(names = "--external-pulsars", description = "The map of external pulsar cluster name to its configuration (as a JSON string)")
+ protected String externalPulsars;
protected FunctionConfig functionConfig;
protected String userCodeFile;
@@ -396,6 +399,11 @@ public class CmdFunctions extends CmdBase {
if (null != output) {
functionConfig.setOutput(output);
}
+ if (null != externalPulsars) {
+ Type type = new TypeToken<Map<String, ExternalPulsarConfig>>() {
+ }.getType();
+ functionConfig.setExternalPulsars(new Gson().fromJson(externalPulsars, type));
+ }
if (null != producerConfig) {
Type type = new TypeToken<ProducerConfig>() {}.getType();
functionConfig.setProducerConfig(new Gson().fromJson(producerConfig, type));
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/AuthenticationConfig.java
similarity index 96%
copy from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/functions/AuthenticationConfig.java
index a3461cc..7543635 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/AuthenticationConfig.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.instance;
+package org.apache.pulsar.common.functions;
import lombok.Builder;
import lombok.Data;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ExternalPulsarConfig.java
similarity index 66%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/functions/ExternalPulsarConfig.java
index a3461cc..fc16d48 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/AuthenticationConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/ExternalPulsarConfig.java
@@ -16,21 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.instance;
+package org.apache.pulsar.common.functions;
+
+import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
/**
- * Configuration to aggregate various authentication params.
+ * Configuration of extra pulsar clusters to send output messages to.
*/
@Data
@Builder
-public class AuthenticationConfig {
- private String clientAuthenticationPlugin;
- private String clientAuthenticationParameters;
- private String tlsTrustCertsFilePath;
- private boolean useTls;
- private boolean tlsAllowInsecureConnection;
- private boolean tlsHostnameVerificationEnable;
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode
+public class ExternalPulsarConfig {
+ private String name;
+ private String serviceURL;
+ private AuthenticationConfig authConfig;
+ private ProducerConfig producerConfig;
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index c9f4645..951083d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -99,6 +99,7 @@ public class FunctionConfig {
private String batchBuilder;
private Boolean forwardSourceMessageProperty;
private Map<String, Object> userConfig;
+ private Map<String, ExternalPulsarConfig> externalPulsars;
// This is a map of secretName(aka how the secret is going to be
// accessed in the function via context) to an object that
// encapsulates how the secret is fetched by the underlying
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index db986a1..f697f63 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -304,6 +304,18 @@ public interface Context {
<O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException;
/**
+ * New output message using schema for serializing to the topic in the cluster
+ *
+ * @param clusterName the name of the cluster for topic
+ * @param topicName The name of the topic for output message
+ * @param schema provide a way to convert between serialized data and domain objects
+ * @param <O>
+ * @return the message builder instance
+ * @throws PulsarClientException
+ */
+ <O> TypedMessageBuilder<O> newOutputMessage(String clusterName, String topicName, Schema<O> schema) throws PulsarClientException;
+
+ /**
* Create a ConsumerBuilder with the schema.
*
* @param schema provide a way to convert between serialized data and domain objects
@@ -312,4 +324,4 @@ public interface Context {
* @throws PulsarClientException
*/
<O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException;
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 1a7da8f..85051f5 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -31,6 +31,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -46,6 +47,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.common.functions.ExternalPulsarConfig;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Record;
@@ -59,8 +61,8 @@ import org.apache.pulsar.functions.instance.stats.SourceStatsManager;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
-import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.ProducerConfigUtils;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
@@ -78,12 +80,8 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
// Per Message related
private Record<?> record;
- private PulsarClient client;
- private Map<String, Producer<?>> publishProducers;
- private ThreadLocal<Map<String, Producer<?>>> tlPublishProducers;
- private ProducerBuilderImpl<?> producerBuilder;
-
- private final TopicSchema topicSchema;
+ private String defaultPulsarCluster;
+ private Map<String, PulsarCluster> externalPulsarClusters;
private final SecretsProvider secretsProvider;
private final Map<String, Object> secretsMap;
@@ -117,27 +115,25 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
StateManager stateManager) {
this.config = config;
this.logger = logger;
- this.client = client;
- this.topicSchema = new TopicSchema(client);
this.statsManager = statsManager;
- this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
- .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
- boolean useThreadLocalProducers = false;
- if (config.getFunctionDetails().getSink().getProducerSpec() != null) {
- if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages() != 0) {
- this.producerBuilder.maxPendingMessages(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessages());
- }
- if (config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions() != 0) {
- this.producerBuilder.maxPendingMessagesAcrossPartitions(config.getFunctionDetails().getSink().getProducerSpec().getMaxPendingMessagesAcrossPartitions());
+ this.externalPulsarClusters = new HashMap<>();
+ if (!config.getFunctionDetails().getExternalPulsarsMap().isEmpty()) {
+ Map<String, ExternalPulsarConfig> externalPulsarConfig = new Gson().fromJson(config.getFunctionDetails().getExternalPulsarsMap(),
+ new TypeToken<Map<String, ExternalPulsarConfig>>() {
+ }.getType());
+ for (Map.Entry<String, ExternalPulsarConfig> entry : externalPulsarConfig.entrySet()) {
+ try {
+ this.externalPulsarClusters.put(entry.getKey(),
+ new PulsarCluster(InstanceUtils.createPulsarClient(entry.getValue().getServiceURL(), entry.getValue().getAuthConfig()),
+ ProducerConfigUtils.convert(entry.getValue().getProducerConfig())));
+ } catch (PulsarClientException ex) {
+ throw new RuntimeException("failed to create pulsar client for external cluster: " + entry.getKey(), ex);
+ }
}
- useThreadLocalProducers = config.getFunctionDetails().getSink().getProducerSpec().getUseThreadLocalProducers();
- }
- if (useThreadLocalProducers) {
- tlPublishProducers = new ThreadLocal<>();
- } else {
- publishProducers = new HashMap<>();
}
+ this.defaultPulsarCluster = "default-" + UUID.randomUUID();
+ this.externalPulsarClusters.put(defaultPulsarCluster, new PulsarCluster(client, config.getFunctionDetails().getSink().getProducerSpec()));
if (config.getFunctionDetails().getUserConfig().isEmpty()) {
userConfigs = new HashMap<>();
@@ -392,25 +388,34 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
@SuppressWarnings("unchecked")
@Override
public <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName) {
- return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false));
+ return publish(topicName, object, (Schema<O>) externalPulsarClusters.get(defaultPulsarCluster).getTopicSchema().getSchema(topicName, object, schemaOrSerdeClassName, false));
}
@Override
public <O> TypedMessageBuilder<O> newOutputMessage(String topicName, Schema<O> schema) throws PulsarClientException {
+ return newOutputMessage(defaultPulsarCluster, topicName, schema);
+ }
+
+ @Override
+ public <O> TypedMessageBuilder<O> newOutputMessage(String pulsarName, String topicName, Schema<O> schema) throws PulsarClientException {
MessageBuilderImpl<O> messageBuilder = new MessageBuilderImpl<>();
- TypedMessageBuilder<O> typedMessageBuilder = getProducer(topicName, schema).newMessage();
+ TypedMessageBuilder<O> typedMessageBuilder = getProducer(pulsarName, topicName, schema).newMessage();
messageBuilder.setUnderlyingBuilder(typedMessageBuilder);
return messageBuilder;
}
@Override
public <O> ConsumerBuilder<O> newConsumerBuilder(Schema<O> schema) throws PulsarClientException {
- return this.client.newConsumer(schema);
+ return this.externalPulsarClusters.get(defaultPulsarCluster).getClient().newConsumer(schema);
}
public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O> schema) {
+ return publish(defaultPulsarCluster, topicName, object, schema);
+ }
+
+ public <O> CompletableFuture<Void> publish(String pulsarName, String topicName, O object, Schema<O> schema) {
try {
- return newOutputMessage(topicName, schema).value(object).sendAsync().thenApply(msgId -> null);
+ return newOutputMessage(pulsarName, topicName, schema).value(object).sendAsync().thenApply(msgId -> null);
} catch (PulsarClientException e) {
logger.error("Failed to create Producer while doing user publish", e);
return FutureUtil.failedFuture(e);
@@ -432,22 +437,23 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
}
}
- private <O> Producer<O> getProducer(String topicName, Schema<O> schema) throws PulsarClientException {
+ private <O> Producer<O> getProducer(String pulsarName, String topicName, Schema<O> schema) throws PulsarClientException {
Producer<O> producer;
- if (tlPublishProducers != null) {
- Map<String, Producer<?>> producerMap = tlPublishProducers.get();
+ PulsarCluster pulsar = externalPulsarClusters.get(pulsarName);
+ if (pulsar.getTlPublishProducers() != null) {
+ Map<String, Producer<?>> producerMap = pulsar.getTlPublishProducers().get();
if (producerMap == null) {
producerMap = new HashMap<>();
- tlPublishProducers.set(producerMap);
+ pulsar.getTlPublishProducers().set(producerMap);
}
producer = (Producer<O>) producerMap.get(topicName);
} else {
- producer = (Producer<O>) publishProducers.get(topicName);
+ producer = (Producer<O>) pulsar.getPublishProducers().get(topicName);
}
if (producer == null) {
- Producer<O> newProducer = ((ProducerBuilderImpl<O>) producerBuilder.clone())
+ Producer<O> newProducer = ((ProducerBuilderImpl<O>) pulsar.getProducerBuilder().clone())
.schema(schema)
.blockIfQueueFull(true)
.enableBatching(true)
@@ -468,10 +474,10 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
this.config.getInstanceId()))
.create();
- if (tlPublishProducers != null) {
- tlPublishProducers.get().put(topicName, newProducer);
+ if (pulsar.getTlPublishProducers() != null) {
+ pulsar.getTlPublishProducers().get().put(topicName, newProducer);
} else {
- Producer<O> existingProducer = (Producer<O>) publishProducers.putIfAbsent(topicName, newProducer);
+ Producer<O> existingProducer = (Producer<O>) pulsar.getPublishProducers().putIfAbsent(topicName, newProducer);
if (existingProducer != null) {
// The value in the map was not updated after the concurrent put
@@ -621,15 +627,18 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
public void close() {
List<CompletableFuture> futures = new LinkedList<>();
- if (publishProducers != null) {
- for (Producer<?> producer : publishProducers.values()) {
- futures.add(producer.closeAsync());
+ for (Map.Entry<String, PulsarCluster> pulsarEntry : externalPulsarClusters.entrySet()) {
+ PulsarCluster pulsar = pulsarEntry.getValue();
+ if (pulsar.getPublishProducers() != null) {
+ for (Producer<?> producer : pulsar.getPublishProducers().values()) {
+ futures.add(producer.closeAsync());
+ }
}
- }
- if (tlPublishProducers != null) {
- for (Producer<?> producer : tlPublishProducers.get().values()) {
- futures.add(producer.closeAsync());
+ if (pulsar.getTlPublishProducers() != null) {
+ for (Producer<?> producer : pulsar.getTlPublishProducers().get().values()) {
+ futures.add(producer.closeAsync());
+ }
}
}
@@ -639,4 +648,4 @@ class ContextImpl implements Context, SinkContext, SourceContext, AutoCloseable
logger.warn("Failed to close producers", e);
}
}
-}
\ No newline at end of file
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index 3d4aa10..024599c 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -20,12 +20,17 @@ package org.apache.pulsar.functions.instance;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.sink.PulsarSink;
@@ -145,4 +150,25 @@ public class InstanceUtils {
}
return properties;
}
+
+ public static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig) throws PulsarClientException {
+ ClientBuilder clientBuilder = null;
+ if (isNotBlank(pulsarServiceUrl)) {
+ clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
+ if (authConfig != null) {
+ if (isNotBlank(authConfig.getClientAuthenticationPlugin())
+ && isNotBlank(authConfig.getClientAuthenticationParameters())) {
+ clientBuilder.authentication(authConfig.getClientAuthenticationPlugin(),
+ authConfig.getClientAuthenticationParameters());
+ }
+ clientBuilder.enableTls(authConfig.isUseTls());
+ clientBuilder.allowTlsInsecureConnection(authConfig.isTlsAllowInsecureConnection());
+ clientBuilder.enableTlsHostnameVerification(authConfig.isTlsHostnameVerificationEnable());
+ clientBuilder.tlsTrustCertsFilePath(authConfig.getTlsTrustCertsFilePath());
+ }
+ clientBuilder.ioThreads(Runtime.getRuntime().availableProcessors());
+ return clientBuilder.build();
+ }
+ return null;
+ }
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 316d964..fa68656 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -86,6 +86,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
// input topic consumer & output topic producer
private final PulsarClientImpl client;
+ //private final Map<String, PulsarClient> pulsarClientMap;
private LogAppender logAppender;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarCluster.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarCluster.java
new file mode 100644
index 0000000..58d879f
--- /dev/null
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarCluster.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.instance;
+
+import lombok.Getter;
+import org.apache.pulsar.client.api.BatcherBuilder;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.functions.proto.Function.ProducerSpec;
+import org.apache.pulsar.functions.source.TopicSchema;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+public class PulsarCluster {
+ @Getter
+ private PulsarClient client;
+
+ @Getter
+ private final TopicSchema topicSchema;
+
+ @Getter
+ private ProducerBuilderImpl<?> producerBuilder;
+
+ @Getter
+ private Map<String, Producer<?>> publishProducers;
+
+ @Getter
+ private ThreadLocal<Map<String, Producer<?>>> tlPublishProducers;
+
+ public PulsarCluster(PulsarClient client, ProducerSpec producerSpec) {
+ this.client = client;
+ this.topicSchema = new TopicSchema(client);
+ this.producerBuilder = (ProducerBuilderImpl<?>) client.newProducer().blockIfQueueFull(true).enableBatching(true)
+ .batchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
+ boolean useThreadLocalProducers = false;
+ if (producerSpec != null) {
+ if (producerSpec.getMaxPendingMessages() != 0) {
+ this.producerBuilder.maxPendingMessages(producerSpec.getMaxPendingMessages());
+ }
+ if (producerSpec.getMaxPendingMessagesAcrossPartitions() != 0) {
+ this.producerBuilder.maxPendingMessagesAcrossPartitions(producerSpec.getMaxPendingMessagesAcrossPartitions());
+ }
+ if (producerSpec.getBatchBuilder() != null) {
+ if (producerSpec.getBatchBuilder().equals("KEY_BASED")) {
+ this.producerBuilder.batcherBuilder(BatcherBuilder.KEY_BASED);
+ } else {
+ this.producerBuilder.batcherBuilder(BatcherBuilder.DEFAULT);
+ }
+ }
+ useThreadLocalProducers = producerSpec.getUseThreadLocalProducers();
+ }
+ if (useThreadLocalProducers) {
+ tlPublishProducers = new ThreadLocal<>();
+ } else {
+ publishProducers = new HashMap<>();
+ }
+ }
+}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishExternalFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishExternalFunction.java
new file mode 100644
index 0000000..962322c
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishExternalFunction.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+/**
+ * Example function that uses the built in publish function in the context
+ * to publish to a desired cluster & topic based on config.
+ */
+public class PublishExternalFunction implements Function<String, Void> {
+ @Override
+ public Void process(String input, Context context) {
+ // Ensure the desired cluster `external` config is provided when creating the function
+ String externalCluster = "external";
+ String publishTopic = (String) context.getUserConfigValueOrDefault("external-topic", "default-external-topic");
+ String output = String.format("%s!", input);
+ try {
+ context.newOutputMessage(externalCluster, publishTopic, Schema.STRING).value(output).sendAsync();
+ } catch (PulsarClientException e) {
+ context.getLogger().error(e.toString());
+ }
+ return null;
+ }
+}
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index cd24bd0..5b75c95 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -32,14 +32,13 @@ import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.Reflections;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
-import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.NameAndConfigBasedSecretsProviderConfigurator;
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 4b840a3..77396eb 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -88,6 +88,7 @@ message FunctionDetails {
bool retainOrdering = 21;
bool retainKeyOrdering = 22;
SubscriptionPosition subscriptionPosition = 23;
+ string externalPulsarsMap = 24;
}
message ConsumerSpec {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java
index 5a0bd79..929affd 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.functions.auth;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import java.util.Optional;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java
index 00a8ee9..08ea6ab 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.functions.auth;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.common.util.Reflections;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
index ae5fec1..457a26a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.functions.auth;
import com.google.common.annotations.VisibleForTesting;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.CoreV1Api;
-import io.kubernetes.client.openapi.models.V1DeleteOptions;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1PodSpec;
import io.kubernetes.client.openapi.models.V1Secret;
@@ -33,7 +32,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index a1b88a4..6b34221 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -42,7 +42,7 @@ import io.prometheus.jmx.JmxCollector;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 51e342e..56fb701 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -20,7 +20,7 @@
package org.apache.pulsar.functions.runtime;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 89827b9..242b46e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -35,7 +35,7 @@ import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.go.GoInstanceConfig;
import org.apache.pulsar.functions.proto.Function;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index a0bf855..8777b83 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -56,7 +56,7 @@ import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index e35a44d..f001af4 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -36,7 +36,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
index 876b91a..0b75f3d 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
@@ -30,7 +30,7 @@ import io.grpc.ManagedChannelBuilder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -50,9 +50,6 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
/**
* A function container implemented using java thread.
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
index 6892f8f..0606afc 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
@@ -27,12 +27,11 @@ import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
-import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import org.apache.pulsar.functions.worker.WorkerConfig;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index 5cc74c5..d053b20 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -106,7 +106,6 @@ public class ThreadRuntime implements Runtime {
secretsProvider,
collectorRegistry,
narExtractionDirectory);
-
log.info("ThreadContainer starting function with instance config {}", instanceConfig);
this.fnThread = new Thread(threadGroup, javaInstanceRunnable,
String.format("%s-%s",
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index 5a60f64..7601dfa 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -24,13 +24,13 @@ import io.prometheus.client.CollectorRegistry;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
@@ -43,8 +43,6 @@ import org.apache.pulsar.functions.worker.WorkerConfig;
import java.util.Optional;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
/**
* Thread based function container factory implementation.
*/
@@ -72,7 +70,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
AuthenticationConfig authConfig, SecretsProvider secretsProvider,
CollectorRegistry collectorRegistry, String narExtractionDirectory,
ClassLoader rootClassLoader) throws Exception {
- initialize(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig),
+ initialize(threadGroupName, InstanceUtils.createPulsarClient(pulsarServiceUrl, authConfig),
storageServiceUrl, null, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
}
@@ -85,28 +83,6 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
null, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
}
- private static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig)
- throws PulsarClientException {
- ClientBuilder clientBuilder = null;
- if (isNotBlank(pulsarServiceUrl)) {
- clientBuilder = PulsarClient.builder().serviceUrl(pulsarServiceUrl);
- if (authConfig != null) {
- if (isNotBlank(authConfig.getClientAuthenticationPlugin())
- && isNotBlank(authConfig.getClientAuthenticationParameters())) {
- clientBuilder.authentication(authConfig.getClientAuthenticationPlugin(),
- authConfig.getClientAuthenticationParameters());
- }
- clientBuilder.enableTls(authConfig.isUseTls());
- clientBuilder.allowTlsInsecureConnection(authConfig.isTlsAllowInsecureConnection());
- clientBuilder.enableTlsHostnameVerification(authConfig.isTlsHostnameVerificationEnable());
- clientBuilder.tlsTrustCertsFilePath(authConfig.getTlsTrustCertsFilePath());
- }
- clientBuilder.ioThreads(Runtime.getRuntime().availableProcessors());
- return clientBuilder.build();
- }
- return null;
- }
-
private void initialize(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
SecretsProviderConfigurator secretsProviderConfigurator, SecretsProvider secretsProvider,
CollectorRegistry collectorRegistry, String narExtractionDirectory, ClassLoader rootClassLoader) {
@@ -134,7 +110,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
workerConfig.getFunctionRuntimeFactoryConfigs(), ThreadRuntimeFactoryConfig.class);
initialize(factoryConfig.getThreadGroupName(),
- createPulsarClient(workerConfig.getPulsarServiceUrl(), authenticationConfig),
+ InstanceUtils.createPulsarClient(workerConfig.getPulsarServiceUrl(), authenticationConfig),
workerConfig.getStateStorageServiceUrl(), secretsProviderConfigurator, null,
null, workerConfig.getNarExtractionDirectory(), null);
}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java
index 924653f..3f3afdd 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.functions.auth;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.testng.Assert;
import org.testng.annotations.Test;
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
index 38dd914..d7b2b45 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
@@ -38,7 +38,7 @@ import java.util.Optional;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.testng.Assert;
import org.testng.annotations.Test;
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
index 8e207db..62d71c0 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
@@ -32,7 +32,7 @@ import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index b42292f..6b9b762 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -212,24 +212,7 @@ public class FunctionConfigUtils {
sinkSpecBuilder.setTypeClassName(typeArgs[1].getName());
}
if (functionConfig.getProducerConfig() != null) {
- ProducerConfig producerConf = functionConfig.getProducerConfig();
- Function.ProducerSpec.Builder pbldr = Function.ProducerSpec.newBuilder();
- if (producerConf.getMaxPendingMessages() != null) {
- pbldr.setMaxPendingMessages(producerConf.getMaxPendingMessages());
- }
- if (producerConf.getMaxPendingMessagesAcrossPartitions() != null) {
- pbldr.setMaxPendingMessagesAcrossPartitions(producerConf.getMaxPendingMessagesAcrossPartitions());
- }
- if (producerConf.getUseThreadLocalProducers() != null) {
- pbldr.setUseThreadLocalProducers(producerConf.getUseThreadLocalProducers());
- }
- if (producerConf.getCryptoConfig() != null) {
- pbldr.setCryptoSpec(CryptoUtils.convert(producerConf.getCryptoConfig()));
- }
- if (producerConf.getBatchBuilder() != null) {
- pbldr.setBatchBuilder(producerConf.getBatchBuilder());
- }
- sinkSpecBuilder.setProducerSpec(pbldr.build());
+ sinkSpecBuilder.setProducerSpec(ProducerConfigUtils.convert(functionConfig.getProducerConfig()));
}
functionDetailsBuilder.setSink(sinkSpecBuilder);
@@ -294,6 +277,10 @@ public class FunctionConfigUtils {
functionDetailsBuilder.setSecretsMap(new Gson().toJson(functionConfig.getSecrets()));
}
+ if (functionConfig.getExternalPulsars() != null && !functionConfig.getExternalPulsars().isEmpty()) {
+ functionDetailsBuilder.setExternalPulsarsMap(new Gson().toJson(functionConfig.getExternalPulsars()));
+ }
+
if (functionConfig.getAutoAck() != null) {
functionDetailsBuilder.setAutoAck(functionConfig.getAutoAck());
} else {
@@ -380,22 +367,7 @@ public class FunctionConfigUtils {
functionConfig.setOutputSchemaType(functionDetails.getSink().getSchemaType());
}
if (functionDetails.getSink().getProducerSpec() != null) {
- Function.ProducerSpec spec = functionDetails.getSink().getProducerSpec();
- ProducerConfig producerConfig = new ProducerConfig();
- if (spec.getMaxPendingMessages() != 0) {
- producerConfig.setMaxPendingMessages(spec.getMaxPendingMessages());
- }
- if (spec.getMaxPendingMessagesAcrossPartitions() != 0) {
- producerConfig.setMaxPendingMessagesAcrossPartitions(spec.getMaxPendingMessagesAcrossPartitions());
- }
- if (spec.hasCryptoSpec()) {
- producerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(spec.getCryptoSpec()));
- }
- if (spec.getBatchBuilder() != null) {
- producerConfig.setBatchBuilder(spec.getBatchBuilder());
- }
- producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers());
- functionConfig.setProducerConfig(producerConfig);
+ functionConfig.setProducerConfig(ProducerConfigUtils.convertFromSpec(functionDetails.getSink().getProducerSpec()));
}
if (!isEmpty(functionDetails.getLogTopic())) {
functionConfig.setLogTopic(functionDetails.getLogTopic());
@@ -437,6 +409,13 @@ public class FunctionConfigUtils {
functionConfig.setSecrets(secretsMap);
}
+ if (isNotEmpty(functionDetails.getExternalPulsarsMap())) {
+ Type type = new TypeToken<Map<String, ExternalPulsarConfig>>() {
+ }.getType();
+ Map<String, ExternalPulsarConfig> externalPulsarsMap = new Gson().fromJson(functionDetails.getExternalPulsarsMap(), type);
+ functionConfig.setExternalPulsars(externalPulsarsMap);
+ }
+
if (functionDetails.hasResources()) {
Resources resources = new Resources();
resources.setCpu(functionDetails.getResources().getCpu());
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ProducerConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ProducerConfigUtils.java
new file mode 100644
index 0000000..8e73ff8
--- /dev/null
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ProducerConfigUtils.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.utils;
+
+import org.apache.pulsar.common.functions.ProducerConfig;
+import org.apache.pulsar.functions.proto.Function;
+
+/**
+ * Conversions between the user-facing {@link ProducerConfig} and its protobuf
+ * representation {@link Function.ProducerSpec}.
+ *
+ * <p>Shared by the function and source config converters so that both translate
+ * producer settings, including the crypto settings, in exactly the same way.
+ */
+public class ProducerConfigUtils {
+    /**
+     * Builds a {@link Function.ProducerSpec} from a {@link ProducerConfig}.
+     *
+     * <p>Only fields that are non-null on {@code conf} are copied; everything else is
+     * left unset on the builder.
+     *
+     * @param conf the producer configuration to convert; must not be null
+     * @return the equivalent producer spec
+     */
+    public static Function.ProducerSpec convert(ProducerConfig conf) {
+        Function.ProducerSpec.Builder pbldr = Function.ProducerSpec.newBuilder();
+        if (conf.getMaxPendingMessages() != null) {
+            pbldr.setMaxPendingMessages(conf.getMaxPendingMessages());
+        }
+        if (conf.getMaxPendingMessagesAcrossPartitions() != null) {
+            pbldr.setMaxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions());
+        }
+        if (conf.getUseThreadLocalProducers() != null) {
+            pbldr.setUseThreadLocalProducers(conf.getUseThreadLocalProducers());
+        }
+        // The inlined conversions this helper replaces carried the crypto config over;
+        // dropping it here would silently discard the producer's encryption settings.
+        if (conf.getCryptoConfig() != null) {
+            pbldr.setCryptoSpec(CryptoUtils.convert(conf.getCryptoConfig()));
+        }
+        if (conf.getBatchBuilder() != null) {
+            pbldr.setBatchBuilder(conf.getBatchBuilder());
+        }
+
+        return pbldr.build();
+    }
+
+    /**
+     * Builds a {@link ProducerConfig} from a {@link Function.ProducerSpec}.
+     *
+     * <p>A value of {@code 0} for either max-pending field is treated as "not set" and
+     * leaves the corresponding config field untouched. {@code useThreadLocalProducers}
+     * is always copied.
+     *
+     * @param spec the producer spec to convert; must not be null
+     * @return the equivalent producer configuration
+     */
+    public static ProducerConfig convertFromSpec(Function.ProducerSpec spec) {
+        ProducerConfig producerConfig = new ProducerConfig();
+        if (spec.getMaxPendingMessages() != 0) {
+            producerConfig.setMaxPendingMessages(spec.getMaxPendingMessages());
+        }
+        if (spec.getMaxPendingMessagesAcrossPartitions() != 0) {
+            producerConfig.setMaxPendingMessagesAcrossPartitions(spec.getMaxPendingMessagesAcrossPartitions());
+        }
+        // Mirror of the crypto branch in convert(): keeps the round trip lossless.
+        if (spec.hasCryptoSpec()) {
+            producerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(spec.getCryptoSpec()));
+        }
+        // NOTE(review): generated protobuf string getters presumably return "" rather
+        // than null, so this guard is likely always true - confirm before relying on it.
+        if (spec.getBatchBuilder() != null) {
+            producerConfig.setBatchBuilder(spec.getBatchBuilder());
+        }
+        producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers());
+        return producerConfig;
+    }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 04f524a..08b561b 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -28,7 +28,6 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.common.functions.ProducerConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.BatchSourceConfig;
import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -54,7 +53,6 @@ import java.util.HashMap;
import java.util.Map;
import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.pulsar.functions.utils.FunctionCommon.convertProcessingGuarantee;
import static org.apache.pulsar.functions.utils.FunctionCommon.getSourceType;
@@ -149,24 +147,7 @@ public class SourceConfigUtils {
}
if (sourceConfig.getProducerConfig() != null) {
- ProducerConfig conf = sourceConfig.getProducerConfig();
- Function.ProducerSpec.Builder pbldr = Function.ProducerSpec.newBuilder();
- if (conf.getMaxPendingMessages() != null) {
- pbldr.setMaxPendingMessages(conf.getMaxPendingMessages());
- }
- if (conf.getMaxPendingMessagesAcrossPartitions() != null) {
- pbldr.setMaxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions());
- }
- if (conf.getUseThreadLocalProducers() != null) {
- pbldr.setUseThreadLocalProducers(conf.getUseThreadLocalProducers());
- }
- if (conf.getCryptoConfig() != null) {
- pbldr.setCryptoSpec(CryptoUtils.convert(conf.getCryptoConfig()));
- }
- if (conf.getBatchBuilder() != null) {
- pbldr.setBatchBuilder(conf.getBatchBuilder());
- }
- sinkSpecBuilder.setProducerSpec(pbldr.build());
+ sinkSpecBuilder.setProducerSpec(ProducerConfigUtils.convert(sourceConfig.getProducerConfig()));
}
functionDetailsBuilder.setSink(sinkSpecBuilder);
@@ -239,22 +220,7 @@ public class SourceConfigUtils {
sourceConfig.setSerdeClassName(sinkSpec.getSerDeClassName());
}
if (sinkSpec.getProducerSpec() != null) {
- Function.ProducerSpec spec = sinkSpec.getProducerSpec();
- ProducerConfig producerConfig = new ProducerConfig();
- if (spec.getMaxPendingMessages() != 0) {
- producerConfig.setMaxPendingMessages(spec.getMaxPendingMessages());
- }
- if (spec.getMaxPendingMessagesAcrossPartitions() != 0) {
- producerConfig.setMaxPendingMessagesAcrossPartitions(spec.getMaxPendingMessagesAcrossPartitions());
- }
- if (spec.hasCryptoSpec()) {
- producerConfig.setCryptoConfig(CryptoUtils.convertFromSpec(spec.getCryptoSpec()));
- }
- if (spec.getBatchBuilder() != null) {
- producerConfig.setBatchBuilder(spec.getBatchBuilder());
- }
- producerConfig.setUseThreadLocalProducers(spec.getUseThreadLocalProducers());
- sourceConfig.setProducerConfig(producerConfig);
+ sourceConfig.setProducerConfig(ProducerConfigUtils.convertFromSpec(sinkSpec.getProducerSpec()));
}
if (functionDetails.hasResources()) {
Resources resources = new Resources();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index e359e83..bb07115 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -35,7 +35,7 @@ import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.runtime.RuntimeCustomizer;