You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/06/05 06:22:29 UTC

[pulsar] branch master updated: Function localrun improved (#4453)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f56d30d  Function localrun improved (#4453)
f56d30d is described below

commit f56d30ddf40ef9eec172db1c0f78cc2143151bdb
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Jun 4 23:22:23 2019 -0700

    Function localrun improved (#4453)
    
    * fix localrun and add tests
    
    * minor fix
    
    * Improving localrun for functions
    
    * improving impl and adding tests
    
    * add missing modules
---
 bin/function-localrunner                           |   2 +-
 distribution/server/pom.xml                        |  13 +
 pulsar-broker/pom.xml                              |   7 +
 .../worker/PulsarFunctionLocalRunTest.java         | 719 +++++++++++++++++++++
 pulsar-functions/localrun-shaded/pom.xml           | 374 +++++++++++
 pulsar-functions/localrun/pom.xml                  |  73 +++
 .../org/apache/pulsar/functions/LocalRunner.java   | 466 +++++++++++++
 pulsar-functions/pom.xml                           |   2 +
 .../pulsar/functions/runtime/LocalRunner.java      | 283 --------
 9 files changed, 1655 insertions(+), 284 deletions(-)

diff --git a/bin/function-localrunner b/bin/function-localrunner
index 16362b4..a7aad76 100755
--- a/bin/function-localrunner
+++ b/bin/function-localrunner
@@ -59,7 +59,7 @@ fi
 OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
 OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
 
-MAINCLASS="org.apache.pulsar.functions.runtime.LocalRunner"
+MAINCLASS="org.apache.pulsar.functions.LocalRunner"
 
 #Change to PULSAR_HOME to support relative paths
 cd "$PULSAR_HOME"
diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml
index 42b7300..a40538c 100644
--- a/distribution/server/pom.xml
+++ b/distribution/server/pom.xml
@@ -164,6 +164,19 @@
       </exclusions>
     </dependency>
 
+    <!-- local-runner -->
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-functions-local-runner-original</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.grpc</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <!-- grpc -->
     <dependency>
       <groupId>io.grpc</groupId>
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 331f2d4..85b88ac 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -138,6 +138,13 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-functions-local-runner-original</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
     <!-- functions related dependencies (end) -->
 
     <dependency>
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
new file mode 100644
index 0000000..bdcd27d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -0,0 +1,719 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.sun.net.httpserver.Headers;
+import com.sun.net.httpserver.HttpServer;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+import org.apache.pulsar.common.functions.ConsumerConfig;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.LocalRunner;
+import org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink;
+import org.apache.pulsar.io.datagenerator.DataGeneratorSource;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.net.InetSocketAddress;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+/**
+ * Test Pulsar sink on function
+ *
+ */
+public class PulsarFunctionLocalRunTest {
+    LocalBookkeeperEnsemble bkEnsemble;
+
+    ServiceConfiguration config;
+    WorkerConfig workerConfig;
+    URL urlTls;
+    PulsarService pulsar;
+    PulsarAdmin admin;
+    PulsarClient pulsarClient;
+    BrokerStats brokerStatsClient;
+    WorkerService functionsWorkerService;
+    final String tenant = "external-repl-prop";
+    String pulsarFunctionsNamespace = tenant + "/" + CLUSTER + "/pulsar-function-admin";
+    String primaryHost;
+    String workerId;
+
+    private static final String CLUSTER = "local";
+
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    private final int brokerWebServicePort = PortManager.nextFreePort();
+    private final int brokerWebServiceTlsPort = PortManager.nextFreePort();
+    private final int brokerServicePort = PortManager.nextFreePort();
+    private final int brokerServiceTlsPort = PortManager.nextFreePort();
+    private final int workerServicePort = PortManager.nextFreePort();
+
+    private final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/authentication/tls/broker-cert.pem";
+    private final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/authentication/tls/broker-key.pem";
+    private final String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/authentication/tls/client-cert.pem";
+    private final String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/authentication/tls/client-key.pem";
+    private final String TLS_TRUST_CERT_FILE_PATH = "./src/test/resources/authentication/tls/cacert.pem";
+
+    private static final Logger log = LoggerFactory.getLogger(PulsarFunctionLocalRunTest.class);
+    private Thread fileServerThread;
+    private static final int fileServerPort = PortManager.nextFreePort();
+    private HttpServer fileServer;
+
+    @DataProvider(name = "validRoleName")
+    public Object[][] validRoleName() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+
+    @BeforeMethod
+    void setup(Method method) throws Exception {
+
+        // delete all function temp files
+        File dir = new File(System.getProperty("java.io.tmpdir"));
+        File[] foundFiles = dir.listFiles(new FilenameFilter() {
+            public boolean accept(File dir, String name) {
+                return name.startsWith("function");
+            }
+        });
+
+        for (File file : foundFiles) {
+            file.delete();
+        }
+
+        log.info("--- Setting up method {} ---", method.getName());
+
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
+        bkEnsemble.start();
+
+        String brokerServiceUrl = "https://127.0.0.1:" + brokerWebServiceTlsPort;
+
+        config = spy(new ServiceConfiguration());
+        config.setClusterName(CLUSTER);
+        Set<String> superUsers = Sets.newHashSet("superUser");
+        config.setSuperUserRoles(superUsers);
+        config.setWebServicePort(Optional.ofNullable(brokerWebServicePort));
+        config.setWebServicePortTls(Optional.ofNullable(brokerWebServiceTlsPort));
+        config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        config.setBrokerServicePort(Optional.ofNullable(brokerServicePort));
+        config.setBrokerServicePortTls(Optional.ofNullable(brokerServiceTlsPort));
+        config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
+        config.setTlsAllowInsecureConnection(true);
+        config.setAdvertisedAddress("localhost");
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderTls.class.getName());
+        config.setAuthenticationEnabled(true);
+        config.setAuthenticationProviders(providers);
+
+        config.setAuthorizationEnabled(true);
+        config.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+
+        config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
+        config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
+        config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+
+        config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        config.setBrokerClientAuthenticationParameters(
+                "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
+        config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+        config.setBrokerClientTlsEnabled(true);
+
+        functionsWorkerService = createPulsarFunctionWorker(config);
+        urlTls = new URL(brokerServiceUrl);
+        Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
+        pulsar = new PulsarService(config, functionWorkerService);
+        pulsar.start();
+
+        Map<String, String> authParams = new HashMap<>();
+        authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
+        authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
+        Authentication authTls = new AuthenticationTls();
+        authTls.configure(authParams);
+
+        admin = spy(
+                PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH)
+                        .allowTlsInsecureConnection(true).authentication(authTls).build());
+
+        brokerStatsClient = admin.brokerStats();
+        primaryHost = String.format("http://%s:%d", "localhost", brokerWebServicePort);
+
+        // update cluster metadata
+        ClusterData clusterData = new ClusterData(urlTls.toString());
+        admin.clusters().updateCluster(config.getClusterName(), clusterData);
+
+        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
+        if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
+                && isNotBlank(workerConfig.getClientAuthenticationParameters())) {
+            clientBuilder.enableTls(workerConfig.isUseTls());
+            clientBuilder.allowTlsInsecureConnection(workerConfig.isTlsAllowInsecureConnection());
+            clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
+                    workerConfig.getClientAuthenticationParameters());
+        }
+        pulsarClient = clientBuilder.build();
+
+        TenantInfo propAdmin = new TenantInfo();
+        propAdmin.getAdminRoles().add("superUser");
+        propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList(CLUSTER)));
+        admin.tenants().updateTenant(tenant, propAdmin);
+
+        // setting up simple web sever to test submitting function via URL
+        fileServerThread = new Thread(() -> {
+            try {
+                fileServer = HttpServer.create(new InetSocketAddress(fileServerPort), 0);
+                fileServer.createContext("/pulsar-io-data-generator.nar", he -> {
+                    try {
+
+                        Headers headers = he.getResponseHeaders();
+                        headers.add("Content-Type", "application/octet-stream");
+
+                        File file = new File(getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile());
+                        byte[] bytes  = new byte [(int)file.length()];
+
+                        FileInputStream fileInputStream = new FileInputStream(file);
+                        BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
+                        bufferedInputStream.read(bytes, 0, bytes.length);
+
+                        he.sendResponseHeaders(200, file.length());
+                        OutputStream outputStream = he.getResponseBody();
+                        outputStream.write(bytes, 0, bytes.length);
+                        outputStream.close();
+
+                    } catch (Exception e) {
+                        log.error("Error when downloading: {}", e, e);
+                    }
+                });
+                fileServer.createContext("/pulsar-functions-api-examples.jar", he -> {
+                    try {
+
+                        Headers headers = he.getResponseHeaders();
+                        headers.add("Content-Type", "application/octet-stream");
+
+                        File file = new File(getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile());
+                        byte[] bytes  = new byte [(int)file.length()];
+
+                        FileInputStream fileInputStream = new FileInputStream(file);
+                        BufferedInputStream bufferedInputStream = new BufferedInputStream(fileInputStream);
+                        bufferedInputStream.read(bytes, 0, bytes.length);
+
+                        he.sendResponseHeaders(200, file.length());
+                        OutputStream outputStream = he.getResponseBody();
+                        outputStream.write(bytes, 0, bytes.length);
+                        outputStream.close();
+
+                    } catch (Exception e) {
+                        log.error("Error when downloading: {}", e, e);
+                    }
+                });
+                fileServer.setExecutor(null); // creates a default executor
+                log.info("Starting file server...");
+                fileServer.start();
+            } catch (Exception e) {
+                log.error("Failed to start file server: ", e);
+                fileServer.stop(0);
+            }
+
+        });
+        fileServerThread.start();
+    }
+
+    @AfterMethod
+    void shutdown() throws Exception {
+        log.info("--- Shutting down ---");
+        fileServer.stop(0);
+        fileServerThread.interrupt();
+        pulsarClient.close();
+        admin.close();
+        functionsWorkerService.stop();
+        pulsar.close();
+        bkEnsemble.stop();
+    }
+
+    private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
+
+        System.setProperty(JAVA_INSTANCE_JAR_PROPERTY,
+                FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath());
+
+        workerConfig = new WorkerConfig();
+        workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
+        workerConfig.setSchedulerClassName(
+                org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
+        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName(CLUSTER));
+        // worker talks to local broker
+        workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get());
+        workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls().get());
+        workerConfig.setFailureCheckFreqMs(100);
+        workerConfig.setNumFunctionPackageReplicas(1);
+        workerConfig.setClusterCoordinationTopicName("coordinate");
+        workerConfig.setFunctionAssignmentTopicName("assignment");
+        workerConfig.setFunctionMetadataTopicName("metadata");
+        workerConfig.setInstanceLivenessCheckFreqMs(100);
+        workerConfig.setWorkerPort(workerServicePort);
+        workerConfig.setPulsarFunctionsCluster(config.getClusterName());
+        String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+        this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort();
+        workerConfig.setWorkerHostname(hostname);
+        workerConfig.setWorkerId(workerId);
+
+        workerConfig.setClientAuthenticationPlugin(AuthenticationTls.class.getName());
+        workerConfig.setClientAuthenticationParameters(
+                String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH));
+        workerConfig.setUseTls(true);
+        workerConfig.setTlsAllowInsecureConnection(true);
+        workerConfig.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
+
+        workerConfig.setAuthenticationEnabled(true);
+        workerConfig.setAuthorizationEnabled(true);
+
+        return new WorkerService(workerConfig);
+    }
+
+    protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(functionName);
+        functionConfig.setParallelism(1);
+        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+        functionConfig.setSubName(subscriptionName);
+        functionConfig.setInputs(Collections.singleton(sourceTopic));
+        functionConfig.setAutoAck(true);
+        functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setOutput(sinkTopic);
+        functionConfig.setCleanupSubscription(true);
+        return functionConfig;
+    }
+
+    private static SourceConfig createSourceConfig(String tenant, String namespace, String functionName, String sinkTopic) {
+        SourceConfig sourceConfig = new SourceConfig();
+        sourceConfig.setTenant(tenant);
+        sourceConfig.setNamespace(namespace);
+        sourceConfig.setName(functionName);
+        sourceConfig.setParallelism(1);
+        sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        sourceConfig.setTopicName(sinkTopic);
+        return sourceConfig;
+    }
+
+    private static SinkConfig createSinkConfig(String tenant, String namespace, String functionName, String sourceTopic, String subName) {
+        SinkConfig sinkConfig = new SinkConfig();
+        sinkConfig.setTenant(tenant);
+        sinkConfig.setNamespace(namespace);
+        sinkConfig.setName(functionName);
+        sinkConfig.setParallelism(1);
+        sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().build()));
+        sinkConfig.setSourceSubscriptionName(subName);
+        sinkConfig.setCleanupSubscription(true);
+        return sinkConfig;
+    }
+    /**
+     * Validates pulsar sink e2e functionality on functions.
+     *
+     * @throws Exception
+     */
+    private void testE2EPulsarFunctionLocalRun(String jarFilePathUrl) throws Exception {
+
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String propertyKey = "key";
+        final String propertyValue = "value";
+        final String functionName = "PulsarFunction-test";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList(CLUSTER));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+        // create a producer that creates a topic at broker
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe();
+
+        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
+                sourceTopic, sinkTopic, subscriptionName);
+        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+
+        functionConfig.setJar(jarFilePathUrl);
+        LocalRunner localRunner = LocalRunner.builder()
+                .functionConfig(functionConfig)
+                .clientAuthPlugin(AuthenticationTls.class.getName())
+                .clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH))
+                .useTls(true)
+                .tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
+                .tlsAllowInsecureConnection(true)
+                .tlsHostNameVerificationEnabled(false)
+                .brokerServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get()).build();
+        localRunner.start(false);
+
+        retryStrategically((test) -> {
+            try {
+                TopicStats stats = admin.topics().getStats(sourceTopic);
+                return stats.subscriptions.get(subscriptionName) != null
+                        && !stats.subscriptions.get(subscriptionName).consumers.isEmpty();
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+        // validate pulsar sink consumer has started on the topic
+        TopicStats stats = admin.topics().getStats(sourceTopic);
+        assertTrue(stats.subscriptions.get(subscriptionName) != null
+                && !stats.subscriptions.get(subscriptionName).consumers.isEmpty());
+
+        int totalMsgs = 5;
+        for (int i = 0; i < totalMsgs; i++) {
+            String data = "my-message-" + i;
+            producer.newMessage().property(propertyKey, propertyValue).value(data).send();
+        }
+        retryStrategically((test) -> {
+            try {
+                SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+                return subStats.unackedMessages == 0;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 150);
+
+        for (int i = 0; i < totalMsgs; i++) {
+            Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedPropertyValue = msg.getProperty(propertyKey);
+            assertEquals(propertyValue, receivedPropertyValue);
+            assertEquals(msg.getValue(),  "my-message-" + i + "!");
+        }
+
+        // validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked messages
+        // due to publish failure
+        assertNotEquals(admin.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+                totalMsgs);
+
+        // stop functions
+        localRunner.stop();
+
+        retryStrategically((test) -> {
+            try {
+                TopicStats topicStats = admin.topics().getStats(sourceTopic);
+                return topicStats.subscriptions.get(subscriptionName) != null
+                        && topicStats.subscriptions.get(subscriptionName).consumers.isEmpty();
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 20, 150);
+
+        TopicStats topicStats = admin.topics().getStats(sourceTopic);
+        assertTrue(topicStats.subscriptions.get(subscriptionName) != null
+                && topicStats.subscriptions.get(subscriptionName).consumers.isEmpty());
+
+        retryStrategically((test) -> {
+            try {
+                return (admin.topics().getStats(sinkTopic).publishers.size() == 0);
+            } catch (PulsarAdminException e) {
+                if (e.getStatusCode() == 404) {
+                    return true;
+                }
+                return false;
+            }
+        }, 10, 150);
+
+        try {
+            assertTrue(admin.topics().getStats(sinkTopic).publishers.size() == 0);
+        } catch (PulsarAdminException e) {
+            if (e.getStatusCode() != 404) {
+                fail();
+            }
+        }
+    }
+
+    @Test(timeOut = 20000)
+    public void testE2EPulsarFunctionLocalRun() throws Exception {
+        testE2EPulsarFunctionLocalRun(null);
+    }
+
+    @Test(timeOut = 20000)
+    public void testE2EPulsarFunctionLocalRunWithJar() throws Exception {
+        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+        testE2EPulsarFunctionLocalRun(jarFilePathUrl);
+    }
+
+    @Test(timeOut = 40000)
+    public void testE2EPulsarFunctionLocalRunURL() throws Exception {
+        String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-functions-api-examples.jar", fileServerPort);
+        testE2EPulsarFunctionLocalRun(jarFilePathUrl);
+    }
+
+    private void testPulsarSourceLocalRun(String jarFilePathUrl) throws Exception {
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String sourceName = "PulsarSource-test";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList(CLUSTER));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+        SourceConfig sourceConfig = createSourceConfig(tenant, namespacePortion, sourceName, sinkTopic);
+        if (jarFilePathUrl == null || !jarFilePathUrl.endsWith(".nar")) {
+            sourceConfig.setClassName(DataGeneratorSource.class.getName());
+        }
+
+        sourceConfig.setArchive(jarFilePathUrl);
+        LocalRunner localRunner = LocalRunner.builder()
+                .sourceConfig(sourceConfig)
+                .clientAuthPlugin(AuthenticationTls.class.getName())
+                .clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH))
+                .useTls(true)
+                .tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
+                .tlsAllowInsecureConnection(true)
+                .tlsHostNameVerificationEnabled(false)
+                .brokerServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get()).build();
+
+        localRunner.start(false);
+
+        retryStrategically((test) -> {
+            try {
+                return (admin.topics().getStats(sinkTopic).publishers.size() == 1);
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 10, 150);
+
+        retryStrategically((test) -> {
+            try {
+                TopicStats sourceStats = admin.topics().getStats(sinkTopic);
+                return sourceStats.publishers.size() == 1
+                        && sourceStats.publishers.get(0).metadata != null
+                        && sourceStats.publishers.get(0).metadata.containsKey("id")
+                        && sourceStats.publishers.get(0).metadata.get("id").equals(String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+
+        TopicStats sourceStats = admin.topics().getStats(sinkTopic);
+        assertEquals(sourceStats.publishers.size(), 1);
+        assertTrue(sourceStats.publishers.get(0).metadata != null);
+        assertTrue(sourceStats.publishers.get(0).metadata.containsKey("id"));
+        assertEquals(sourceStats.publishers.get(0).metadata.get("id"), String.format("%s/%s/%s", tenant, namespacePortion, sourceName));
+
+        retryStrategically((test) -> {
+            try {
+                return (admin.topics().getStats(sinkTopic).publishers.size() == 1)
+                        && (admin.topics().getInternalStats(sinkTopic).numberOfEntries > 4);
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 50, 150);
+        assertEquals(admin.topics().getStats(sinkTopic).publishers.size(), 1);
+
+        localRunner.stop();
+
+        retryStrategically((test) -> {
+            try {
+                return (admin.topics().getStats(sinkTopic).publishers.size() == 0);
+            } catch (PulsarAdminException e) {
+                if (e.getStatusCode() == 404) {
+                    return true;
+                }
+                return false;
+            }
+        }, 10, 150);
+
+        try {
+            assertTrue(admin.topics().getStats(sinkTopic).publishers.size() == 0);
+        } catch (PulsarAdminException e) {
+            if (e.getStatusCode() != 404) {
+                fail();
+            }
+        }
+    }
+
+
+    @Test(timeOut = 20000)
+    public void testPulsarSourceLocalRunNoArchive() throws Exception {
+        testPulsarSourceLocalRun(null);
+    }
+
+    // TODO bug to fix involving submitting a NAR via URI file:///tmp/pulsar-io-twitter-0.0.1.nar
+//    @Test(timeOut = 20000)
+//    public void testPulsarSourceLocalRunWithFile() throws Exception {
+//        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+//        testPulsarSourceStats(jarFilePathUrl);
+//    }
+
+    @Test(timeOut = 40000)
+    public void testPulsarSourceLocalRunWithUrl() throws Exception {
+        String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServerPort);
+        testPulsarSourceLocalRun(jarFilePathUrl);
+    }
+
+
+    private void testPulsarSinkStats(String jarFilePathUrl) throws Exception {
+        final String namespacePortion = "io";
+        final String replNamespace = tenant + "/" + namespacePortion;
+        final String sourceTopic = "persistent://" + replNamespace + "/input";
+        final String sinkName = "PulsarSink-test";
+        final String propertyKey = "key";
+        final String propertyValue = "value";
+        final String subscriptionName = "test-sub";
+        admin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("local"));
+        admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+        // create a producer that creates a topic at broker
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+
+        SinkConfig sinkConfig = createSinkConfig(tenant, namespacePortion, sinkName, sourceTopic, subscriptionName);
+
+        sinkConfig.setInputSpecs(Collections.singletonMap(sourceTopic, ConsumerConfig.builder().receiverQueueSize(1000).build()));
+        if (jarFilePathUrl == null || !jarFilePathUrl.endsWith(".nar")) {
+            sinkConfig.setClassName(DataGeneratorPrintSink.class.getName());
+        }
+
+        sinkConfig.setArchive(jarFilePathUrl);
+        LocalRunner localRunner = LocalRunner.builder()
+                .sinkConfig(sinkConfig)
+                .clientAuthPlugin(AuthenticationTls.class.getName())
+                .clientAuthParams(String.format("tlsCertFile:%s,tlsKeyFile:%s", TLS_CLIENT_CERT_FILE_PATH, TLS_CLIENT_KEY_FILE_PATH))
+                .useTls(true)
+                .tlsTrustCertFilePath(TLS_TRUST_CERT_FILE_PATH)
+                .tlsAllowInsecureConnection(true)
+                .tlsHostNameVerificationEnabled(false)
+                .brokerServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get()).build();
+
+        localRunner.start(false);
+
+        retryStrategically((test) -> {
+            try {
+                TopicStats topicStats = admin.topics().getStats(sourceTopic);
+
+                return topicStats.subscriptions.containsKey(subscriptionName)
+                        && topicStats.subscriptions.get(subscriptionName).consumers.size() == 1
+                        && topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits == 1000;
+
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 20, 150);
+
+        TopicStats topicStats = admin.topics().getStats(sourceTopic);
+        assertEquals(topicStats.subscriptions.size(), 1);
+        assertTrue(topicStats.subscriptions.containsKey(subscriptionName));
+        assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.size(), 1);
+        assertEquals(topicStats.subscriptions.get(subscriptionName).consumers.get(0).availablePermits, 1000);
+
+        int totalMsgs = 10;
+        for (int i = 0; i < totalMsgs; i++) {
+            String data = "my-message-" + i;
+            producer.newMessage().property(propertyKey, propertyValue).value(data).send();
+        }
+        retryStrategically((test) -> {
+            try {
+                SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+                return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs;
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 5, 200);
+
+        // stop sink
+        localRunner.stop();
+
+        retryStrategically((test) -> {
+            try {
+                TopicStats stats = admin.topics().getStats(sourceTopic);
+                return stats.subscriptions.get(subscriptionName) != null
+                        && stats.subscriptions.get(subscriptionName).consumers.isEmpty();
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 20, 150);
+
+        topicStats = admin.topics().getStats(sourceTopic);
+        assertTrue(topicStats.subscriptions.get(subscriptionName) != null
+                && topicStats.subscriptions.get(subscriptionName).consumers.isEmpty());
+
+    }
+
+    @Test(timeOut = 20000)
+    public void testPulsarSinkStatsNoArchive() throws Exception {
+        testPulsarSinkStats(null);
+    }
+
+//    @Test(timeOut = 20000)
+//    public void testPulsarSinkStatsWithFile() throws Exception {
+//        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+//        testPulsarSinkStats(jarFilePathUrl);
+//    }
+
+    @Test(timeOut = 40000)
+    public void testPulsarSinkStatsWithUrl() throws Exception {
+        String jarFilePathUrl = String.format("http://127.0.0.1:%d/pulsar-io-data-generator.nar", fileServerPort);
+        testPulsarSinkStats(jarFilePathUrl);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-functions/localrun-shaded/pom.xml b/pulsar-functions/localrun-shaded/pom.xml
new file mode 100644
index 0000000..3e0f2fe
--- /dev/null
+++ b/pulsar-functions/localrun-shaded/pom.xml
@@ -0,0 +1,374 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.pulsar</groupId>
+        <artifactId>pulsar-functions</artifactId>
+        <version>2.4.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>pulsar-functions-local-runner</artifactId>
+    <name>Pulsar Functions :: Local Runner Shaded</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-functions-local-runner-original</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <minimizeJar>false</minimizeJar>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
+                            </transformers>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>io.netty:netty-common</exclude>
+                                    <exclude>io.netty:netty-buffer</exclude>
+                                    <exclude>io.netty:netty-codec-http2</exclude>
+                                    <exclude>io.netty:netty-codec-http</exclude>
+                                    <exclude>io.netty:netty-codec-socks</exclude>
+                                    <exclude>io.netty:netty-codec</exclude>
+                                    <exclude>io.netty:netty-handler</exclude>
+                                    <exclude>io.netty:netty-handler-proxy</exclude>
+                                    <exclude>io.netty:netty-transport</exclude>
+                                    <exclude>io.netty:netty-resolver</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <!-- Shading signed JARs will fail without
+                                        this. http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar -->
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <relocations>
+                                <relocation>
+                                    <pattern>com.typesafe.netty</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.typesafe.netty</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.google</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.http</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.http</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.jute</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.jute</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>javax.servlet</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.servlet</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.junit</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.junit</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>junit</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.junit</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>net.jodah</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.jodah</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.lz4</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.lz4</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.reactivestreams</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.reactivestreams</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.commons</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.commons</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>io.swagger</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.swagger</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.yaml</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.yaml</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.jctools</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.jctools</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.squareup.okhttp</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.squareup.okhttp</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>io.grpc</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.grpc</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.joda</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.joda</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>javax.ws.rs</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.ws.rs</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>io.kubernetes</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.kubernetes</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>io.opencensus</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.opencensus</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>net.jpountz</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.jpountz</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.aspectj</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.aspectj</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>commons-configuration</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-configuration</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.tukaani</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.tukaani</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.github</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.github</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>commons-io</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-io</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.distributedlog</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.fasterxml</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.inferred</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.inferred</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.bookkeeper</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.bookkeeper</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.bookkeeper</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>dlshade</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.dlshade</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.codehaus.jackson</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.codehaus.jackson</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>net.java.dev.jna</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.java.dev.jna</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.curator</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.curator</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>javax.validation</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.validation</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>javax.activation</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.activation</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>io.prometheus</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.prometheus</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.zookeeper</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>io.jsonwebtoken</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.jsonwebtoken</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>commons-codec</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-codec</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.thoughtworks.paranamer</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.thoughtworks.paranamer</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.codehaus.mojo</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.codehaus.mojo</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.github.luben</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.github.luben</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>jline</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.jline</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>commons-logging</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-logging</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.bouncycastle</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.bouncycastle</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.xerial.snappy</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.xerial.snappy</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>javax.annotation</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.annotation</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.checkerframework</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.checkerframework</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.yetus</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.yetus</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>commons-cli</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-cli</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>commons-lang</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-lang</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.squareup.okio</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.squareup.okio</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.rocksdb</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.rocksdb</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.objenesis</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.objenesis</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.eclipse.jetty</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.eclipse.jetty</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.avro</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.avro</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>avro.shaded</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.avo.shaded</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.yahoo</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.yahoo</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.beust</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.beust</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>io.netty</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.netty</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.hamcrest</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.hamcrest</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>aj.org</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.aj.org</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.scurrilous</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.scurrilous</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>okio</pattern>
+                                    <shadedPattern>org.apache.pulsar.functions.runtime.shaded.okio</shadedPattern>
+                                </relocation>
+                                <!--
+                                    asynchttpclient can only be shaded to be under `org.apache.pulsar.shade`
+                                    see {@link https://github.com/apache/incubator-pulsar/pull/390}
+                                    and {@link https://github.com/apache/incubator-pulsar/blob/master/pulsar-client/src/main/resources/ahc.properties}
+                                -->
+                                <relocation>
+                                    <pattern>org.asynchttpclient</pattern>
+                                    <shadedPattern>org.apache.pulsar.shade.org.asynchttpclient</shadedPattern>
+                                </relocation>
+                                <!-- DONT ever shade log4j, otherwise logging won't work anymore in running functions in process mode
+                                <relocation>
+                                  <pattern>org.apache.logging</pattern>
+                                  <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.logging</shadedPattern>
+                                </relocation>
+                                -->
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/pulsar-functions/localrun/pom.xml b/pulsar-functions/localrun/pom.xml
new file mode 100644
index 0000000..d53b0ab
--- /dev/null
+++ b/pulsar-functions/localrun/pom.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.pulsar</groupId>
+        <artifactId>pulsar-functions</artifactId>
+        <version>2.4.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>pulsar-functions-local-runner-original</artifactId>
+    <name>Pulsar Functions :: Local Runner original</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-functions-runtime</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+
+        <!-- logging -->
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>pulsar-client-original</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>io.grpc</groupId>
+            <artifactId>grpc-all</artifactId>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
new file mode 100644
index 0000000..1540744
--- /dev/null
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -0,0 +1,466 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions;
+
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParser;
+import lombok.Builder;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.utils.SinkConfigUtils;
+import org.apache.pulsar.functions.utils.SourceConfigUtils;
+import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
+import org.apache.pulsar.functions.utils.io.ConnectorUtils;
+import org.apache.pulsar.functions.utils.io.Connectors;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.pulsar.common.functions.Utils.inferMissingArguments;
+import static org.apache.pulsar.functions.utils.FunctionCommon.extractClassLoader;
+import static org.apache.pulsar.functions.utils.FunctionCommon.loadJar;
+
+@Slf4j
+public class LocalRunner {
+
+    private final AtomicBoolean running = new AtomicBoolean(false);
+    private final List<RuntimeSpawner> spawners = new LinkedList<>();
+
+    public enum RuntimeEnv {
+        THREAD,
+        PROCESS
+    }
+
+    public static class FunctionConfigConverter implements IStringConverter<FunctionConfig> {
+        @Override
+        public FunctionConfig convert(String value) {
+            try {
+                return ObjectMapperFactory.getThreadLocal().readValue(value, FunctionConfig.class);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to parse function config:", e);
+            }
+        }
+    }
+
+    public static class SourceConfigConverter implements IStringConverter<SourceConfig> {
+        @Override
+        public SourceConfig convert(String value) {
+            try {
+                return ObjectMapperFactory.getThreadLocal().readValue(value, SourceConfig.class);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to parse source config:", e);
+            }
+        }
+    }
+
+    public static class SinkConfigConverter implements IStringConverter<SinkConfig> {
+        @Override
+        public SinkConfig convert(String value) {
+            try {
+                return ObjectMapperFactory.getThreadLocal().readValue(value, SinkConfig.class);
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to parse sink config:", e);
+            }
+        }
+    }
+
+    public static class RuntimeConverter implements IStringConverter<RuntimeEnv> {
+        @Override
+        public RuntimeEnv convert(String value) {
+            return RuntimeEnv.valueOf(value);
+        }
+    }
+
+    @Parameter(names = "--functionConfig", description = "The json representation of FunctionConfig", hidden = true, converter = FunctionConfigConverter.class)
+    protected FunctionConfig functionConfig;
+    @Parameter(names = "--sourceConfig", description = "The json representation of SourceConfig", hidden = true, converter = SourceConfigConverter.class)
+    protected SourceConfig sourceConfig;
+    @Parameter(names = "--sinkConfig", description = "The json representation of SinkConfig", hidden = true, converter = SinkConfigConverter.class)
+    protected SinkConfig sinkConfig;
+    @Parameter(names = "--stateStorageServiceUrl", description = "The URL for the state storage service (by default Apache BookKeeper)", hidden = true)
+    protected String stateStorageServiceUrl;
+    @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true)
+    protected String brokerServiceUrl;
+    @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker", hidden = true)
+    protected String clientAuthPlugin;
+    @Parameter(names = "--clientAuthParams", description = "Client authentication param", hidden = true)
+    protected String clientAuthParams;
+    @Parameter(names = "--useTls", description = "Use tls connection\n", hidden = true, arity = 1)
+    protected boolean useTls;
+    @Parameter(names = "--tlsAllowInsecureConnection", description = "Allow insecure tls connection\n", hidden = true, arity = 1)
+    protected boolean tlsAllowInsecureConnection;
+    @Parameter(names = "--tlsHostNameVerificationEnabled", description = "Enable hostname verification", hidden = true, arity = 1)
+    protected boolean tlsHostNameVerificationEnabled;
+    @Parameter(names = "--tlsTrustCertFilePath", description = "tls trust cert file path", hidden = true)
+    protected String tlsTrustCertFilePath;
+    @Parameter(names = "--instanceIdOffset", description = "Start the instanceIds from this offset", hidden = true)
+    protected int instanceIdOffset = 0;
+    @Parameter(names = "--runtime", description = "Function runtime to use (Thread/Process)", hidden = true, converter = RuntimeConverter.class)
+    protected RuntimeEnv runtimeEnv;
+
+    private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
+
+    public static void main(String[] args) throws Exception {
+        LocalRunner localRunner = LocalRunner.builder().build();
+        JCommander jcommander = new JCommander(localRunner);
+        jcommander.setProgramName("LocalRunner");
+
+        // parse args by JCommander
+        jcommander.parse(args);
+        localRunner.start(true);
+    }
+
+    @Builder
+    public LocalRunner(FunctionConfig functionConfig, SourceConfig sourceConfig, SinkConfig sinkConfig, String
+            stateStorageServiceUrl, String brokerServiceUrl, String clientAuthPlugin, String clientAuthParams,
+                       boolean useTls, boolean tlsAllowInsecureConnection, boolean tlsHostNameVerificationEnabled,
+                       String tlsTrustCertFilePath, int instanceIdOffset, RuntimeEnv runtimeEnv) {
+        this.functionConfig = functionConfig;
+        this.sourceConfig = sourceConfig;
+        this.sinkConfig = sinkConfig;
+        this.stateStorageServiceUrl = stateStorageServiceUrl;
+        this.brokerServiceUrl = brokerServiceUrl;
+        this.clientAuthPlugin = clientAuthPlugin;
+        this.clientAuthParams = clientAuthParams;
+        this.useTls = useTls;
+        this.tlsAllowInsecureConnection = tlsAllowInsecureConnection;
+        this.tlsHostNameVerificationEnabled = tlsHostNameVerificationEnabled;
+        this.tlsTrustCertFilePath = tlsTrustCertFilePath;
+        this.instanceIdOffset = instanceIdOffset;
+        this.runtimeEnv = runtimeEnv;
+
+        java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void run() {
+                LocalRunner.this.stop();
+            }
+        });
+    }
+
+    public synchronized void stop() {
+        running.set(false);
+        log.info("Shutting down the localrun runtimeSpawner ...");
+        for (RuntimeSpawner spawner : spawners) {
+            spawner.close();
+        }
+        spawners.clear();
+    }
+
+    public void start(boolean blocking) throws Exception {
+        List<RuntimeSpawner> local = new LinkedList<>();
+        synchronized (this) {
+            if (running.get() == true) {
+                throw new IllegalArgumentException("Pulsar Function local run already started!");
+            }
+
+            Function.FunctionDetails functionDetails;
+            String userCodeFile;
+            int parallelism;
+            if (functionConfig != null) {
+                FunctionConfigUtils.inferMissingArguments(functionConfig);
+                ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+                parallelism = functionConfig.getParallelism();
+                if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
+                    userCodeFile = functionConfig.getJar();
+
+                    // if code file not specified try to get location of the code based on class.
+                    if (userCodeFile == null && functionConfig.getClassName() != null) {
+                        userCodeFile = Thread.currentThread().getContextClassLoader()
+                                .loadClass(functionConfig.getClassName())
+                                .getProtectionDomain().getCodeSource().getLocation().getFile();
+                    }
+
+                    if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
+                        File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
+                        classLoader = FunctionConfigUtils.validate(functionConfig, file);
+                    } else {
+                        File file = new File(userCodeFile);
+                        if (!file.exists()) {
+                            throw new RuntimeException("User jar does not exist");
+                        }
+                        classLoader = FunctionConfigUtils.validate(functionConfig, file);
+                    }
+
+                } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
+                    userCodeFile = functionConfig.getGo();
+                } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON) {
+                    userCodeFile = functionConfig.getPy();
+                } else {
+                    throw new UnsupportedOperationException();
+                }
+
+                functionDetails = FunctionConfigUtils.convert(functionConfig, classLoader);
+            } else if (sourceConfig != null) {
+                inferMissingArguments(sourceConfig);
+                userCodeFile = sourceConfig.getArchive();
+
+                // if code file not specified try to get location of the code based on class.
+                if (userCodeFile == null && sourceConfig.getClassName() != null) {
+                    userCodeFile = Thread.currentThread().getContextClassLoader()
+                            .loadClass(sourceConfig.getClassName())
+                            .getProtectionDomain().getCodeSource().getLocation().getFile();
+                }
+
+                if (userCodeFile == null) {
+                    userCodeFile = Thread.currentThread().getContextClassLoader()
+                            .loadClass(LocalRunner.class.getName())
+                            .getProtectionDomain().getCodeSource().getLocation().getFile();
+                }
+                log.info("userCodeFile: {}", userCodeFile);
+
+                String builtInSource = isBuiltInSource(userCodeFile);
+                if (builtInSource != null) {
+                    sourceConfig.setArchive(builtInSource);
+                }
+                parallelism = sourceConfig.getParallelism();
+
+                if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
+                    File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
+                    functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file));
+
+                } else {
+                    File file = new File(userCodeFile);
+                    if (!file.exists()) {
+                        throw new RuntimeException("Source archive does not exist");
+                    }
+                    functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file));
+                }
+            } else if (sinkConfig != null) {
+                inferMissingArguments(sinkConfig);
+                userCodeFile = sinkConfig.getArchive();
+
+                // if code file not specified try to get location of the code based on class.
+                if (userCodeFile == null && sinkConfig.getClassName() != null) {
+                    userCodeFile = Thread.currentThread().getContextClassLoader()
+                            .loadClass(sinkConfig.getClassName())
+                            .getProtectionDomain().getCodeSource().getLocation().getFile();
+                }
+
+                String builtInSink = isBuiltInSource(userCodeFile);
+                if (builtInSink != null) {
+                    sinkConfig.setArchive(builtInSink);
+                }
+                parallelism = sinkConfig.getParallelism();
+
+                if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
+                    File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
+                    functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file));
+                } else {
+                    File file = new File(userCodeFile);
+                    if (!file.exists()) {
+                        throw new RuntimeException("Sink archive does not exist");
+                    }
+                    functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file));
+                }
+            } else {
+                throw new IllegalArgumentException("Must specify Function, Source or Sink config");
+            }
+
+            if (System.getProperty(FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY) == null) {
+                System.setProperty(FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY,
+                        LocalRunner.class.getProtectionDomain().getCodeSource().getLocation().getFile());
+            }
+
+            AuthenticationConfig authConfig = AuthenticationConfig.builder().clientAuthenticationPlugin
+                    (clientAuthPlugin)
+                    .clientAuthenticationParameters(clientAuthParams).useTls(useTls)
+                    .tlsAllowInsecureConnection(tlsAllowInsecureConnection)
+                    .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
+                    .tlsTrustCertsFilePath(tlsTrustCertFilePath).build();
+
+            String serviceUrl = DEFAULT_SERVICE_URL;
+            if (brokerServiceUrl != null) {
+                serviceUrl = brokerServiceUrl;
+            }
+
+            if ((sourceConfig != null || sinkConfig != null || functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA)
+                    && (runtimeEnv == null || runtimeEnv == RuntimeEnv.THREAD)) {
+                // By default run java functions as threads
+                startThreadedMode(functionDetails, parallelism, instanceIdOffset, serviceUrl,
+                        stateStorageServiceUrl, authConfig, userCodeFile);
+            } else {
+                startProcessMode(functionDetails, parallelism, instanceIdOffset, serviceUrl,
+                        stateStorageServiceUrl, authConfig, userCodeFile);
+            }
+            local.addAll(spawners);
+        }
+
+        if (blocking) {
+            for (RuntimeSpawner spawner : local) {
+                spawner.join();
+                log.info("RuntimeSpawner quit because of", spawner.getRuntime().getDeathException());
+            }
+        }
+    }
+
+    private void startProcessMode(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails,
+                                           int parallelism, int instanceIdOffset, String serviceUrl,
+                                           String stateStorageServiceUrl, AuthenticationConfig authConfig,
+                                           String userCodeFile) throws Exception {
+
+        try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(
+                serviceUrl,
+                stateStorageServiceUrl,
+                authConfig,
+                null, /* java instance jar file */
+                null, /* python instance file */
+                null, /* log directory */
+                null, /* extra dependencies dir */
+                new DefaultSecretsProviderConfigurator(), false)) {
+
+            for (int i = 0; i < parallelism; ++i) {
+                InstanceConfig instanceConfig = new InstanceConfig();
+                instanceConfig.setFunctionDetails(functionDetails);
+                // TODO: correctly implement function version and id
+                instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
+                instanceConfig.setFunctionId(UUID.randomUUID().toString());
+                instanceConfig.setInstanceId(i + instanceIdOffset);
+                instanceConfig.setMaxBufferedTuples(1024);
+                instanceConfig.setPort(FunctionCommon.findAvailablePort());
+                instanceConfig.setClusterName("local");
+                RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
+                        instanceConfig,
+                        userCodeFile,
+                        null,
+                        containerFactory,
+                        30000);
+                spawners.add(runtimeSpawner);
+                runtimeSpawner.start();
+            }
+            Timer statusCheckTimer = new Timer();
+            statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
+                @Override
+                public void run() {
+                    CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()];
+                    int index = 0;
+                    for (RuntimeSpawner spawner : spawners) {
+                        futures[index] = spawner.getFunctionStatusAsJson(index);
+                        index++;
+                    }
+                    try {
+                        CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
+                        for (index = 0; index < futures.length; ++index) {
+                            String json = futures[index].get();
+                            Gson gson = new GsonBuilder().setPrettyPrinting().create();
+                            log.info(gson.toJson(new JsonParser().parse(json)));
+                        }
+                    } catch (Exception ex) {
+                        log.error("Could not get status from all local instances");
+                    }
+                }
+            }, 30000, 30000);
+            java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
+                public void run() {
+                    statusCheckTimer.cancel();
+                }
+            });
+        }
+    }
+
+
+    private void startThreadedMode(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails,
+                                           int parallelism, int instanceIdOffset, String serviceUrl,
+                                           String stateStorageServiceUrl, AuthenticationConfig authConfig,
+                                           String userCodeFile) throws Exception {
+        ThreadRuntimeFactory threadRuntimeFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
+                serviceUrl,
+                stateStorageServiceUrl,
+                authConfig,
+                new ClearTextSecretsProvider(), null);
+        for (int i = 0; i < parallelism; ++i) {
+            InstanceConfig instanceConfig = new InstanceConfig();
+            instanceConfig.setFunctionDetails(functionDetails);
+            // TODO: correctly implement function version and id
+            instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
+            instanceConfig.setFunctionId(UUID.randomUUID().toString());
+            instanceConfig.setInstanceId(i + instanceIdOffset);
+            instanceConfig.setMaxBufferedTuples(1024);
+            instanceConfig.setPort(FunctionCommon.findAvailablePort());
+            instanceConfig.setClusterName("local");
+            RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
+                    instanceConfig,
+                    userCodeFile,
+                    null,
+                    threadRuntimeFactory,
+                    30000);
+            spawners.add(runtimeSpawner);
+            runtimeSpawner.start();
+        }
+    }
+
+    private String isBuiltInSource(String sourceType) throws IOException {
+        // Validate the connector source type from the locally available connectors
+        Connectors connectors = getConnectors();
+
+        if (connectors.getSources().containsKey(sourceType)) {
+            // Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path
+            return connectors.getSources().get(sourceType).toString();
+        } else {
+            return null;
+        }
+    }
+
+    private String isBuiltInSink(String sinkType) throws IOException {
+        // Validate the connector source type from the locally available connectors
+        Connectors connectors = getConnectors();
+
+        if (connectors.getSinks().containsKey(sinkType)) {
+            // Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path
+            return connectors.getSinks().get(sinkType).toString();
+        } else {
+            return null;
+        }
+    }
+
+    private Connectors getConnectors() throws IOException {
+        // Validate the connector source type from the locally available connectors
+        String pulsarHome = System.getenv("PULSAR_HOME");
+        if (pulsarHome == null) {
+            pulsarHome = Paths.get("").toAbsolutePath().toString();
+        }
+        String connectorsDir = Paths.get(pulsarHome, "connectors").toString();
+        return ConnectorUtils.searchForConnectors(connectorsDir);
+    }
+}
diff --git a/pulsar-functions/pom.xml b/pulsar-functions/pom.xml
index 0f0219a..172acf0 100644
--- a/pulsar-functions/pom.xml
+++ b/pulsar-functions/pom.xml
@@ -41,6 +41,8 @@
     <module>runtime-all</module>
     <module>worker</module>
     <module>secrets</module>
