You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/04/13 07:10:57 UTC

[pulsar] branch master updated: Set key for message when using function publish (#4005)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 23b1418  Set key for message when using function publish (#4005)
23b1418 is described below

commit 23b1418f719ba21a164ebca9a86d52e3dca300cb
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sat Apr 13 00:10:52 2019 -0700

    Set key for message when using function publish (#4005)
    
    * Allow to configure TypedMessageBuilder through a Map conf object
    
    * Use constants for message confs
    
    * Reverted previous change
    
    * Use Long instead of Number
    
    * Set key for message when using function publish
    
    * fix unit test
    
    * fix python test
    
    * improving impl
    
    * improving implementation
    
    * add tests and examples
    
    * fix bug
    
    * fix bug
    
    * fixing comments
    
    * fix example
    
    * addressing comments
    
    * fix function
---
 .../worker/PulsarFunctionPublishTest.java          | 385 +++++++++++++++++++++
 .../python/pulsar/functions/context.py             |  30 +-
 .../org/apache/pulsar/functions/api/Context.java   |  47 ++-
 .../pulsar/functions/instance/ContextImpl.java     |  19 +-
 .../instance/src/main/python/contextimpl.py        |  14 +-
 .../pulsar/functions/instance/ContextImplTest.java |  18 +
 .../src/test/python/test_python_instance.py        |   1 +
 .../examples/PublishFunctionWithMessageConf.java   |  52 +++
 .../publish_function_with_message_conf.py          |  38 ++
 .../integration/functions/PulsarFunctionsTest.java | 160 ++++++++-
 .../functions/PulsarFunctionsTestBase.java         |   8 +
 11 files changed, 743 insertions(+), 29 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
new file mode 100644
index 0000000..ef88fdd
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -0,0 +1,385 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.FunctionStats;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+
+/**
+ * Test publishing from a Pulsar function with a per-message configuration (key and properties)
+ *
+ */
+public class PulsarFunctionPublishTest {
+    LocalBookkeeperEnsemble bkEnsemble;
+
+    ServiceConfiguration config;
+    WorkerConfig workerConfig;
+    URL urlTls;
+    PulsarService pulsar;
+    PulsarAdmin admin;
+    PulsarClient pulsarClient;
+    BrokerStats brokerStatsClient;
+    WorkerService functionsWorkerService;
+    final String tenant = "external-repl-prop";
+    String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+    String primaryHost;
+    String workerId;
+
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    private final List<Integer> bookiePorts = new LinkedList<>();
+    private final int brokerWebServicePort = PortManager.nextFreePort();
+    private final int brokerWebServiceTlsPort = PortManager.nextFreePort();
+    private final int brokerServicePort = PortManager.nextFreePort();
+    private final int brokerServiceTlsPort = PortManager.nextFreePort();
+    private final int workerServicePort = PortManager.nextFreePort();
+
+    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
+    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
+    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
+    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
+    private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
+
+    private static final Logger log = LoggerFactory.getLogger(PulsarFunctionStateTest.class);
+
+    @DataProvider(name = "validRoleName")
+    public Object[][] validRoleName() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+
+    @BeforeMethod
+    void setup(Method method) throws Exception {
+
+        // delete all function temp files
+        File dir = new File(System.getProperty("java.io.tmpdir"));
+        File[] foundFiles = dir.listFiles(new FilenameFilter() {
+            public boolean accept(File dir, String name) {
+                return name.startsWith("function");
+            }
+        });
+
+        for (File file : foundFiles) {
+            file.delete();
+        }
+
+        log.info("--- Setting up method {} ---", method.getName());
+
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> {
+            int port = PortManager.nextFreePort();
+            bookiePorts.add(port);
+            return port;
+        });
+        bkEnsemble.start();
+
+        String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort;
+
+        config = spy(new ServiceConfiguration());
+        config.setClusterName("use");
+        Set<String> superUsers = Sets.newHashSet("superUser");
+        config.setSuperUserRoles(superUsers);
+        config.setWebServicePort(brokerWebServicePort);
+        config.setWebServicePortTls(brokerWebServiceTlsPort);
+        config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        config.setBrokerServicePort(brokerServicePort);
+        config.setBrokerServicePortTls(brokerServiceTlsPort);
+        config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
+        config.setTlsAllowInsecureConnection(true);
+        config.setAdvertisedAddress("localhost");
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        config.setAuthenticationEnabled(true);
+        config.setAuthenticationProviders(providers);
+
+        config.setAuthorizationEnabled(true);
+        config.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+
+        config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+
+        config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        config.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
+        config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        config.setBrokerClientTlsEnabled(true);
+
+
+
+        functionsWorkerService = createPulsarFunctionWorker(config);
+        urlTls = new URL(brokerServiceUrl);
+        Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
+        pulsar = new PulsarService(config, functionWorkerService);
+        pulsar.start();
+
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+
+        admin = spy(
+                PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+                        .allowTlsInsecureConnection(true).authentication(authTls).build());
+
+        brokerStatsClient = admin.brokerStats();
+        primaryHost = String.format("http://%s:%d", "localhost", brokerWebServicePort);
+
+        // update cluster metadata
+        ClusterData clusterData = new ClusterData(urlTls.toString());
+        admin.clusters().updateCluster(config.getClusterName(), clusterData);
+
+        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
+        if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
+                && isNotBlank(workerConfig.getClientAuthenticationParameters())) {
+            clientBuilder.enableTls(workerConfig.isUseTls());
+            clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
+            clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
+                    workerConfig.getClientAuthenticationParameters());
+        }
+        pulsarClient = clientBuilder.build();
+
+        TenantInfo propAdmin = new TenantInfo();
+        propAdmin.getAdminRoles().add("superUser");
+        propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
+        admin.tenants().updateTenant(tenant, propAdmin);
+
+        System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, "");
+
+    }
+
+    @AfterMethod
+    void shutdown() throws Exception {
+        log.info("--- Shutting down ---");
+        pulsarClient.close();
+        admin.close();
+        functionsWorkerService.stop();
+        pulsar.close();
+        bkEnsemble.stop();
+    }
+
+    private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
+
+        workerConfig = new WorkerConfig();
+        workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
+        workerConfig.setSchedulerClassName(
+                org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
+        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+//        workerConfig.setProcessContainerFactory(new WorkerConfig.ProcessContainerFactory()
+//                .setJavaInstanceJarLocation("/Users/jerrypeng/workspace/incubator-pulsar/pulsar-functions/runtime-all/target/java-instance.jar")
+//                .setPythonInstanceLocation(""));
+        // worker talks to local broker
+        workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get());
+        workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls().get());
+        workerConfig.setFailureCheckFreqMs(100);
+        workerConfig.setNumFunctionPackageReplicas(1);
+        workerConfig.setClusterCoordinationTopicName("coordinate");
+        workerConfig.setFunctionAssignmentTopicName("assignment");
+        workerConfig.setFunctionMetadataTopicName("metadata");
+        workerConfig.setInstanceLivenessCheckFreqMs(100);
+        workerConfig.setWorkerPort(workerServicePort);
+        workerConfig.setPulsarFunctionsCluster(config.getClusterName());
+        String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+        this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort();
+        workerConfig.setWorkerHostname(hostname);
+        workerConfig.setWorkerId(workerId);
+
+        workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        workerConfig.setClientAuthenticationParameters(
+                String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
+        workerConfig.setUseTls(true);
+        workerConfig.setTlsAllowInsecureConnection(true);
+        workerConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+
+        workerConfig.setAuthenticationEnabled(true);
+        workerConfig.setAuthorizationEnabled(true);
+
+        return new WorkerService(workerConfig);
+    }
+
+    protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String publishTopic, String subscriptionName) {
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(functionName);
+        functionConfig.setParallelism(1);
+        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+        functionConfig.setSubName(subscriptionName);
+        functionConfig.setInputs(Collections.singleton(sourceTopic));
+        functionConfig.setAutoAck(true);
+        functionConfig.setClassName("org.apache.pulsar.functions.api.examples.PublishFunctionWithMessageConf");
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        Map<String, Object> userConfig = new HashMap<>();
+        userConfig.put("publish-topic", publishTopic);
+        functionConfig.setUserConfig(userConfig);        functionConfig.setCleanupSubscription(true);
+        return functionConfig;
+    }
+
+    @Test(timeOut = 20000)
+    public void testPulsarFunctionState() throws Exception {
+
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + "/input";
+        final String publishTopic = "persistent://" + replNamespace + "/publishtopic";
+        final String propertyKey = "key";
+        final String propertyValue = "value";
+        final String functionName = "PulsarFunction-test";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+        // create a producer that creates a topic at broker
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(publishTopic).subscriptionName("sub").subscribe();
+
+        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+                sourceTopic, publishTopic, subscriptionName);
+
+        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+        admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+        retryStrategically((test) -> {
+            try {
+                return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        // validate pulsar sink consumer has started on the topic
+        assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+        int totalMsgs = 5;
+        for (int i = 0; i < totalMsgs; i++) {
+            String data = "foo";
+            producer.newMessage().property(propertyKey, propertyValue).key(String.valueOf(i)).value(data).send();
+        }
+        retryStrategically((test) -> {
+            try {
+                SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+                return subStats.unackedMessages == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        retryStrategically((test) -> {
+            try {
+                FunctionStats functionStat = admin.functions().getFunctionStats(tenant, namespacePortion, functionName);
+                return functionStat.getProcessedSuccessfullyTotal() == 5;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        for (int i = 0; i < 5; i++) {
+            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedPropertyValue = msg.getProperty(propertyKey);
+            assertEquals(propertyValue, receivedPropertyValue);
+            assertEquals(msg.getProperty("input_topic"), sourceTopic);
+            assertEquals(msg.getKey(), String.valueOf(i));
+        }
+
+        // validate that the function's subscription on the source topic does not still have
+        // every produced message unacked, i.e. the messages were consumed and acknowledged
+        assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+                totalMsgs);
+
+        // delete functions
+        admin.functions().deleteFunction(tenant, namespacePortion, functionName);
+
+        retryStrategically((test) -> {
+            try {
+                return admin.topics().getStats(sourceTopic).subscriptions.size() == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        // make sure subscriptions are cleaned up
+        assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 0);
+
+        // make sure all temp files are deleted
+        File dir = new File(System.getProperty("java.io.tmpdir"));
+        File[] foundFiles = dir.listFiles(new FilenameFilter() {
+            public boolean accept(File dir, String name) {
+                return name.startsWith("function");
+            }
+        });
+
+        Assert.assertEquals(foundFiles.length, 0, "Temporary files left over: " + Arrays.asList(foundFiles));
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py b/pulsar-client-cpp/python/pulsar/functions/context.py
index 169ec76..14b277a 100644
--- a/pulsar-client-cpp/python/pulsar/functions/context.py
+++ b/pulsar-client-cpp/python/pulsar/functions/context.py
@@ -119,14 +119,42 @@ class Context(object):
     pass
 
   @abstractmethod
+  def get_partition_key(self):
+    """Returns the partition key of the input message if one exists"""
+    pass
+
+
+  @abstractmethod
   def record_metric(self, metric_name, metric_value):
     """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
     pass
 
   @abstractmethod
   def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None):
+    """
+
+    DEPRECATED
+
+    Publishes message to topic_name by first serializing the message using serde_class_name serde
+    The message will have properties specified if any
+    """
+    pass
+
+  @abstractmethod
+  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", compression_type=None, callback=None, message_conf=None):
     """Publishes message to topic_name by first serializing the message using serde_class_name serde
-    The message will have properties specified if any"""
+    The message will have properties specified if any
+
+    The available options for message_conf:
+
+      properties,
+      partition_key,
+      sequence_id,
+      replication_clusters,
+      disable_replication,
+      event_timestamp
+
+    """
     pass
 
   @abstractmethod
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 17f989e..556086a 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -84,6 +84,7 @@ public interface Context {
 
     /**
      * The id of the function that we are executing
+     *
      * @return The function id
      */
     String getFunctionId();
@@ -119,16 +120,17 @@ public interface Context {
     /**
      * Increment the builtin distributed counter referred by key.
      *
-     * @param key The name of the key
+     * @param key    The name of the key
      * @param amount The amount to be incremented
      */
     void incrCounter(String key, long amount);
 
+
     /**
      * Increment the builtin distributed counter referred by key
      * but dont wait for the completion of the increment operation
      *
-     * @param key The name of the key
+     * @param key    The name of the key
      * @param amount The amount to be incremented
      */
     CompletableFuture<Void> incrCounterAsync(String key, long amount);
@@ -153,7 +155,7 @@ public interface Context {
     /**
      * Update the state value for the key.
      *
-     * @param key name of the key
+     * @param key   name of the key
      * @param value state value of the key
      */
     void putState(String key, ByteBuffer value);
@@ -161,7 +163,7 @@ public interface Context {
     /**
      * Update the state value for the key, but don't wait for the operation to be completed
      *
-     * @param key name of the key
+     * @param key   name of the key
      * @param value state value of the key
      */
     CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
@@ -218,19 +220,17 @@ public interface Context {
      * Record a user defined metric.
      *
      * @param metricName The name of the metric
-     * @param value The value of the metric
+     * @param value      The value of the metric
      */
     void recordMetric(String metricName, double value);
 
     /**
-     * Publish an object using serDe for serializing to the topic.
+     * Publish an object using serDe or schema class for serializing to the topic.
      *
-     * @param topicName
-     *            The name of the topic for publishing
-     * @param object
-     *            The object that needs to be published
-     * @param schemaOrSerdeClassName
-     *            Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name of the custom schema class
+     * @param topicName              The name of the topic for publishing
+     * @param object                 The object that needs to be published
+     * @param schemaOrSerdeClassName Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name
+     *                               of the custom schema class
      * @return A future that completes when the framework is done publishing the message
      */
     <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);
@@ -239,9 +239,30 @@ public interface Context {
      * Publish an object to the topic using default schemas.
      *
      * @param topicName The name of the topic for publishing
-     * @param object The object that needs to be published
+     * @param object    The object that needs to be published
      * @return A future that completes when the framework is done publishing the message
      */
     <O> CompletableFuture<Void> publish(String topicName, O object);
 
+    /**
+     * Publish an object using serDe or schema class for serializing to the topic.
+     *
+     * @param topicName              The name of the topic for publishing
+     * @param object                 The object that needs to be published
+     * @param schemaOrSerdeClassName Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name
+     *                               of the custom schema class
+     * @param messageConf      A map of configurations to set for the message that will be published
+     *                         The available options are:
+     *
+     *                         "key" - Partition Key
+     *                         "properties" - Map of properties
+     *                         "eventTime"
+     *                         "sequenceId"
+     *                         "replicationClusters"
+     *                         "disableReplication"
+     *
+     * @return A future that completes when the framework is done publishing the message
+     */
+    <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName, Map<String, Object> messageConf);
+
 }
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index dc99f60..5c06b49 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Context;
@@ -258,7 +259,7 @@ class ContextImpl implements Context, SinkContext, SourceContext {
             return null;
         }
     }
-
+    
     private void ensureStateEnabled() {
         checkState(null != stateContext, "State is not enabled.");
     }
@@ -336,11 +337,17 @@ class ContextImpl implements Context, SinkContext, SourceContext {
     @SuppressWarnings("unchecked")
     @Override
     public <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName) {
-        return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false));
+        return publish(topicName, object, schemaOrSerdeClassName, null);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName, Map<String, Object> messageConf) {
+        return publish(topicName, object, (Schema<O>) topicSchema.getSchema(topicName, object, schemaOrSerdeClassName, false), messageConf);
     }
 
     @SuppressWarnings("unchecked")
-    public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O> schema) {
+    public <O> CompletableFuture<Void> publish(String topicName, O object, Schema<O> schema, Map<String, Object> messageConf) {
         Producer<O> producer = (Producer<O>) publishProducers.get(topicName);
 
         if (producer == null) {
@@ -382,7 +389,11 @@ class ContextImpl implements Context, SinkContext, SourceContext {
             }
         }
 
-        CompletableFuture<Void> future = producer.sendAsync(object).thenApply(msgId -> null);
+        TypedMessageBuilder<O> messageBuilder = producer.newMessage();
+        if (messageConf != null) {
+            messageBuilder.loadConf(messageConf);
+        }
+        CompletableFuture<Void> future = messageBuilder.value(object).sendAsync().thenApply(msgId -> null);
         future.exceptionally(e -> {
             this.statsManager.incrSysExceptions(e);
             logger.error("Failed to publish to topic {} with error {}", topicName, e);
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index f19381b..fbba3d5 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -90,6 +90,9 @@ class ContextImpl(pulsar.Context):
   def get_current_message_topic_name(self):
     return self.message.topic_name()
 
+  def get_partition_key(self):
+    return self.message.partition_key()
+
   def get_function_name(self):
     return self.instance_config.function_details.name
 
@@ -147,6 +150,9 @@ class ContextImpl(pulsar.Context):
       callback(result, msg)
 
   def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None):
+    self.publish(topic_name, message, serde_class_name=serde_class_name, compression_type=compression_type, callback=callback, message_conf={"properties": properties})
+
+  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", compression_type=None, callback=None, message_conf=None):
     # Just make sure that user supplied values are properly typed
     topic_name = str(topic_name)
     serde_class_name = str(serde_class_name)
@@ -172,7 +178,13 @@ class ContextImpl(pulsar.Context):
       self.publish_serializers[serde_class_name] = serde_klass()
 
     output_bytes = bytes(self.publish_serializers[serde_class_name].serialize(message))
-    self.publish_producers[topic_name].send_async(output_bytes, partial(self.callback_wrapper, callback, topic_name, self.get_message_id()), properties=properties)
+
+    if message_conf:
+      self.publish_producers[topic_name].send_async(
+        output_bytes, partial(self.callback_wrapper, callback, topic_name, self.get_message_id()), **message_conf)
+    else:
+      self.publish_producers[topic_name].send_async(
+        output_bytes, partial(self.callback_wrapper, callback, topic_name, self.get_message_id()))
 
   def ack(self, msgid, topic):
     topic_consumer = None
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index fd541f3..9d2579c 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -21,9 +21,13 @@ package org.apache.pulsar.functions.instance;
 import io.prometheus.client.CollectorRegistry;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.client.impl.ProducerBase;
 import org.apache.pulsar.client.impl.ProducerBuilderImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.TypedMessageBuilderImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
@@ -38,10 +42,14 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -72,12 +80,22 @@ public class ContextImplTest {
         when(client.getSchema(anyString())).thenReturn(CompletableFuture.completedFuture(Optional.empty()));
         when(producer.sendAsync(anyString())).thenReturn(CompletableFuture.completedFuture(null));
 
+        TypedMessageBuilder messageBuilder = spy(new TypedMessageBuilderImpl(mock(ProducerBase.class), Schema.STRING));
+        doReturn(new CompletableFuture<>()).when(messageBuilder).sendAsync();
+        when(producer.newMessage()).thenReturn(messageBuilder);
+
         context = new ContextImpl(
             config,
             logger,
             client,
             new EnvironmentBasedSecretsProvider(), new CollectorRegistry(), new String[0],
                 ComponentType.FUNCTION, null);
+        context.setCurrentMessageContext(new Record<String>() {
+            @Override
+            public String getValue() {
+                return null;
+            }
+        });
     }
 
     @Test(expectedExceptions = IllegalStateException.class)
diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py b/pulsar-functions/instance/src/test/python/test_python_instance.py
index 8b92fa8..bbd6ca1 100644
--- a/pulsar-functions/instance/src/test/python/test_python_instance.py
+++ b/pulsar-functions/instance/src/test/python/test_python_instance.py
@@ -62,6 +62,7 @@ class TestContextImpl(unittest.TestCase):
 
     msg = Message()
     msg.message_id = Mock(return_value="test_message_id")
+    msg.partition_key = Mock(return_value="test_key")
     context_impl.set_current_message_context(msg, "test_topic_name")
 
     context_impl.publish("test_topic_name", "test_message")
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunctionWithMessageConf.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunctionWithMessageConf.java
new file mode 100644
index 0000000..9960552
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/PublishFunctionWithMessageConf.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import org.apache.pulsar.client.api.TypedMessageBuilder;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Example function that appends "!" to its input and publishes the result with the context's
+ * publish method, passing properties, key and event time along in a message configuration map.
+ * The target topic is taken from the "publish-topic" user config value (default "publishtopic").
+ */
+public class PublishFunctionWithMessageConf implements Function<String, Void> {
+    @Override
+    public Void process(String input, Context context) {
+        String publishTopic = (String) context.getUserConfigValueOrDefault("publish-topic", "publishtopic");
+        String output = String.format("%s!", input);
+
+        // Tag the output with its source topic when known; an absent topic name must not fail the function.
+        Map<String, String> properties = new HashMap<>();
+        context.getCurrentRecord().getTopicName().ifPresent(topic -> properties.put("input_topic", topic));
+        properties.putAll(context.getCurrentRecord().getProperties());
+
+        Map<String, Object> messageConf = new HashMap<>();
+        messageConf.put(TypedMessageBuilder.CONF_PROPERTIES, properties);
+        // Only forward a key if the input record carried one.
+        context.getCurrentRecord().getKey().ifPresent(key -> messageConf.put(TypedMessageBuilder.CONF_KEY, key));
+        messageConf.put(TypedMessageBuilder.CONF_EVENT_TIME, System.currentTimeMillis());
+        context.publish(publishTopic, output, null, messageConf);
+        return null;
+    }
+}
diff --git a/pulsar-functions/python-examples/publish_function_with_message_conf.py b/pulsar-functions/python-examples/publish_function_with_message_conf.py
new file mode 100644
index 0000000..79aac02
--- /dev/null
+++ b/pulsar-functions/python-examples/publish_function_with_message_conf.py
@@ -0,0 +1,38 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import time
+from pulsar import Function
+
+# Example function that uses the built in publish function in the context
+# to publish to a desired topic based on config
+class PublishFunctionWithMessageConf(Function):
+  def __init__(self):
+    pass
+
+  def process(self, input, context):
+    publish_topic = "publishtopic"
+    if "publish-topic" in context.get_user_config_map():
+      publish_topic = context.get_user_config_value("publish-topic")
+    context.publish(publish_topic, input + '!',
+                    message_conf={"properties": {k: v for d in [{"input_topic" : context.get_current_message_topic_name()}, context.get_message_properties()] for k, v in d.items()},
+                                  "partition_key": context.get_partition_key(),
+                                  "event_timestamp": int(time.time())})
+    return
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index a60eb47..bb4c6b4 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -26,6 +26,7 @@ import static org.testng.Assert.fail;
 import com.google.common.base.Stopwatch;
 import com.google.gson.Gson;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -725,6 +726,108 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
     //
 
     @Test
+    public void testPythonPublishFunction() throws Exception {
+        testPublishFunction(Runtime.PYTHON); // run the shared publish-function scenario with the Python runtime
+    }
+
+    @Test
+    public void testJavaPublishFunction() throws Exception {
+        testPublishFunction(Runtime.JAVA); // run the shared publish-function scenario with the Java runtime
+    }
+
+    /**
+     * Submits the publish example function for the given runtime, pushes {@code numMessages} messages
+     * through it and checks the published output, then checks status/stats and deletes the function.
+     * Returns immediately when the function runtime type is THREAD.
+     */
+    private void testPublishFunction(Runtime runtime) throws Exception {
+        if (functionRuntimeType == FunctionRuntimeType.THREAD) {
+            return;
+        }
+
+        Schema<?> schema = Runtime.JAVA == runtime ? Schema.STRING : Schema.BYTES;
+
+        String inputTopicName = "persistent://public/default/test-publish-" + runtime + "-input-" + randomName(8);
+        String outputTopicName = "test-publish-" + runtime + "-output-" + randomName(8);
+
+        String functionName = "test-publish-fn-" + randomName(8);
+        final int numMessages = 10;
+
+        // submit the publish function
+        if (runtime == Runtime.PYTHON) {
+            submitFunction(
+                    runtime, inputTopicName, outputTopicName, functionName, PUBLISH_FUNCTION_PYTHON_FILE, PUBLISH_PYTHON_CLASS, schema, Collections.singletonMap("publish-topic", outputTopicName));
+        } else {
+            submitFunction(
+                    runtime, inputTopicName, outputTopicName, functionName, null, PUBLISH_JAVA_CLASS, schema, Collections.singletonMap("publish-topic", outputTopicName));
+        }
+
+        // get function info
+        getFunctionInfoSuccess(functionName);
+
+        // get function stats
+        getFunctionStatsEmpty(functionName);
+
+        // publish and consume result
+        if (Runtime.JAVA == runtime) {
+            // java supports schema
+            publishAndConsumeMessages(inputTopicName, outputTopicName, numMessages);
+        } else {
+            // python doesn't support schema
+            @Cleanup PulsarClient client = PulsarClient.builder()
+                    .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+                    .build();
+            @Cleanup Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
+                    .topic(outputTopicName)
+                    .subscriptionType(SubscriptionType.Exclusive)
+                    .subscriptionName("test-sub")
+                    .subscribe();
+
+            @Cleanup Producer<byte[]> producer = client.newProducer(Schema.BYTES)
+                    .topic(inputTopicName)
+                    .create();
+
+            for (int i = 0; i < numMessages; i++) {
+                producer.newMessage().key(String.valueOf(i)).property("count", String.valueOf(i)).value(("message-" + i).getBytes(UTF_8)).send();
+            }
+
+            Set<String> expectedMessages = new HashSet<>();
+            for (int i = 0; i < numMessages; i++) {
+                expectedMessages.add("message-" + i + "!");
+            }
+
+            for (int i = 0; i < numMessages; i++) {
+                Message<byte[]> msg = consumer.receive(30, TimeUnit.SECONDS);
+                // receive() yields null on timeout; fail with a clear message instead of an NPE below
+                assertTrue(msg != null, "Timed out waiting for message " + i + " on " + outputTopicName);
+                String msgValue = new String(msg.getValue(), UTF_8);
+                log.info("Received: {}", msgValue);
+                assertEquals(msg.getKey(), String.valueOf(i));
+                assertEquals(msg.getProperties().get("count"), String.valueOf(i));
+                assertEquals(msg.getProperties().get("input_topic"), inputTopicName);
+                assertTrue(msg.getEventTime() > 0);
+                assertTrue(expectedMessages.contains(msgValue));
+                expectedMessages.remove(msgValue);
+            }
+        }
+
+        // get function status
+        getFunctionStatus(functionName, numMessages, true);
+
+        // get function stats
+        getFunctionStats(functionName, numMessages);
+
+        // delete function
+        deleteFunction(functionName);
+
+        // get function info
+        getFunctionInfoNotFound(functionName);
+
+        // make sure subscriptions are cleanup
+        checkSubscriptionsCleanup(inputTopicName);
+    }
+
+    @Test
     public void testPythonExclamationFunction() throws Exception {
         testExclamationFunction(Runtime.PYTHON, false, false, false);
     }
@@ -841,6 +944,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
             functionName,
             pyZip,
             withExtraDeps,
+            false,
             getExclamationClass(runtime, pyZip, withExtraDeps),
             schema);
     }
@@ -851,8 +955,47 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                                            String functionName,
                                            boolean pyZip,
                                            boolean withExtraDeps,
+                                           boolean isPublishFunction,
+                                           String functionClass,
+                                           Schema<T> inputTopicSchema) throws Exception {
+
+        String file = null;
+        if (Runtime.JAVA == runtime) {
+            file = null;
+        } else if (Runtime.PYTHON == runtime) {
+            if (isPublishFunction) {
+                file = PUBLISH_FUNCTION_PYTHON_FILE;
+            } else if (pyZip) {
+                file = EXCLAMATION_PYTHONZIP_FILE;
+            } else if (withExtraDeps) {
+                file = EXCLAMATION_WITH_DEPS_PYTHON_FILE;
+            } else {
+                file = EXCLAMATION_PYTHON_FILE;
+            }
+        }
+
+        submitFunction(runtime, inputTopicName, outputTopicName, functionName, file, functionClass, inputTopicSchema);
+    }
+
+    private static <T> void submitFunction(Runtime runtime,
+                                           String inputTopicName,
+                                           String outputTopicName,
+                                           String functionName,
+                                           String functionFile,
                                            String functionClass,
                                            Schema<T> inputTopicSchema) throws Exception {
+        submitFunction(runtime, inputTopicName, outputTopicName, functionName, functionFile, functionClass, inputTopicSchema, null); // null: submit without user configs
+    }
+
+    private static <T> void submitFunction(Runtime runtime,
+                                           String inputTopicName,
+                                           String outputTopicName,
+                                           String functionName,
+                                           String functionFile,
+                                           String functionClass,
+                                           Schema<T> inputTopicSchema,
+                                           Map<String, String> userConfigs) throws Exception {
+
         CommandGenerator generator;
         log.info("------- INPUT TOPIC: '{}'", inputTopicName);
         if (inputTopicName.endsWith(".*")) {
@@ -864,28 +1007,25 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         }
         generator.setSinkTopic(outputTopicName);
         generator.setFunctionName(functionName);
+        if (userConfigs != null) {
+            generator.setUserConfig(userConfigs);
+        }
         String command;
         if (Runtime.JAVA == runtime) {
             command = generator.generateCreateFunctionCommand();
         } else if (Runtime.PYTHON == runtime) {
             generator.setRuntime(runtime);
-            if (pyZip) {
-                command = generator.generateCreateFunctionCommand(EXCLAMATION_PYTHONZIP_FILE);
-            } else if (withExtraDeps) {
-                command = generator.generateCreateFunctionCommand(EXCLAMATION_WITH_DEPS_PYTHON_FILE);
-            } else {
-                command = generator.generateCreateFunctionCommand(EXCLAMATION_PYTHON_FILE);
-            }
+            command = generator.generateCreateFunctionCommand(functionFile);
         } else {
             throw new IllegalArgumentException("Unsupported runtime : " + runtime);
         }
 
         log.info("---------- Function command: {}", command);
         String[] commands = {
-            "sh", "-c", command
+                "sh", "-c", command
         };
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
-            commands);
+                commands);
         assertTrue(result.getStdout().contains("\"Created successfully\""));
 
         ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
@@ -1181,7 +1321,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
         // submit the exclamation function
         submitFunction(
-            Runtime.JAVA, inputTopicName, outputTopicName, functionName, false, false,
+            Runtime.JAVA, inputTopicName, outputTopicName, functionName, false, false, false,
             AutoSchemaFunction.class.getName(),
             Schema.AVRO(CustomObject.class));
 
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index 851793c..e717389 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -73,6 +73,10 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
     public static final String EXCLAMATION_JAVA_CLASS =
         "org.apache.pulsar.functions.api.examples.ExclamationFunction";
 
+    public static final String PUBLISH_JAVA_CLASS =
+            "org.apache.pulsar.functions.api.examples.PublishFunctionWithMessageConf";
+
+
     public static final String EXCLAMATION_PYTHON_CLASS =
         "exclamation_function.ExclamationFunction";
 
@@ -82,9 +86,13 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
     public static final String EXCLAMATION_PYTHONZIP_CLASS =
             "exclamation";
 
+    public static final String PUBLISH_PYTHON_CLASS = "publish_function_with_message_conf.PublishFunctionWithMessageConf";
+
     public static final String EXCLAMATION_PYTHON_FILE = "exclamation_function.py";
     public static final String EXCLAMATION_WITH_DEPS_PYTHON_FILE = "exclamation_with_extra_deps.py";
     public static final String EXCLAMATION_PYTHONZIP_FILE = "exclamation.zip";
+    public static final String PUBLISH_FUNCTION_PYTHON_FILE = "publish_function_with_message_conf.py";
+
 
     protected static String getExclamationClass(Runtime runtime,
                                                 boolean pyZip,