You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/04/13 07:10:57 UTC
[pulsar] branch master updated: Set key for message when using
function publish (#4005)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 23b1418 Set key for message when using function publish (#4005)
23b1418 is described below
commit 23b1418f719ba21a164ebca9a86d52e3dca300cb
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sat Apr 13 00:10:52 2019 -0700
Set key for message when using function publish (#4005)
* Allow to configure TypedMessageBuilder through a Map conf object
* Use constants for message confs
* Reverted previous change
* Use Long instead of Number
* Set key for message when using function publish
* fix unit test
* fix python test
* improving impl
* improving implementation
* add tests and examples
* fix bug
* fix bug
* fixing comments
* fix example
* addressing comments
* fix function
---
.../worker/PulsarFunctionPublishTest.java | 385 +++++++++++++++++++++
.../python/pulsar/functions/context.py | 30 +-
.../org/apache/pulsar/functions/api/Context.java | 47 ++-
.../pulsar/functions/instance/ContextImpl.java | 19 +-
.../instance/src/main/python/contextimpl.py | 14 +-
.../pulsar/functions/instance/ContextImplTest.java | 18 +
.../src/test/python/test_python_instance.py | 1 +
.../examples/PublishFunctionWithMessageConf.java | 52 +++
.../publish_function_with_message_conf.py | 38 ++
.../integration/functions/PulsarFunctionsTest.java | 160 ++++++++-
.../functions/PulsarFunctionsTestBase.java | 8 +
11 files changed, 743 insertions(+), 29 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
new file mode 100644
index 0000000..ef88fdd
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FunctionStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+
+/**
+ * Test publishing from a Pulsar function with per-message configuration (key and properties)
+ *
+ */
+public class PulsarFunctionPublishTest {
+ LocalBookkeeperEnsemble bkEnsemble;
+
+ ServiceConfiguration config;
+ WorkerConfig workerConfig;
+ URL urlTls;
+ PulsarService pulsar;
+ PulsarAdmin admin;
+ PulsarClient pulsarClient;
+ BrokerStats brokerStatsClient;
+ WorkerService functionsWorkerService;
+ final String tenant = "external-repl-prop";
+ String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+ String primaryHost;
+ String workerId;
+
+ private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+ private final List<Integer> bookiePorts = new LinkedList<>();
+ private final int brokerWebServicePort = PortManager.nextFreePort();
+ private final int brokerWebServiceTlsPort = PortManager.nextFreePort();
+ private final int brokerServicePort = PortManager.nextFreePort();
+ private final int brokerServiceTlsPort = PortManager.nextFreePort();
+ private final int workerServicePort = PortManager.nextFreePort();
+
+ private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
+ private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
+ private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
+ private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
+ private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
+
+ private static final Logger log = LoggerFactory.getLogger(PulsarFunctionPublishTest.class);
+
+ @DataProvider(name = "validRoleName")
+ public Object[][] validRoleName() {
+ return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+ }
+
+ @BeforeMethod
+ void setup(Method method) throws Exception {
+
+ // delete all function temp files
+ File dir = new File(System.getProperty("java.io.tmpdir"));
+ File[] foundFiles = dir.listFiles(new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ return name.startsWith("function");
+ }
+ });
+
+ for (File file : foundFiles) {
+ file.delete();
+ }
+
+ log.info("--- Setting up method {} ---", method.getName());
+
+ // Start local bookkeeper ensemble
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> {
+ int port = PortManager.nextFreePort();
+ bookiePorts.add(port);
+ return port;
+ });
+ bkEnsemble.start();
+
+ String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort;
+
+ config = spy(new ServiceConfiguration());
+ config.setClusterName("use");
+ Set<String> superUsers = Sets.newHashSet("superUser");
+ config.setSuperUserRoles(superUsers);
+ config.setWebServicePort(brokerWebServicePort);
+ config.setWebServicePortTls(brokerWebServiceTlsPort);
+ config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+ config.setBrokerServicePort(brokerServicePort);
+ config.setBrokerServicePortTls(brokerServiceTlsPort);
+ config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
+ config.setTlsAllowInsecureConnection(true);
+ config.setAdvertisedAddress("localhost");
+
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderTls.class.getName());
+ config.setAuthenticationEnabled(true);
+ config.setAuthenticationProviders(providers);
+
+ config.setAuthorizationEnabled(true);
+ config.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+
+ config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+ config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+ config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+
+ config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+ config.setBrokerClientAuthenticationParameters(
+ "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
+ config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+ config.setBrokerClientTlsEnabled(true);
+
+
+
+ functionsWorkerService = createPulsarFunctionWorker(config);
+ urlTls = new URL(brokerServiceUrl);
+ Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
+ pulsar = new PulsarService(config, functionWorkerService);
+ pulsar.start();
+
+ Map<String, String> authParams = new HashMap<>();
+ authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+ authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+ Authentication authTls = new AuthenticationTls();
+ authTls.configure(authParams);
+
+ admin = spy(
+ PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+ .allowTlsInsecureConnection(true).authentication(authTls).build());
+
+ brokerStatsClient = admin.brokerStats();
+ primaryHost = String.format("http://%s:%d", "localhost", brokerWebServicePort);
+
+ // update cluster metadata
+ ClusterData clusterData = new ClusterData(urlTls.toString());
+ admin.clusters().updateCluster(config.getClusterName(), clusterData);
+
+ ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
+ if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
+ && isNotBlank(workerConfig.getClientAuthenticationParameters())) {
+ clientBuilder.enableTls(workerConfig.isUseTls());
+ clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
+ clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
+ workerConfig.getClientAuthenticationParameters());
+ }
+ pulsarClient = clientBuilder.build();
+
+ TenantInfo propAdmin = new TenantInfo();
+ propAdmin.getAdminRoles().add("superUser");
+ propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
+ admin.tenants().updateTenant(tenant, propAdmin);
+
+ System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, "");
+
+ }
+
+ @AfterMethod
+ void shutdown() throws Exception {
+ log.info("--- Shutting down ---");
+ pulsarClient.close();
+ admin.close();
+ functionsWorkerService.stop();
+ pulsar.close();
+ bkEnsemble.stop();
+ }
+
+ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
+
+ workerConfig = new WorkerConfig();
+ workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
+ workerConfig.setSchedulerClassName(
+ org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
+ workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+// workerConfig.setProcessContainerFactory(new WorkerConfig.ProcessContainerFactory()
+// .setJavaInstanceJarLocation("/Users/jerrypeng/workspace/incubator-pulsar/pulsar-functions/runtime-all/target/java-instance.jar")
+// .setPythonInstanceLocation(""));
+ // worker talks to local broker
+ workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get());
+ workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls().get());
+ workerConfig.setFailureCheckFreqMs(100);
+ workerConfig.setNumFunctionPackageReplicas(1);
+ workerConfig.setClusterCoordinationTopicName("coordinate");
+ workerConfig.setFunctionAssignmentTopicName("assignment");
+ workerConfig.setFunctionMetadataTopicName("metadata");
+ workerConfig.setInstanceLivenessCheckFreqMs(100);
+ workerConfig.setWorkerPort(workerServicePort);
+ workerConfig.setPulsarFunctionsCluster(config.getClusterName());
+ String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+ this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort();
+ workerConfig.setWorkerHostname(hostname);
+ workerConfig.setWorkerId(workerId);
+
+ workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
+ workerConfig.setClientAuthenticationParameters(
+ String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
+ workerConfig.setUseTls(true);
+ workerConfig.setTlsAllowInsecureConnection(true);
+ workerConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+
+ workerConfig.setAuthenticationEnabled(true);
+ workerConfig.setAuthorizationEnabled(true);
+
+ return new WorkerService(workerConfig);
+ }
+
+ protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String publishTopic, String subscriptionName) {
+
+ FunctionConfig functionConfig = new FunctionConfig();
+ functionConfig.setTenant(tenant);
+ functionConfig.setNamespace(namespace);
+ functionConfig.setName(functionName);
+ functionConfig.setParallelism(1);
+ functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+ functionConfig.setSubName(subscriptionName);
+ functionConfig.setInputs(Collections.singleton(sourceTopic));
+ functionConfig.setAutoAck(true);
+ functionConfig.setClassName("org.apache.pulsar.functions.api.examples.PublishFunctionWithMessageConf");
+ functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+ Map<String, Object> userConfig = new HashMap<>();
+ userConfig.put("publish-topic", publishTopic);
+ functionConfig.setUserConfig(userConfig); functionConfig.setCleanupSubscription(true);
+ return functionConfig;
+ }
+
+ @Test(timeOut = 20000)
+ public void testPulsarFunctionState() throws Exception {
+
+ final String namespacePortion = "io";
+ final String replNamespace = tenant + "/" + namespacePortion;
+ final String sourceTopic = "persistent://" + replNamespace + "/input";
+ final String publishTopic = "persistent://" + replNamespace + "/publishtopic";
+ final String propertyKey = "key";
+ final String propertyValue = "value";
+ final String functionName = "PulsarFunction-test";
+ final String subscriptionName = "test-sub";
+ admin.namespaces().createNamespace(replNamespace);
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+ admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+ // create a producer that creates a topic at broker
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(publishTopic).subscriptionName("sub").subscribe();
+
+ FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+ sourceTopic, publishTopic, subscriptionName);
+
+ String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+ admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+ retryStrategically((test) -> {
+ try {
+ return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+ // validate pulsar sink consumer has started on the topic
+ assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+ int totalMsgs = 5;
+ for (int i = 0; i < totalMsgs; i++) {
+ String data = "foo";
+ producer.newMessage().property(propertyKey, propertyValue).key(String.valueOf(i)).value(data).send();
+ }
+ retryStrategically((test) -> {
+ try {
+ SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+ return subStats.unackedMessages == 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ retryStrategically((test) -> {
+ try {
+ FunctionStats functionStat = admin.functions().getFunctionStats(tenant, namespacePortion, functionName);
+ return functionStat.getProcessedSuccessfullyTotal() == 5;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ for (int i = 0; i < 5; i++) {
+ Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedPropertyValue = msg.getProperty(propertyKey);
+ assertEquals(propertyValue, receivedPropertyValue);
+ assertEquals(msg.getProperty("input_topic"), sourceTopic);
+ assertEquals(msg.getKey(), String.valueOf(i));
+ }
+
+ // validate that the source-topic subscription is not left with every sent message unacked,
+ // i.e. the unacked count differs from the number of messages produced
+ assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+ totalMsgs);
+
+ // delete functions
+ admin.functions().deleteFunction(tenant, namespacePortion, functionName);
+
+ retryStrategically((test) -> {
+ try {
+ return admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ // make sure subscriptions are cleaned up
+ assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
+
+ // make sure all temp files are deleted
+ File dir = new File(System.getProperty("java.io.tmpdir"));
+ File[] foundFiles = dir.listFiles(new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ return name.startsWith("function");
+ }
+ });
+
+ Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+ }
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py b/pulsar-client-cpp/python/pulsar/functions/context.py
index 169ec76..14b277a 100644
--- a/pulsar-client-cpp/python/pulsar/functions/context.py
+++ b/pulsar-client-cpp/python/pulsar/functions/context.py
@@ -119,14 +119,42 @@ class Context(object):
pass
@abstractmethod
+ def get_partition_key(self):
+ """Returns partition key of the input message if one exists"""
+ pass
+
+
+ @abstractmethod
def record_metric(self, metric_name, metric_value):
"""Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
pass
@abstractmethod
def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None):
+ """
+
+ DEPRECATED
+
+ Publishes message to topic_name by first serializing the message using serde_class_name serde
+ The message will have properties specified if any
+ """
+ pass
+
+ @abstractmethod
+ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", compression_type=None, callback=None, message_conf=None):
"""Publishes message to topic_name by first serializing the message using serde_class_name serde
- The message will have properties specified if any"""
+ The message will have properties specified if any
+
+ The available options for message_conf:
+
+ properties,
+ partition_key,
+ sequence_id,
+ replication_clusters,
+ disable_replication,
+ event_timestamp
+
+ """
pass
@abstractmethod
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 17f989e..556086a 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -84,6 +84,7 @@ public interface Context {
/**
* The id of the function that we are executing
+ *
* @return The function id
*/
String getFunctionId();
@@ -119,16 +120,17 @@ public interface Context {
/**
* Increment the builtin distributed counter referred by key.
*
- * @param key The name of the key
+ * @param key The name of the key
* @param amount The amount to be incremented
*/
void incrCounter(String key, long amount);
+
/**
* Increment the builtin distributed counter referred by key
* but dont wait for the completion of the increment operation
*
- * @param key The name of the key
+ * @param key The name of the key
* @param amount The amount to be incremented
*/
CompletableFuture<Void> incrCounterAsync(String key, long amount);
@@ -153,7 +155,7 @@ public interface Context {
/**
* Update the state value for the key.
*
- * @param key name of the key
+ * @param key name of the key
* @param value state value of the key
*/
void putState(String key, ByteBuffer value);
@@ -161,7 +163,7 @@ public interface Context {
/**
* Update the state value for the key, but don't wait for the operation to be completed
*
- * @param key name of the key
+ * @param key name of the key
* @param value state value of the key
*/
CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
@@ -218,19 +220,17 @@ public interface Context {
* Record a user defined metric.
*
* @param metricName The name of the metric
- * @param value The value of the metric
+ * @param value The value of the metric
*/
void recordMetric(String metricName, double value);
/**
- * Publish an object using serDe for serializing to the topic.
+ * Publish an object using serDe or schema class for serializing to the topic.
*
- * @param topicName
- * The name of the topic for publishing
- * @param object
- * The object that needs to be published
- * @param schemaOrSerdeClassName
- * Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name of the custom schema class
+ * @param topicName The name of the topic for publishing
+ * @param object The object that needs to be published
+ * @param schemaOrSerdeClassName Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name
+ * of the custom schema class
* @return A future that completes when the framework is done publishing the message
*/
<O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);
@@ -239,9 +239,30 @@ public interface Context {
* Publish an object to the topic using default schemas.
*
* @param topicName The name of the topic for publishing
- * @param object The object that needs to be published
+ * @param object The object that needs to be published
* @return A future that completes when the framework is done publishing the message
*/
<O> CompletableFuture<Void> publish(String topicName, O object);
+ /**
+ * Publish an object using serDe or schema class for serializing to the topic.
+ *
+ * @param topicName The name of the topic for publishing
+ * @param object The object that needs to be published
+ * @param schemaOrSerdeClassName Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name
+ * of the custom schema class
+ * @param messageConf A map of configurations to set for the message that will be published
+ * The available options are:
+ *
+ * "key" - Partition Key
+ * "properties" - Map of properties
+ * "eventTime"
+ * "sequenceId"
+ * "replicationClusters"
+ * "disableReplication"
+ *
+ * @return A future that completes when the framework is done publishing the message
+ */
+ <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName, Map<String, Object> messageConf);
+
}
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index dc99f60..5c06b49 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.functions.api.Context;
@@ -258,7 +259,7 @@ class ContextImpl implements Context, SinkContext, SourceContext {
return null;
}
}
-
+
private void ensureStateEnabled() {
checkState(null != stateContext, "State is not enabled.");
}
@@ -336,11 +337,17 @@ class ContextImpl implements Context, SinkContext, SourceContext {
@SuppressWarnings("unchecked")
@Override
public <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName) {
- return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false));
+ return publish(topicName, object, schemaOrSerdeClassName, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName, Map<String, Object> messageConf) {
+ return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false), messageConf);
}
@SuppressWarnings("unchecked")
- public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O> schema) {
+ public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O> schema, Map<String, Object> messageConf) {
Producer<O> producer = (Producer<O>) publishProducers.get(topicName);
if (producer == null) {
@@ -382,7 +389,11 @@ class ContextImpl implements Context, SinkContext, SourceContext {
}
}
- CompletableFuture<Void> future = producer.sendAsync(object).thenApply(msgId -> null);
+ TypedMessageBuilder<O> messageBuilder = producer.newMessage();
+ if (messageConf != null) {
+ messageBuilder.loadConf(messageConf);
+ }
+ CompletableFuture<Void> future = messageBuilder.value(object).sendAsync().thenApply(msgId -> null);
future.exceptionally(e -> {
this.statsManager.incrSysExceptions(e);
logger.error("Failed to publish to topic {} with error {}", topicName, e);
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index f19381b..fbba3d5 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -90,6 +90,9 @@ class ContextImpl(pulsar.Context):
def get_current_message_topic_name(self):
return self.message.topic_name()
+ def get_partition_key(self):
+ return self.message.partition_key()
+
def get_function_name(self):
return self.instance_config.function_details.name
@@ -147,6 +150,9 @@ class ContextImpl(pulsar.Context):
callback(result, msg)
def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None):
+ self.publish(topic_name, message, serde_class_name=serde_class_name, compression_type=compression_type, callback=callback, message_conf={"properties": properties})
+
+ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", compression_type=None, callback=None, message_conf=None):
# Just make sure that user supplied values are properly typed
topic_name = str(topic_name)
serde_class_name = str(serde_class_name)
@@ -172,7 +178,13 @@ class ContextImpl(pulsar.Context):
self.publish_serializers[serde_class_name] = serde_klass()
output_bytes = bytes(self.publish_serializers[serde_class_name].serialize(message))
- self.publish_producers[topic_name].send_async(output_bytes, partial(self.callback_wrapper, callback, topic_name, self.get_message_id()), properties=properties)
+
+ if message_conf:
+ self.publish_producers[topic_name].send_async(
+ output_bytes, partial(self.callback_wrapper, callback, topic_name, self.get_message_id()), **message_conf)
+ else:
+ self.publish_producers[topic_name].send_async(
+ output_bytes, partial(self.callback_wrapper, callback, topic_name, self.get_message_id()))
def ack(self, msgid, topic):
topic_consumer = None
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index fd541f3..9d2579c 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -21,9 +21,13 @@ package org.apache.pulsar.functions.instance;
import io.prometheus.client.CollectorRegistry;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
@@ -38,10 +42,14 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -72,12 +80,22 @@ public class ContextImplTest {
when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));
+ TypedMessageBuilder messageBuilder = spy(new TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING));
+ doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync();
+ when(producer.newMessage()).thenReturn(messageBuilder);
+
context = new ContextImpl(
config,
logger,
client,
new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
ComponentType.FUNCTION, null);
+ context.setCurrentMessageContext(new Record<String>() {
+ @Override
+ public String getValue() {
+ return null;
+ }
+ });
}
@Test(expectedExceptions = IllegalStateException.class)
diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py b/pulsar-functions/instance/src/test/python/test_python_instance.py
index 8b92fa8..bbd6ca1 100644
--- a/pulsar-functions/instance/src/test/python/test_python_instance.py
+++ b/pulsar-functions/instance/src/test/python/test_python_instance.py
@@ -62,6 +62,7 @@ class TestContextImpl(unittest.TestCase):
msg = Message()
msg.message_id = Mock(return_value="test_message_id")
+ msg.partition_key = Mock(return_value="test_key")
context_impl.set_current_message_context(msg, "test_topic_name")
context_impl.publish("test_topic_name", "test_message")
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunctionWithMessageConf.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunctionWithMessageConf.java
new file mode 100644
index 0000000..9960552
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunctionWithMessageConf.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Example function that republishes each input (with "!" appended) through {@link Context#publish}
+ * to the topic named by the "publish-topic" user config, passing properties, key and event time along.
+ * Optional record fields (topic name, key) are only forwarded when they are present.
+ */
+public class PublishFunctionWithMessageConf implements Function<String, Void> {
+    @Override
+    public Void process(String input, Context context) {
+        String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
+        String output = String.format("%s!", input);
+
+        // "input_topic" is added first, so a same-named property on the incoming record overrides it.
+        Map<String, String> properties = new HashMap<>();
+        context.getCurrentRecord().getTopicName().ifPresent(topic -> properties.put("input_topic", topic));
+        properties.putAll(context.getCurrentRecord().getProperties());
+
+        Map<String, Object> messageConf = new HashMap<>();
+        messageConf.put(TypedMessageBuilder.CONF_PROPERTIES, properties);
+        context.getCurrentRecord().getKey().ifPresent(key -> messageConf.put(TypedMessageBuilder.CONF_KEY, key));
+        // Event time: current wall-clock time in milliseconds.
+        messageConf.put(TypedMessageBuilder.CONF_EVENT_TIME, System.currentTimeMillis());
+        context.publish(publishTopic, output, null, messageConf);
+        return null;
+    }
+}
diff --git a/pulsar-functions/python-examples/publish_function_with_message_conf.py b/pulsar-functions/python-examples/publish_function_with_message_conf.py
new file mode 100644
index 0000000..79aac02
--- /dev/null
+++ b/pulsar-functions/python-examples/publish_function_with_message_conf.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+from pulsar import Function
+
+# Example function that uses the built in publish function in the context
+# to publish to a desired topic based on config
+class PublishFunctionWithMessageConf(Function):
+ def __init__(self):
+ pass
+
+ def process(self, input, context):
+ publish_topic = "publishtopic"
+ if "publish-topic" in context.get_user_config_map():
+ publish_topic = context.get_user_config_value("publish-topic")
+ context.publish(publish_topic, input + '!',
+ message_conf={"properties": {k: v for d in [{"input_topic" : context.get_current_message_topic_name()}, context.get_message_properties()] for k, v in d.items()},
+ "partition_key": context.get_partition_key(),
+ "event_timestamp": int(time.time())})
+ return
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index a60eb47..bb4c6b4 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.fail;
import com.google.common.base.Stopwatch;
import com.google.gson.Gson;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -725,6 +726,108 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
//
@Test
+ public void testPythonPublishFunction() throws Exception {
+ testPublishFunction(Runtime.PYTHON);
+ }
+
+ @Test
+ public void testJavaPublishFunction() throws Exception {
+ testPublishFunction(Runtime.JAVA);
+ }
+
+ private void testPublishFunction(Runtime runtime) throws Exception {
+ if (functionRuntimeType == FunctionRuntimeType.THREAD) {
+ return;
+ }
+
+ Schema<?> schema;
+ if (Runtime.JAVA == runtime) {
+ schema = Schema.STRING;
+ } else {
+ schema = Schema.BYTES;
+ }
+
+ String inputTopicName = "persistent://public/default/test-publish-" + runtime + "-input-" + randomName(8);
+ String outputTopicName = "test-publish-" + runtime + "-output-" + randomName(8);
+
+ String functionName = "test-publish-fn-" + randomName(8);
+ final int numMessages = 10;
+
+ // submit the publish function
+
+ if (runtime == Runtime.PYTHON) {
+ submitFunction(
+ runtime, inputTopicName, outputTopicName, functionName, PUBLISH_FUNCTION_PYTHON_FILE, PUBLISH_PYTHON_CLASS, schema, Collections.singletonMap("publish-topic", outputTopicName));
+ } else {
+ submitFunction(
+ runtime, inputTopicName, outputTopicName, functionName, null, PUBLISH_JAVA_CLASS, schema, Collections.singletonMap("publish-topic", outputTopicName));
+ }
+
+ // get function info
+ getFunctionInfoSuccess(functionName);
+
+ // get function stats
+ getFunctionStatsEmpty(functionName);
+
+ // publish and consume result
+ if (Runtime.JAVA == runtime) {
+ // java supports schema
+ publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
+ } else {
+ // python doesn't support schema
+
+ @Cleanup PulsarClient client = PulsarClient.builder()
+ .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+ .build();
+ @Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
+ .topic(outputTopicName)
+ .subscriptionType(SubscriptionType.Exclusive)
+ .subscriptionName("test-sub")
+ .subscribe();
+
+ @Cleanup Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+ .topic(inputTopicName)
+ .create();
+
+ for (int i = 0; i < numMessages; i++) {
+ producer.newMessage().key(String.valueOf(i)).property("count", String.valueOf(i)).value(("message-" + i).getBytes(UTF_8)).send();
+ }
+
+ Set<String> expectedMessages = new HashSet<>();
+ for (int i = 0; i < numMessages; i++) {
+ expectedMessages.add("message-" + i + "!");
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
+ String msgValue = new String(msg.getValue(), UTF_8);
+ log.info("Received: {}", msgValue);
+ assertEquals(msg.getKey(), String.valueOf(i));
+ assertEquals(msg.getProperties().get("count"), String.valueOf(i));
+ assertEquals(msg.getProperties().get("input_topic"), inputTopicName);
+ assertTrue(msg.getEventTime() > 0);
+ assertTrue(expectedMessages.contains(msgValue));
+ expectedMessages.remove(msgValue);
+ }
+ }
+
+ // get function status
+ getFunctionStatus(functionName, numMessages, true);
+
+ // get function stats
+ getFunctionStats(functionName, numMessages);
+
+ // delete function
+ deleteFunction(functionName);
+
+ // get function info
+ getFunctionInfoNotFound(functionName);
+
+ // make sure subscriptions are cleaned up
+ checkSubscriptionsCleanup(inputTopicName);
+ }
+
+ @Test
public void testPythonExclamationFunction() throws Exception {
testExclamationFunction(Runtime.PYTHON, false, false, false);
}
@@ -841,6 +944,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
functionName,
pyZip,
withExtraDeps,
+ false,
getExclamationClass(runtime, pyZip, withExtraDeps),
schema);
}
@@ -851,8 +955,47 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
String functionName,
boolean pyZip,
boolean withExtraDeps,
+ boolean isPublishFunction,
+ String functionClass,
+ Schema<T> inputTopicSchema) throws Exception {
+
+ String file = null;
+ if (Runtime.JAVA == runtime) {
+ file = null;
+ } else if (Runtime.PYTHON == runtime) {
+ if (isPublishFunction) {
+ file = PUBLISH_FUNCTION_PYTHON_FILE;
+ } else if (pyZip) {
+ file = EXCLAMATION_PYTHONZIP_FILE;
+ } else if (withExtraDeps) {
+ file = EXCLAMATION_WITH_DEPS_PYTHON_FILE;
+ } else {
+ file = EXCLAMATION_PYTHON_FILE;
+ }
+ }
+
+ submitFunction(runtime, inputTopicName, outputTopicName, functionName, file, functionClass, inputTopicSchema);
+ }
+
+ private static <T> void submitFunction(Runtime runtime,
+ String inputTopicName,
+ String outputTopicName,
+ String functionName,
+ String functionFile,
String functionClass,
Schema<T> inputTopicSchema) throws Exception {
+ submitFunction(runtime, inputTopicName, outputTopicName, functionName, functionFile, functionClass, inputTopicSchema, null);
+ }
+
+ private static <T> void submitFunction(Runtime runtime,
+ String inputTopicName,
+ String outputTopicName,
+ String functionName,
+ String functionFile,
+ String functionClass,
+ Schema<T> inputTopicSchema,
+ Map<String, String> userConfigs) throws Exception {
+
CommandGenerator generator;
log.info("------- INPUT TOPIC: '{}'", inputTopicName);
if (inputTopicName.endsWith(".*")) {
@@ -864,28 +1007,25 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
}
generator.setSinkTopic(outputTopicName);
generator.setFunctionName(functionName);
+ if (userConfigs != null) {
+ generator.setUserConfig(userConfigs);
+ }
String command;
if (Runtime.JAVA == runtime) {
command = generator.generateCreateFunctionCommand();
} else if (Runtime.PYTHON == runtime) {
generator.setRuntime(runtime);
- if (pyZip) {
- command = generator.generateCreateFunctionCommand(EXCLAMATION_PYTHONZIP_FILE);
- } else if (withExtraDeps) {
- command = generator.generateCreateFunctionCommand(EXCLAMATION_WITH_DEPS_PYTHON_FILE);
- } else {
- command = generator.generateCreateFunctionCommand(EXCLAMATION_PYTHON_FILE);
- }
+ command = generator.generateCreateFunctionCommand(functionFile);
} else {
throw new IllegalArgumentException("Unsupported runtime : " + runtime);
}
log.info("---------- Function command: {}", command);
String[] commands = {
- "sh", "-c", command
+ "sh", "-c", command
};
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
- commands);
+ commands);
assertTrue(result.getStdout().contains("\"Created successfully\""));
ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
@@ -1181,7 +1321,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
// submit the exclamation function
submitFunction(
- Runtime.JAVA, inputTopicName, outputTopicName, functionName, false, false,
+ Runtime.JAVA, inputTopicName, outputTopicName, functionName, false, false, false,
AutoSchemaFunction.class.getName(),
Schema.AVRO(CustomObject.class));
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 851793c..e717389 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -73,6 +73,10 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
public static final String EXCLAMATION_JAVA_CLASS =
"org.apache.pulsar.functions.api.examples.ExclamationFunction";
+ public static final String PUBLISH_JAVA_CLASS =
+ "org.apache.pulsar.functions.api.examples.PublishFunctionWithMessageConf";
+
+
public static final String EXCLAMATION_PYTHON_CLASS =
"exclamation_function.ExclamationFunction";
@@ -82,9 +86,13 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
public static final String EXCLAMATION_PYTHONZIP_CLASS =
"exclamation";
+ public static final String PUBLISH_PYTHON_CLASS = "publish_function_with_message_conf.PublishFunctionWithMessageConf";
+
public static final String EXCLAMATION_PYTHON_FILE = "exclamation_function.py";
public static final String EXCLAMATION_WITH_DEPS_PYTHON_FILE = "exclamation_with_extra_deps.py";
public static final String EXCLAMATION_PYTHONZIP_FILE = "exclamation.zip";
+ public static final String PUBLISH_FUNCTION_PYTHON_FILE = "publish_function_with_message_conf.py";
+
protected static String getExclamationClass(Runtime runtime,
boolean pyZip,