+    <module>localrun</module>
+    <module>localrun-shaded</module>
   </modules>
 
 </project>
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
deleted file mode 100644
index 1c180ab..0000000
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
+++ /dev/null
@@ -1,283 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.runtime;
-
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import com.google.gson.JsonParser;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-
-import lombok.extern.slf4j.Slf4j;
-
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.common.io.SourceConfig;
-import org.apache.pulsar.functions.instance.AuthenticationConfig;
-import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
-import org.apache.pulsar.functions.utils.*;
-import org.apache.pulsar.functions.utils.io.ConnectorUtils;
-import org.apache.pulsar.functions.utils.io.Connectors;
-
-import static org.apache.pulsar.common.functions.Utils.inferMissingArguments;
-import static org.apache.pulsar.functions.utils.FunctionCommon.*;
-
-@Slf4j
-public class LocalRunner {
-
-    @Parameter(names = "--functionConfig", description = "The json representation of FunctionConfig", hidden = true)
-    protected String functionConfigString;
-    @Parameter(names = "--sourceConfig", description = "The json representation of SourceConfig", hidden = true)
-    protected String sourceConfigString;
-    @Parameter(names = "--sinkConfig", description = "The json representation of SinkConfig", hidden = true)
-    protected String sinkConfigString;
-    @Parameter(names = "--stateStorageServiceUrl", description = "The URL for the state storage service (by default Apache BookKeeper)", hidden = true)
-    protected String stateStorageServiceUrl;
-    @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker", hidden = true)
-    protected String brokerServiceUrl;
-    @Parameter(names = "--clientAuthPlugin", description = "Client authentication plugin using which function-process can connect to broker", hidden = true)
-    protected String clientAuthPlugin;
-    @Parameter(names = "--clientAuthParams", description = "Client authentication param", hidden = true)
-    protected String clientAuthParams;
-    @Parameter(names = "--useTls", description = "Use tls connection\n", hidden = true, arity = 1)
-    protected boolean useTls;
-    @Parameter(names = "--tlsAllowInsecureConnection", description = "Allow insecure tls connection\n", hidden = true, arity = 1)
-    protected boolean tlsAllowInsecureConnection;
-    @Parameter(names = "--tlsHostNameVerificationEnabled", description = "Enable hostname verification", hidden = true, arity = 1)
-    protected boolean tlsHostNameVerificationEnabled;
-    @Parameter(names = "--tlsTrustCertFilePath", description = "tls trust cert file path", hidden = true)
-    protected String tlsTrustCertFilePath;
-    @Parameter(names = "--instanceIdOffset", description = "Start the instanceIds from this offset", hidden = true)
-    protected Integer instanceIdOffset = 0;
-    private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
-
-    public static void main(String[] args) throws Exception {
-        LocalRunner localRunner = new LocalRunner();
-        JCommander jcommander = new JCommander(localRunner);
-        jcommander.setProgramName("LocalRunner");
-
-        // parse args by JCommander
-        jcommander.parse(args);
-        localRunner.start();
-    }
-
-    void start() throws Exception {
-        Function.FunctionDetails functionDetails;
-        String userCodeFile;
-        int parallelism;
-        if (!StringUtils.isEmpty(functionConfigString)) {
-            FunctionConfig functionConfig = new Gson().fromJson(functionConfigString, FunctionConfig.class);
-            FunctionConfigUtils.inferMissingArguments(functionConfig);
-            ClassLoader classLoader = null;
-            parallelism = functionConfig.getParallelism();
-            if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
-                userCodeFile = functionConfig.getJar();
-                if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
-                    classLoader = extractClassLoader(userCodeFile);
-                } else {
-                    File file = new File(userCodeFile);
-                    if (!file.exists()) {
-                        throw new RuntimeException("User jar does not exist");
-                    }
-                    classLoader = loadJar(file);
-                }
-            } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.GO) {
-                userCodeFile = functionConfig.getGo();
-            } else if (functionConfig.getRuntime() == FunctionConfig.Runtime.PYTHON){
-                userCodeFile = functionConfig.getPy();
-            } else {
-                throw new UnsupportedOperationException();
-            }
-            functionDetails = FunctionConfigUtils.convert(functionConfig, classLoader);
-        } else if (!StringUtils.isEmpty(sourceConfigString)) {
-            SourceConfig sourceConfig = new Gson().fromJson(sourceConfigString, SourceConfig.class);
-            inferMissingArguments(sourceConfig);
-            String builtInSource = isBuiltInSource(sourceConfig.getArchive());
-            if (builtInSource != null) {
-                sourceConfig.setArchive(builtInSource);
-            }
-            parallelism = sourceConfig.getParallelism();
-            userCodeFile = sourceConfig.getArchive();
-            if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
-                File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
-                functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file));
-            } else {
-                File file = new File(userCodeFile);
-                if (!file.exists()) {
-                    throw new RuntimeException("Source archive does not exist");
-                }
-                functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, file));
-            }
-        } else {
-            SinkConfig sinkConfig = new Gson().fromJson(sinkConfigString, SinkConfig.class);
-            inferMissingArguments(sinkConfig);
-            String builtInSink = isBuiltInSource(sinkConfig.getArchive());
-            if (builtInSink != null) {
-                sinkConfig.setArchive(builtInSink);
-            }
-            parallelism = sinkConfig.getParallelism();
-            userCodeFile = sinkConfig.getArchive();
-            if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
-                File file = FunctionCommon.extractFileFromPkgURL(userCodeFile);
-                functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file));
-            } else {
-                File file = new File(userCodeFile);
-                if (!file.exists()) {
-                    throw new RuntimeException("Sink archive does not exist");
-                }
-                functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, file));
-            }
-        }
-        startLocalRun(functionDetails, parallelism,
-                instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl,
-                AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthPlugin)
-                        .clientAuthenticationParameters(clientAuthParams).useTls(useTls)
-                        .tlsAllowInsecureConnection(tlsAllowInsecureConnection)
-                        .tlsHostnameVerificationEnable(tlsHostNameVerificationEnabled)
-                        .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
-                userCodeFile);
-    }
-
-    protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.FunctionDetails functionDetails,
-                                        int parallelism, int instanceIdOffset, String brokerServiceUrl, String stateStorageServiceUrl, AuthenticationConfig authConfig,
-                                        String userCodeFile)
-            throws Exception {
-
-        String serviceUrl = DEFAULT_SERVICE_URL;
-        if (brokerServiceUrl != null) {
-            serviceUrl = brokerServiceUrl;
-        }
-
-        try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(
-                serviceUrl,
-                stateStorageServiceUrl,
-                authConfig,
-                null, /* java instance jar file */
-                null, /* python instance file */
-                null, /* log directory */
-                null, /* extra dependencies dir */
-                new DefaultSecretsProviderConfigurator(), false)) {
-            List<RuntimeSpawner> spawners = new LinkedList<>();
-            for (int i = 0; i < parallelism; ++i) {
-                InstanceConfig instanceConfig = new InstanceConfig();
-                instanceConfig.setFunctionDetails(functionDetails);
-                // TODO: correctly implement function version and id
-                instanceConfig.setFunctionVersion(UUID.randomUUID().toString());
-                instanceConfig.setFunctionId(UUID.randomUUID().toString());
-                instanceConfig.setInstanceId(i + instanceIdOffset);
-                instanceConfig.setMaxBufferedTuples(1024);
-                instanceConfig.setPort(FunctionCommon.findAvailablePort());
-                instanceConfig.setClusterName("local");
-                RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
-                        instanceConfig,
-                        userCodeFile,
-                        null,
-                        containerFactory,
-                        30000);
-                spawners.add(runtimeSpawner);
-                runtimeSpawner.start();
-            }
-            java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
-                public void run() {
-                    log.info("Shutting down the localrun runtimeSpawner ...");
-                    for (RuntimeSpawner spawner : spawners) {
-                        spawner.close();
-                    }
-                }
-            });
-            Timer statusCheckTimer = new Timer();
-            statusCheckTimer.scheduleAtFixedRate(new TimerTask() {
-                @Override
-                public void run() {
-                    CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()];
-                    int index = 0;
-                    for (RuntimeSpawner spawner : spawners) {
-                        futures[index] = spawner.getFunctionStatusAsJson(index);
-                        index++;
-                    }
-                    try {
-                        CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
-                        for (index = 0; index < futures.length; ++index) {
-                            String json = futures[index].get();
-                            Gson gson = new GsonBuilder().setPrettyPrinting().create();
-                            log.info(gson.toJson(new JsonParser().parse(json)));
-                        }
-                    } catch (Exception ex) {
-                        log.error("Could not get status from all local instances");
-                    }
-                }
-            }, 30000, 30000);
-            java.lang.Runtime.getRuntime().addShutdownHook(new Thread() {
-                public void run() {
-                    statusCheckTimer.cancel();
-                }
-            });
-            for (RuntimeSpawner spawner : spawners) {
-                spawner.join();
-                log.info("RuntimeSpawner quit because of", spawner.getRuntime().getDeathException());
-            }
-
-        }
-    }
-
-    private String isBuiltInSource(String sourceType) throws IOException {
-        // Validate the connector source type from the locally available connectors
-        Connectors connectors = getConnectors();
-
-        if (connectors.getSources().containsKey(sourceType)) {
-            // Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path
-            return connectors.getSources().get(sourceType).toString();
-        } else {
-            return null;
-        }
-    }
-
-    private String isBuiltInSink(String sinkType) throws IOException {
-        // Validate the connector source type from the locally available connectors
-        Connectors connectors = getConnectors();
-
-        if (connectors.getSinks().containsKey(sinkType)) {
-            // Source type is a valid built-in connector type. For local-run we'll fill it up with its own archive path
-            return connectors.getSinks().get(sinkType).toString();
-        } else {
-            return null;
-        }
-    }
-
-    private Connectors getConnectors() throws IOException {
-        // Validate the connector source type from the locally available connectors
-        String pulsarHome = System.getenv("PULSAR_HOME");
-        if (pulsarHome == null) {
-            pulsarHome = Paths.get("").toAbsolutePath().toString();
-        }
-        String connectorsDir = Paths.get(pulsarHome, "connectors").toString();
-        return ConnectorUtils.searchForConnectors(connectorsDir);
-    }
-}