You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/01 18:48:56 UTC

[GitHub] srkukarni closed pull request #1950: Enable Pulsar Functions to be deployed on a kubernetes cluster

srkukarni closed pull request #1950: Enable Pulsar Functions to be deployed on a kubernetes cluster
URL: https://github.com/apache/pulsar/pull/1950
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 0c7b8af695..ba892ad449 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -36,6 +36,8 @@ downloadDirectory: /tmp/pulsar_functions
 #  threadGroupName: "Thread Function Container Group"
 processContainerFactory:
   logDirectory:
+#kubernetesContainerFactory:
+#  k8Uri:
 
 schedulerClassName: "org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler"
 functionAssignmentTopicName: "assignments"
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 5f0f07b680..2ce32cb72e 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -450,6 +450,15 @@ The Apache Software License, Version 2.0
     - org.xerial.snappy-snappy-java-1.1.1.3.jar
   * Objenesis
     - org.objenesis-objenesis-2.1.jar
+  * Squareup
+    - com.squareup.okhttp-logging-interceptor-2.7.5.jar
+    - com.squareup.okhttp-okhttp-ws-2.7.5.jar
+  * Kubernetes Client
+    - io.kubernetes-client-java-2.0.0.jar
+    - io.kubernetes-client-java-api-2.0.0.jar
+    - io.kubernetes-client-java-proto-2.0.0.jar
+  * Joda Time
+    - joda-time-joda-time-2.9.3.jar
 
 
 BSD 3-clause "New" or "Revised" License
@@ -523,6 +532,7 @@ Bouncy Castle License
  * Bouncy Castle -- licenses/LICENSE-bouncycastle.txt
     - org.bouncycastle-bcpkix-jdk15on-1.55.jar
     - org.bouncycastle-bcprov-jdk15on-1.55.jar
+    - org.bouncycastle-bcprov-ext-jdk15on-1.59.jar
 
 ------------------------
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 395e8817f9..9e3c6e8ece 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -1264,6 +1264,7 @@ protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.F
                 RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
                         instanceConfig,
                         userCodeFile,
+                        null,
                         containerFactory,
                         30000);
                 spawners.add(runtimeSpawner);
@@ -1284,7 +1285,8 @@ public void run() {
                         CompletableFuture<String>[] futures = new CompletableFuture[spawners.size()];
                         int index = 0;
                         for (RuntimeSpawner spawner : spawners) {
-                            futures[index++] = spawner.getFunctionStatusAsJson();
+                            futures[index] = spawner.getFunctionStatusAsJson(index);
+                            index++;
                         }
                         try {
                             CompletableFuture.allOf(futures).get(5, TimeUnit.SECONDS);
diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml
index 8c4be079c8..728df0ee1c 100644
--- a/pulsar-functions/runtime/pom.xml
+++ b/pulsar-functions/runtime/pom.xml
@@ -55,6 +55,19 @@
       <artifactId>jcommander</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.kubernetes</groupId>
+      <artifactId>client-java</artifactId>
+      <version>2.0.0</version>
+      <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>ch.qos.logback</groupId>
+          <artifactId>logback-classic</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index c18eff55b0..affa1198cc 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -126,6 +126,7 @@ public void start() throws Exception {
         runtimeSpawner = new RuntimeSpawner(
                 instanceConfig,
                 jarFile,
+                null, // we really dont use this in thread container
                 containerFactory,
                 expectedHealthCheckInterval * 1000);
 
@@ -218,7 +219,7 @@ public InstanceControlImpl(RuntimeSpawner runtimeSpawner) {
         @Override
         public void getFunctionStatus(Empty request, StreamObserver<InstanceCommunication.FunctionStatus> responseObserver) {
             try {
-                InstanceCommunication.FunctionStatus response = runtimeSpawner.getFunctionStatus().get();
+                InstanceCommunication.FunctionStatus response = runtimeSpawner.getFunctionStatus(runtimeSpawner.getInstanceConfig().getInstanceId()).get();
                 responseObserver.onNext(response);
                 responseObserver.onCompleted();
             } catch (Exception e) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
new file mode 100644
index 0000000000..0573d88012
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -0,0 +1,551 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.runtime;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.Empty;
+import com.squareup.okhttp.Response;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.kubernetes.client.apis.AppsV1Api;
+import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.models.*;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.net.HttpURLConnection.HTTP_CONFLICT;
+
+/**
+ * Kubernetes based runtime for running functions.
+ * This runtime provides the usual start/stop/getFunctionStatus operations
+ * to control the Kubernetes resources that run the function.
+ * We first create a headless service and then a statefulset for starting the function pods.
+ * Each function instance runs as a pod of its own. The reason for using a statefulset as opposed
+ * to a regular deployment is that functions require a unique instance_id for each instance.
+ * The service abstraction is used for getting the function status.
+ */
+@Slf4j
+class KubernetesRuntime implements Runtime {
+
+    private static final String ENV_SHARD_ID = "SHARD_ID";
+    private static final int maxJobNameSize = 63;
+    private static final Integer GRPC_PORT = 9093;
+    public static final Pattern VALID_POD_NAME_REGEX =
+            Pattern.compile("[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*",
+                    Pattern.CASE_INSENSITIVE);
+
+    private final AppsV1Api appsClient;
+    private final CoreV1Api coreClient;
+    static final List<String> TOLERATIONS = Collections.unmodifiableList(
+            Arrays.asList(
+                    "node.kubernetes.io/not-ready",
+                    "node.alpha.kubernetes.io/notReady",
+                    "node.alpha.kubernetes.io/unreachable"
+            )
+    );
+
+    // The command line used to launch the function instance inside each pod
+    @Getter
+    private List<String> processArgs;
+    @Getter
+    private ManagedChannel[] channel;
+    private InstanceControlGrpc.InstanceControlFutureStub[] stub;
+    private InstanceConfig instanceConfig;
+    private final String jobNamespace;
+    private final String pulsarDockerImageName;
+    private final String pulsarRootDir;
+    private final String userCodePkgUrl;
+    private final String originalCodeFileName;
+    private final String pulsarAdminUrl;
+    private boolean running;
+
+
+    /**
+     * Captures everything needed to later submit the function to Kubernetes and validates the
+     * job name derived from the function's tenant, namespace and name. This constructor itself
+     * makes no Kubernetes API calls; the resources are created by {@link #start()}.
+     *
+     * @param appsClient             client used for the statefulset create/delete calls
+     * @param coreClient             client used for the service create/delete calls
+     * @param jobNamespace           Kubernetes namespace passed to every create/delete call
+     * @param pulsarDockerImageName  image set on the function container
+     * @param pulsarRootDir          directory whose {@code bin/pulsar-admin} is used to download the user code
+     * @param instanceConfig         configuration of the function; supplies tenant, namespace, name,
+     *                               parallelism and optional resources
+     * @param instanceFile           forwarded to {@code RuntimeUtils.composeArgs}
+     * @param logDirectory           forwarded to {@code RuntimeUtils.composeArgs}
+     * @param userCodePkgUrl         value of the {@code --path} option of the download command
+     * @param originalCodeFileName   value of the {@code --destination-file} option of the download command;
+     *                               also forwarded to {@code RuntimeUtils.composeArgs}
+     * @param pulsarServiceUrl       forwarded to {@code RuntimeUtils.composeArgs}
+     * @param pulsarAdminUrl         value of the {@code --admin-url} option of the download command
+     * @param stateStorageServiceUrl forwarded to {@code RuntimeUtils.composeArgs}
+     * @param authConfig             forwarded to {@code RuntimeUtils.composeArgs}
+     * @throws Exception declared by the signature; the visible body throws a {@link RuntimeException}
+     *                   from {@code doChecks} when the derived job name is rejected
+     */
+    KubernetesRuntime(AppsV1Api appsClient,
+                      CoreV1Api coreClient,
+                      String jobNamespace,
+                      String pulsarDockerImageName,
+                      String pulsarRootDir,
+                      InstanceConfig instanceConfig,
+                      String instanceFile,
+                      String logDirectory,
+                      String userCodePkgUrl,
+                      String originalCodeFileName,
+                      String pulsarServiceUrl,
+                      String pulsarAdminUrl,
+                      String stateStorageServiceUrl,
+                      AuthenticationConfig authConfig) throws Exception {
+        this.appsClient = appsClient;
+        this.coreClient = coreClient;
+        this.instanceConfig = instanceConfig;
+        this.jobNamespace = jobNamespace;
+        this.pulsarDockerImageName = pulsarDockerImageName;
+        this.pulsarRootDir = pulsarRootDir;
+        this.userCodePkgUrl = userCodePkgUrl;
+        this.originalCodeFileName = originalCodeFileName;
+        this.pulsarAdminUrl = pulsarAdminUrl;
+        // "$SHARD_ID" is handed over unexpanded: the pod's shell assigns SHARD_ID (see
+        // setShardIdEnvironmentVariableCommand) before it runs these arguments.
+        this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
+                authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, "conf/log4j2.yaml");
+        running = false;
+        // Fail fast if Kubernetes would not accept the job name.
+        doChecks(instanceConfig.getFunctionDetails());
+    }
+
+    /**
+     * Creates the headless service and then the statefulset for this function, and opens one
+     * gRPC channel/stub per instance so that {@link #getFunctionStatus(int)} can reach each pod.
+     *
+     * <p>If the statefulset cannot be submitted, the service created just before is deleted again
+     * and the original failure is rethrown; the runtime is then not marked as running.
+     *
+     * @throws Exception if the service or the statefulset could not be submitted
+     */
+    @Override
+    public void start() throws Exception {
+        submitService();
+        try {
+            submitStatefulSet();
+        } catch (Exception e) {
+            // Roll back the service, but never let a cleanup failure hide the real cause.
+            try {
+                deleteService();
+            } catch (Exception cleanupFailure) {
+                e.addSuppressed(cleanupFailure);
+            }
+            throw e;
+        }
+        running = true;
+        if (channel == null && stub == null) {
+            final int parallelism = instanceConfig.getFunctionDetails().getParallelism();
+            final String jobName = createJobName(instanceConfig.getFunctionDetails());
+            channel = new ManagedChannel[parallelism];
+            stub = new InstanceControlGrpc.InstanceControlFutureStub[parallelism];
+            for (int i = 0; i < parallelism; ++i) {
+                // Address of pod i: "<job name>-<i>.<job name>", the job name being both the
+                // statefulset name and the service name.
+                final String address = jobName + "-" + i + "." + jobName;
+                channel[i] = ManagedChannelBuilder.forAddress(address, GRPC_PORT)
+                        .usePlaintext(true)
+                        .build();
+                stub[i] = InstanceControlGrpc.newFutureStub(channel[i]);
+            }
+        }
+    }
+
+    /**
+     * Blocks the calling thread; a Kubernetes-backed function has no local process to wait on.
+     *
+     * <p>Nothing visible in this class calls {@code notify}, so this returns only when the
+     * waiting thread is interrupted (or on a spurious wakeup).
+     *
+     * @throws Exception an {@link InterruptedException} if the waiting thread is interrupted
+     */
+    @Override
+    public void join() throws Exception {
+        // K8 functions never return
+        // Object.wait() requires the caller to own the monitor; without the synchronized block
+        // it throws IllegalMonitorStateException immediately instead of blocking.
+        synchronized (this) {
+            this.wait();
+        }
+    }
+
+    /**
+     * Tears the function down. When the runtime is marked as running, the statefulset and then
+     * the service are deleted; afterwards every open gRPC channel is shut down and the
+     * channel/stub arrays are dropped.
+     *
+     * @throws Exception if deleting the statefulset or the service fails; in that case the
+     *                   channels are left untouched and the runtime stays marked as running
+     */
+    @Override
+    public void stop() throws Exception {
+        if (running) {
+            deleteStatefulSet();
+            deleteService();
+        }
+        final ManagedChannel[] openChannels = channel;
+        if (openChannels != null) {
+            for (int i = 0; i < openChannels.length; i++) {
+                openChannels[i].shutdown();
+            }
+        }
+        channel = null;
+        stub = null;
+        running = false;
+    }
+
+    /**
+     * Always returns {@code null}: this runtime does not record a failure cause.
+     *
+     * @return {@code null}
+     */
+    @Override
+    public Throwable getDeathException() {
+        return null;
+    }
+
+    /**
+     * Asks one function instance for its status over gRPC.
+     *
+     * @param instanceId zero-based index of the instance; must be smaller than the number of stubs
+     *                   created by {@link #start()}
+     * @return a future that completes exceptionally with "Not alive" when the runtime has no stubs,
+     *         or with "Invalid InstanceId" when {@code instanceId} is out of range; otherwise it
+     *         completes with the instance's status, or, if the gRPC call fails, with a status whose
+     *         {@code running} flag is false and whose failure exception is the error message
+     */
+    @Override
+    public CompletableFuture<FunctionStatus> getFunctionStatus(int instanceId) {
+        CompletableFuture<FunctionStatus> retval = new CompletableFuture<>();
+        // Work on a snapshot so the null check and the array access see the same array.
+        final InstanceControlGrpc.InstanceControlFutureStub[] stubs = stub;
+        // The null check has to come first: the range check below dereferences the array.
+        if (stubs == null) {
+            retval.completeExceptionally(new RuntimeException("Not alive"));
+            return retval;
+        }
+        if (instanceId < 0 || instanceId >= stubs.length) {
+            retval.completeExceptionally(new RuntimeException("Invalid InstanceId"));
+            return retval;
+        }
+        ListenableFuture<FunctionStatus> response = stubs[instanceId].getFunctionStatus(Empty.newBuilder().build());
+        Futures.addCallback(response, new FutureCallback<FunctionStatus>() {
+            @Override
+            public void onFailure(Throwable throwable) {
+                // A failed call is reported as a "not running" status rather than as an exception.
+                FunctionStatus.Builder builder = FunctionStatus.newBuilder();
+                builder.setRunning(false);
+                builder.setFailureException(throwable.getMessage());
+                retval.complete(builder.build());
+            }
+
+            @Override
+            public void onSuccess(FunctionStatus t) {
+                retval.complete(t);
+            }
+        });
+        return retval;
+    }
+
+    /**
+     * Not supported by this runtime.
+     *
+     * @return a future that is already completed exceptionally with a {@link RuntimeException}
+     */
+    @Override
+    public CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics() {
+        final CompletableFuture<InstanceCommunication.MetricsData> unsupported = new CompletableFuture<>();
+        final RuntimeException reason =
+                new RuntimeException("Kubernetes Runtime doesnt support getAndReset metrics via rest");
+        unsupported.completeExceptionally(reason);
+        return unsupported;
+    }
+
+    /**
+     * Not supported by this runtime.
+     *
+     * @return a future that is already completed exceptionally with a {@link RuntimeException}
+     */
+    @Override
+    public CompletableFuture<Void> resetMetrics() {
+        final CompletableFuture<Void> unsupported = new CompletableFuture<>();
+        final RuntimeException reason =
+                new RuntimeException("Kubernetes Runtime doesnt support resetting metrics via rest");
+        unsupported.completeExceptionally(reason);
+        return unsupported;
+    }
+
+    /**
+     * Not supported by this runtime.
+     *
+     * @return a future that is already completed exceptionally with a {@link RuntimeException}
+     */
+    @Override
+    public CompletableFuture<InstanceCommunication.MetricsData> getMetrics() {
+        final CompletableFuture<InstanceCommunication.MetricsData> unsupported = new CompletableFuture<>();
+        final RuntimeException reason =
+                new RuntimeException("Kubernetes Runtime doesnt support getting metrics via rest");
+        unsupported.completeExceptionally(reason);
+        return unsupported;
+    }
+
+    /**
+     * Reports the locally tracked state only: the flag is set by {@link #start()} and cleared by
+     * {@link #stop()}; no Kubernetes call is made here.
+     *
+     * @return {@code true} between a completed {@code start()} and a completed {@code stop()}
+     */
+    @Override
+    public boolean isAlive() {
+        return running;
+    }
+
+    /**
+     * Submits the service built by {@link #createService()} to the configured namespace.
+     * An HTTP 409 (conflict) answer is treated as "already created" and only logged.
+     *
+     * @throws IllegalStateException if the API server rejects the request for any other reason;
+     *                               the message is the HTTP status message of the response
+     * @throws Exception             if the request could not be built or executed
+     */
+    private void submitService() throws Exception {
+        final Function.FunctionDetails details = instanceConfig.getFunctionDetails();
+        final V1Service service = createService();
+        log.info("Submitting the following service to k8 {}", coreClient.getApiClient().getJSON().serialize(service));
+
+        final Response response =
+                coreClient.createNamespacedServiceCall(jobNamespace, service, null,
+                        null, null).execute();
+        try {
+            if (response.isSuccessful()) {
+                log.info("Service Created Successfully for function {}/{}/{}",
+                        details.getTenant(), details.getNamespace(), details.getName());
+            } else if (response.code() == HTTP_CONFLICT) {
+                log.warn("Service already created for function {}/{}/{}",
+                        details.getTenant(), details.getNamespace(), details.getName());
+            } else {
+                log.error("Error creating Service for function {}/{}/{}:- {}",
+                        details.getTenant(), details.getNamespace(), details.getName(),
+                        response.message());
+                // construct a message based on the k8s api server response
+                throw new IllegalStateException(response.message());
+            }
+        } finally {
+            // OkHttp keeps the connection checked out until the response body is closed.
+            // Closing is best effort so that it can never replace the exception thrown above.
+            try {
+                if (response.body() != null) {
+                    response.body().close();
+                }
+            } catch (Exception closeFailure) {
+                log.warn("Failed to close the response of the service creation call", closeFailure);
+            }
+        }
+    }
+
+    /**
+     * Builds the service definition for this function: named after the job, with
+     * {@code clusterIP} set to "None", a single TCP port called "grpc", and a selector made of
+     * the function's labels.
+     *
+     * @return the service object, not yet submitted to Kubernetes
+     */
+    private V1Service createService() {
+        final Function.FunctionDetails details = instanceConfig.getFunctionDetails();
+
+        // the service carries the job name
+        final V1ObjectMeta serviceMeta = new V1ObjectMeta();
+        serviceMeta.name(createJobName(details));
+
+        final V1Service headlessService = new V1Service();
+        headlessService.metadata(serviceMeta);
+
+        // no cluster IP is allocated for this service
+        final V1ServiceSpec spec = new V1ServiceSpec();
+        spec.clusterIP("None");
+
+        // the only exposed port is the instance's gRPC port
+        final V1ServicePort grpcPort = new V1ServicePort();
+        grpcPort.name("grpc").port(GRPC_PORT).protocol("TCP");
+        spec.addPortsItem(grpcPort);
+
+        // route to the pods that carry this function's labels
+        spec.selector(getLabels(details));
+
+        headlessService.spec(spec);
+        return headlessService;
+    }
+
+    /**
+     * Submits the statefulset built by {@link #createStatefulSet()} to the configured namespace.
+     * An HTTP 409 (conflict) answer is treated as "already present" and only logged.
+     *
+     * @throws IllegalStateException if the API server rejects the request for any other reason;
+     *                               the message is the HTTP status message of the response
+     * @throws Exception             if the request could not be built or executed
+     */
+    private void submitStatefulSet() throws Exception {
+        final Function.FunctionDetails details = instanceConfig.getFunctionDetails();
+        final V1StatefulSet statefulSet = createStatefulSet();
+
+        log.info("Submitting the following spec to k8 {}", appsClient.getApiClient().getJSON().serialize(statefulSet));
+
+        final Response response =
+                appsClient.createNamespacedStatefulSetCall(jobNamespace, statefulSet, null,
+                        null, null).execute();
+        try {
+            if (response.isSuccessful()) {
+                log.info("Successfully created statefulset for function {}/{}/{}",
+                        details.getTenant(), details.getNamespace(), details.getName());
+            } else if (response.code() == HTTP_CONFLICT) {
+                log.warn("Statefulset already present for function {}/{}/{}",
+                        details.getTenant(), details.getNamespace(), details.getName());
+            } else {
+                log.error("Error creating statefulset for function {}/{}/{}:- {}",
+                        details.getTenant(), details.getNamespace(), details.getName(),
+                        response.message());
+                // construct a message based on the k8s api server response
+                throw new IllegalStateException(response.message());
+            }
+        } finally {
+            // OkHttp keeps the connection checked out until the response body is closed.
+            // Closing is best effort so that it can never replace the exception thrown above.
+            try {
+                if (response.body() != null) {
+                    response.body().close();
+                }
+            } catch (Exception closeFailure) {
+                log.warn("Failed to close the response of the statefulset creation call", closeFailure);
+            }
+        }
+    }
+
+    public void deleteStatefulSet() throws Exception {
+        final V1DeleteOptions options = new V1DeleteOptions();
+        options.setGracePeriodSeconds(0L);
+        options.setPropagationPolicy("Foreground");
+        final Response response = appsClient.deleteNamespacedStatefulSetCall(
+                createJobName(instanceConfig.getFunctionDetails()),
+                jobNamespace, options, null, null, null, null, null, null)
+                .execute();
+
+        if (!response.isSuccessful()) {
+            throw new RuntimeException(String.format("Error deleting statefulset for function {}/{}/{} :- {} ",
+                    instanceConfig.getFunctionDetails().getTenant(),
+                    instanceConfig.getFunctionDetails().getNamespace(),
+                    instanceConfig.getFunctionDetails().getName(),
+                    response.message()));
+        } else {
+            log.info("Successfully deleted statefulset for function {}/{}/{}",
+                    instanceConfig.getFunctionDetails().getTenant(),
+                    instanceConfig.getFunctionDetails().getNamespace(),
+                    instanceConfig.getFunctionDetails().getName());
+        }
+    }
+
+    public void deleteService() throws Exception {
+        final V1DeleteOptions options = new V1DeleteOptions();
+        options.setGracePeriodSeconds(0L);
+        options.setPropagationPolicy("Foreground");
+        final Response response = coreClient.deleteNamespacedServiceCall(
+                createJobName(instanceConfig.getFunctionDetails()),
+                jobNamespace, options, null, null, null, null, null, null)
+                .execute();
+
+        if (!response.isSuccessful()) {
+            throw new RuntimeException(String.format("Error deleting service for function {}/{}/{} :- {}",
+                    instanceConfig.getFunctionDetails().getTenant(),
+                    instanceConfig.getFunctionDetails().getNamespace(),
+                    instanceConfig.getFunctionDetails().getName(),
+                    response.message()));
+        } else {
+            log.info("Service deleted successfully for function {}/{}/{}",
+                    instanceConfig.getFunctionDetails().getTenant(),
+                    instanceConfig.getFunctionDetails().getNamespace(),
+                    instanceConfig.getFunctionDetails().getName());
+        }
+    }
+
+    protected List<String> getExecutorCommand() {
+        return Arrays.asList(
+                "sh",
+                "-c",
+                String.join(" ", getDownloadCommand(userCodePkgUrl, originalCodeFileName))
+                        + " && " + setShardIdEnvironmentVariableCommand()
+                        + " && " + String.join(" ", processArgs)
+        );
+    }
+
+    private List<String> getDownloadCommand(String bkPath, String userCodeFilePath) {
+        return Arrays.asList(
+                pulsarRootDir + "/bin/pulsar-admin",
+                "--admin-url",
+                pulsarAdminUrl,
+                "functions",
+                "download",
+                "--path",
+                bkPath,
+                "--destination-file",
+                userCodeFilePath);
+    }
+
+    /**
+     * Returns the shell fragment that assigns the shard id variable and echoes it.
+     * {@code ${POD_NAME##*-}} strips everything up to and including the last '-' of the pod name,
+     * so only the suffix after the last dash remains.
+     *
+     * @return {@code SHARD_ID=${POD_NAME##*-} && echo shardId=${SHARD_ID}}
+     */
+    private static String setShardIdEnvironmentVariableCommand() {
+        return String.format("%s=${POD_NAME##*-} && echo shardId=${%s}", ENV_SHARD_ID, ENV_SHARD_ID);
+    }
+
+
+    /**
+     * Builds the statefulset definition for this function: named after the job, bound to the
+     * service of the same name, with one replica per unit of parallelism and a pod template
+     * that runs the executor command.
+     *
+     * @return the statefulset object, not yet submitted to Kubernetes
+     */
+    private V1StatefulSet createStatefulSet() {
+        final Function.FunctionDetails details = instanceConfig.getFunctionDetails();
+        final String jobName = createJobName(details);
+
+        // the statefulset carries the job name
+        final V1ObjectMeta setMeta = new V1ObjectMeta();
+        setMeta.name(jobName);
+
+        final V1StatefulSet result = new V1StatefulSet();
+        result.metadata(setMeta);
+
+        // the governing service has the same name as the statefulset
+        final V1StatefulSetSpec spec = new V1StatefulSetSpec();
+        spec.serviceName(jobName);
+        spec.setReplicas(details.getParallelism());
+
+        // Parallel pod management tells the StatefulSet controller to launch or terminate
+        // all Pods in parallel, and not to wait for Pods to become Running and Ready or completely
+        // terminated prior to launching or terminating another Pod.
+        spec.setPodManagementPolicy("Parallel");
+
+        // the selector's match labels tell the statefulset which pods it manages
+        final V1LabelSelector podSelector = new V1LabelSelector();
+        podSelector.matchLabels(getLabels(details));
+        spec.selector(podSelector);
+
+        // pod template: the pods get the same labels the selector matches on
+        final V1PodTemplateSpec template = new V1PodTemplateSpec();
+        final V1ObjectMeta podMeta = new V1ObjectMeta().labels(getLabels(details));
+        /*
+        TODO:- Figure out the metrics collection later.
+        templateMetaData.annotations(getPrometheusAnnotations());
+        */
+        template.setMetadata(podMeta);
+
+        // resources are only passed on when the function declares them
+        final List<String> command = getExecutorCommand();
+        final Function.Resources declaredResources = details.hasResources() ? details.getResources() : null;
+        template.spec(getPodSpec(command, declaredResources));
+
+        spec.setTemplate(template);
+        result.spec(spec);
+        return result;
+    }
+
+    /**
+     * Returns the two Prometheus annotations (scrape flag and port). No visible code calls this
+     * method; its only mention is inside the TODO comment of the statefulset builder.
+     *
+     * @return a new mutable map with {@code prometheus.io/scrape} and {@code prometheus.io/port}
+     */
+    private Map<String, String> getPrometheusAnnotations() {
+        final Map<String, String> scrapeSettings = new HashMap<>();
+        scrapeSettings.put("prometheus.io/scrape", "true");
+        scrapeSettings.put("prometheus.io/port", "8080");
+        return scrapeSettings;
+    }
+
+    /**
+     * Builds the label set shared by the service selector, the statefulset selector and the pod
+     * template.
+     *
+     * @param functionDetails function whose job name, namespace and tenant become the labels
+     * @return a new mutable map with the keys {@code app}, {@code namespace} and {@code tenant}
+     */
+    private Map<String, String> getLabels(Function.FunctionDetails functionDetails) {
+        final String app = createJobName(functionDetails);
+        final Map<String, String> labelSet = new HashMap<>();
+        labelSet.put("app", app);
+        labelSet.put("namespace", functionDetails.getNamespace());
+        labelSet.put("tenant", functionDetails.getTenant());
+        return labelSet;
+    }
+
+    /**
+     * Builds the pod spec: zero termination grace period, the node-failure tolerations and a
+     * single function container.
+     *
+     * @param instanceCommand command the container runs
+     * @param resource        resources declared by the function, or {@code null} for the defaults
+     * @return the pod spec
+     */
+    private V1PodSpec getPodSpec(List<String> instanceCommand, Function.Resources resource) {
+        final V1PodSpec spec = new V1PodSpec();
+
+        // a grace period of 0 lets pods be deleted quickly
+        spec.setTerminationGracePeriodSeconds(0L);
+
+        // tolerations make sure pods are rescheduled when their node goes down
+        // https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/#taint-based-evictions
+        final List<V1Toleration> nodeFailureTolerations = getTolerations();
+        spec.setTolerations(nodeFailureTolerations);
+
+        final V1Container functionContainer = getContainer(instanceCommand, resource);
+        spec.containers(Collections.singletonList(functionContainer));
+
+        return spec;
+    }
+
+    /**
+     * Turns every key in {@code TOLERATIONS} into a toleration with operator "Exists", effect
+     * "NoExecute" and a toleration time of 10 seconds.
+     *
+     * @return a new mutable list, in the order of {@code TOLERATIONS}
+     */
+    private List<V1Toleration> getTolerations() {
+        final List<V1Toleration> result = new ArrayList<>();
+        for (final String taintKey : TOLERATIONS) {
+            final V1Toleration entry = new V1Toleration()
+                    .key(taintKey)
+                    .operator("Exists")
+                    .effect("NoExecute")
+                    .tolerationSeconds(10L);
+            result.add(entry);
+        }
+        return result;
+    }
+
+    /**
+     * Builds the function container: image, command, the {@code POD_NAME} environment variable
+     * (taken from the pod's {@code metadata.name}), resource requests and the gRPC port.
+     *
+     * @param instanceCommand command the container runs
+     * @param resource        resources declared by the function; when {@code null}, or when a value
+     *                        is 0, the request falls back to 1073741824 for memory and 1 for cpu
+     * @return the container definition
+     */
+    private V1Container getContainer(List<String> instanceCommand, Function.Resources resource) {
+        final V1Container functionContainer = new V1Container().name("pulsarfunction");
+
+        // image and command
+        functionContainer.setImage(pulsarDockerImageName);
+        functionContainer.setCommand(instanceCommand);
+
+        // expose the pod's own name to the container as POD_NAME
+        final V1ObjectFieldSelector podNameField = new V1ObjectFieldSelector().fieldPath("metadata.name");
+        final V1EnvVar podNameVar = new V1EnvVar();
+        podNameVar.name("POD_NAME")
+                .valueFrom(new V1EnvVarSource().fieldRef(podNameField));
+        functionContainer.setEnv(Arrays.asList(podNameVar));
+
+        // resource requests, falling back to the defaults when nothing (or 0) is declared
+        final String memoryRequest =
+                Long.toString(resource != null && resource.getRam() != 0 ? resource.getRam() : 1073741824);
+        final String cpuRequest =
+                Double.toString(resource != null && resource.getCpu() != 0 ? resource.getCpu() : 1);
+        final Map<String, Quantity> resourceRequests = new HashMap<>();
+        resourceRequests.put("memory", Quantity.fromString(memoryRequest));
+        resourceRequests.put("cpu", Quantity.fromString(cpuRequest));
+        final V1ResourceRequirements requirements = new V1ResourceRequirements();
+        requirements.setRequests(resourceRequests);
+        functionContainer.setResources(requirements);
+
+        // ports
+        functionContainer.setPorts(getContainerPorts());
+
+        return functionContainer;
+    }
+
+    /**
+     * Lists the ports the function container exposes: only the gRPC port, named "grpc".
+     *
+     * @return a new mutable list with a single port
+     */
+    private List<V1ContainerPort> getContainerPorts() {
+        final V1ContainerPort grpcPort = new V1ContainerPort();
+        grpcPort.setName("grpc");
+        grpcPort.setContainerPort(GRPC_PORT);
+
+        final List<V1ContainerPort> exposed = new ArrayList<>();
+        exposed.add(grpcPort);
+        return exposed;
+    }
+
+    private static String createJobName(Function.FunctionDetails functionDetails) {
+        return createJobName(functionDetails.getTenant(),
+                functionDetails.getNamespace(),
+                functionDetails.getName());
+    }
+
+    /**
+     * Joins the fixed prefix "pf" and the three name parts with dashes.
+     *
+     * @param tenant       tenant of the function
+     * @param namespace    namespace of the function
+     * @param functionName name of the function
+     * @return {@code pf-<tenant>-<namespace>-<functionName>}
+     */
+    private static String createJobName(String tenant, String namespace, String functionName) {
+        return String.join("-", "pf", tenant, namespace, functionName);
+    }
+
+    /**
+     * Validates the job name derived from the function: it must contain no upper case
+     * characters, must match {@code VALID_POD_NAME_REGEX} and must not be longer than
+     * {@code maxJobNameSize} characters. The checks run in that order and the first failing one
+     * throws.
+     *
+     * @param functionDetails function whose job name is validated
+     * @throws RuntimeException if any of the three checks fails
+     */
+    private static void doChecks(Function.FunctionDetails functionDetails) {
+        final String name = createJobName(functionDetails);
+
+        final boolean hasUpperCase = !name.toLowerCase().equals(name);
+        if (hasUpperCase) {
+            throw new RuntimeException("Kubernetes does not allow upper case jobNames.");
+        }
+
+        final boolean wellFormed = VALID_POD_NAME_REGEX.matcher(name).matches();
+        if (!wellFormed) {
+            throw new RuntimeException("Kubernetes only admits lower case and numbers.");
+        }
+
+        final boolean tooLong = name.length() > maxJobNameSize;
+        if (tooLong) {
+            throw new RuntimeException("Kubernetes job name size should be less than " + maxJobNameSize);
+        }
+    }
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
new file mode 100644
index 0000000000..c55935b0fe
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.runtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.kubernetes.client.ApiClient;
+import io.kubernetes.client.Configuration;
+import io.kubernetes.client.apis.AppsV1Api;
+import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.util.Config;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceConfig;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+/**
+ * Kubernetes based function container factory implementation.
+ *
+ * <p>All runtimes created by one factory share a single pair of Kubernetes API clients,
+ * which are built lazily on the first call to {@code createContainer}. The factory
+ * reports itself as externally managed.
+ */
+@Slf4j
+public class KubernetesRuntimeFactory implements RuntimeFactory {
+
+    private final String k8Uri;
+    private final String jobNamespace;
+    private final String pulsarDockerImageName;
+    private final String pulsarRootDir;
+    private final Boolean submittingInsidePod;
+    private final String pulsarAdminUri;
+    private final String pulsarServiceUri;
+    private final String stateStorageServiceUri;
+    private final AuthenticationConfig authConfig;
+    private final String javaInstanceJarFile;
+    private final String pythonInstanceFile;
+    private final String logDirectory = "logs/functions";
+    private AppsV1Api appsClient;
+    private CoreV1Api coreClient;
+
+    /**
+     * Creates a factory, substituting defaults for settings that are null or empty.
+     *
+     * @param k8Uri                  base path used for the Kubernetes API clients; when null or
+     *                               empty the client library's own configuration is used instead
+     * @param jobNamespace           job namespace handed to each runtime; "default" when empty
+     * @param pulsarDockerImageName  image name handed to each runtime; "apachepulsar/pulsar"
+     *                               when empty
+     * @param pulsarRootDir          root directory under which the instance files are looked up;
+     *                               "/pulsar" when empty
+     * @param submittingInsidePod    whether to initialise the client from the in-cluster
+     *                               configuration; only consulted when {@code k8Uri} is not set,
+     *                               and null is treated as false
+     * @param pulsarServiceUri       service URI handed to each runtime
+     * @param pulsarAdminUri         admin URI handed to each runtime
+     * @param stateStorageServiceUri state storage service URI handed to each runtime
+     * @param authConfig             authentication settings handed to each runtime
+     */
+    @VisibleForTesting
+    public KubernetesRuntimeFactory(String k8Uri,
+                                    String jobNamespace,
+                                    String pulsarDockerImageName,
+                                    String pulsarRootDir,
+                                    Boolean submittingInsidePod,
+                                    String pulsarServiceUri,
+                                    String pulsarAdminUri,
+                                    String stateStorageServiceUri,
+                                    AuthenticationConfig authConfig) {
+        this.k8Uri = k8Uri;
+        this.jobNamespace = orDefault(jobNamespace, "default");
+        this.pulsarDockerImageName = orDefault(pulsarDockerImageName, "apachepulsar/pulsar");
+        this.pulsarRootDir = orDefault(pulsarRootDir, "/pulsar");
+        this.submittingInsidePod = submittingInsidePod;
+        this.pulsarServiceUri = pulsarServiceUri;
+        this.pulsarAdminUri = pulsarAdminUri;
+        this.stateStorageServiceUri = stateStorageServiceUri;
+        this.authConfig = authConfig;
+        // Both instance files are resolved against the (possibly defaulted) root directory.
+        this.javaInstanceJarFile = this.pulsarRootDir + "/instances/java-instance.jar";
+        this.pythonInstanceFile = this.pulsarRootDir + "/instances/python-instance/python_instance_main.py";
+    }
+
+    /**
+     * Returns {@code value} unless it is null or empty, in which case {@code fallback} is returned.
+     */
+    private static String orDefault(String value, String fallback) {
+        return isEmpty(value) ? fallback : value;
+    }
+
+    /**
+     * @return always {@code true} for this factory
+     */
+    @Override
+    public boolean externallyManaged() {
+        return true;
+    }
+
+    /**
+     * Creates a runtime for one function instance, initialising the API clients on first use.
+     *
+     * @param instanceConfig              configuration of the instance; its runtime type selects
+     *                                    the java or python instance file
+     * @param codePkgUrl                  code package URL handed to the runtime
+     * @param originalCodeFileName        original code file name handed to the runtime
+     * @param expectedHealthCheckInterval accepted for interface compatibility; it is not
+     *                                    forwarded to the runtime by this method
+     * @return the newly created runtime
+     * @throws RuntimeException if the function's runtime type is neither JAVA nor PYTHON
+     * @throws Exception        if the Kubernetes clients or the runtime cannot be created
+     */
+    @Override
+    public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String codePkgUrl,
+                                             String originalCodeFileName,
+                                             Long expectedHealthCheckInterval) throws Exception {
+        setupClient();
+        String instanceFile;
+        switch (instanceConfig.getFunctionDetails().getRuntime()) {
+            case JAVA:
+                instanceFile = javaInstanceJarFile;
+                break;
+            case PYTHON:
+                instanceFile = pythonInstanceFile;
+                break;
+            default:
+                throw new RuntimeException("Unsupported Runtime " + instanceConfig.getFunctionDetails().getRuntime());
+        }
+        return new KubernetesRuntime(
+            appsClient,
+            coreClient,
+            jobNamespace,
+            pulsarDockerImageName,
+            pulsarRootDir,
+            instanceConfig,
+            instanceFile,
+            logDirectory,
+            codePkgUrl,
+            originalCodeFileName,
+            pulsarServiceUri,
+            pulsarAdminUri,
+            stateStorageServiceUri,
+            authConfig);
+    }
+
+    /**
+     * No-op: this factory holds nothing that it closes explicitly.
+     */
+    @Override
+    public void close() {
+    }
+
+    /**
+     * Lazily builds the apps and core API clients; later calls return immediately.
+     *
+     * <p>With no {@code k8Uri} configured the client library's in-cluster or default
+     * configuration is installed as the default API client; otherwise a client pointing at
+     * {@code k8Uri} is used.
+     *
+     * @throws Exception if the client configuration cannot be loaded
+     */
+    private void setupClient() throws Exception {
+        if (appsClient != null) {
+            return;
+        }
+        final AppsV1Api newAppsClient;
+        final CoreV1Api newCoreClient;
+        // isEmpty (rather than a null check) keeps an empty setting from becoming an empty base path.
+        if (isEmpty(k8Uri)) {
+            log.info("k8Uri is not set thus going by defaults");
+            final ApiClient cli;
+            // Boolean.TRUE.equals avoids a NullPointerException on unboxing when the flag is null.
+            if (Boolean.TRUE.equals(submittingInsidePod)) {
+                log.info("Looks like we are inside a k8 pod ourselves. Initializing as cluster");
+                cli = Config.fromCluster();
+            } else {
+                log.info("Using default cluster since we are not running inside k8");
+                cli = Config.defaultClient();
+            }
+            Configuration.setDefaultApiClient(cli);
+            newAppsClient = new AppsV1Api();
+            newCoreClient = new CoreV1Api();
+        } else {
+            log.info("Setting up k8Client using uri {}", k8Uri);
+            final ApiClient apiClient = new ApiClient().setBasePath(k8Uri);
+            newAppsClient = new AppsV1Api(apiClient);
+            newCoreClient = new CoreV1Api(apiClient);
+        }
+        // Publish both clients only once both exist, so a failure above cannot leave the
+        // factory half-initialised with the guard field already set.
+        coreClient = newCoreClient;
+        appsClient = newAppsClient;
+    }
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 63f6f3f0a8..2146376d7a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -23,22 +23,17 @@
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.protobuf.Empty;
-import com.google.protobuf.util.JsonFormat;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
 
 import java.io.InputStream;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
@@ -46,8 +41,6 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
 /**
  * A function container implemented using java thread.
  */
@@ -66,6 +59,7 @@
     private InstanceControlGrpc.InstanceControlFutureStub stub;
     private ScheduledExecutorService timer;
     private InstanceConfig instanceConfig;
+    private final Long expectedHealthCheckInterval;
 
     ProcessRuntime(InstanceConfig instanceConfig,
                    String instanceFile,
@@ -77,101 +71,10 @@
                    Long expectedHealthCheckInterval) throws Exception {
         this.instanceConfig = instanceConfig;
         this.instancePort = instanceConfig.getPort();
-        this.processArgs = composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
-                authConfig, expectedHealthCheckInterval);
-    }
-
-    private List<String> composeArgs(InstanceConfig instanceConfig,
-                                     String instanceFile,
-                                     String logDirectory,
-                                     String codeFile,
-                                     String pulsarServiceUrl,
-                                     String stateStorageServiceUrl,
-                                     AuthenticationConfig authConfig,
-                                     Long expectedHealthCheckInterval) throws Exception {
-        List<String> args = new LinkedList<>();
-        if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
-            args.add("java");
-            args.add("-cp");
-            args.add(instanceFile);
-
-            // Keep the same env property pointing to the Java instance file so that it can be picked up
-            // by the child process and manually added to classpath
-            args.add(String.format("-D%s=%s", FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, instanceFile));
-            args.add("-Dlog4j.configurationFile=java_instance_log4j2.yml");
-            args.add("-Dpulsar.function.log.dir=" + String.format(
-                    "%s/%s",
-                    logDirectory,
-                    FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())));
-            args.add("-Dpulsar.function.log.file=" + String.format(
-                    "%s-%s",
-                    instanceConfig.getFunctionDetails().getName(),
-                    instanceConfig.getInstanceId()));
-            if (instanceConfig.getFunctionDetails().getResources() != null) {
-                Function.Resources resources = instanceConfig.getFunctionDetails().getResources();
-                if (resources.getRam() != 0) {
-                    args.add("-Xmx" + String.valueOf(resources.getRam()));
-                }
-            }
-            args.add(JavaInstanceMain.class.getName());
-            args.add("--jar");
-            args.add(codeFile);
-        } else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) {
-            args.add("python");
-            args.add(instanceFile);
-            args.add("--py");
-            args.add(codeFile);
-            args.add("--logging_directory");
-            args.add(logDirectory);
-            args.add("--logging_file");
-            args.add(instanceConfig.getFunctionDetails().getName());
-            // TODO:- Find a platform independent way of controlling memory for a python application
-        }
-        args.add("--instance_id");
-        args.add(instanceConfig.getInstanceName());
-        args.add("--function_id");
-        args.add(instanceConfig.getFunctionId());
-        args.add("--function_version");
-        args.add(instanceConfig.getFunctionVersion());
-        args.add("--function_details");
-        args.add(JsonFormat.printer().print(instanceConfig.getFunctionDetails()));
-
-        args.add("--pulsar_serviceurl");
-        args.add(pulsarServiceUrl);
-        if (authConfig != null) {
-            if (isNotBlank(authConfig.getClientAuthenticationPlugin())
-                    && isNotBlank(authConfig.getClientAuthenticationParameters())) {
-                args.add("--client_auth_plugin");
-                args.add(authConfig.getClientAuthenticationPlugin());
-                args.add("--client_auth_params");
-                args.add(authConfig.getClientAuthenticationParameters());
-            }
-            args.add("--use_tls");
-            args.add(Boolean.toString(authConfig.isUseTls()));
-            args.add("--tls_allow_insecure");
-            args.add(Boolean.toString(authConfig.isTlsAllowInsecureConnection()));
-            args.add("--hostname_verification_enabled");
-            args.add(Boolean.toString(authConfig.isTlsHostnameVerificationEnable()));
-            if(isNotBlank(authConfig.getTlsTrustCertsFilePath())) {
-                args.add("--tls_trust_cert_path");
-                args.add(authConfig.getTlsTrustCertsFilePath());
-            }
-        }
-        args.add("--max_buffered_tuples");
-        args.add(String.valueOf(instanceConfig.getMaxBufferedTuples()));
-
-        args.add("--port");
-        args.add(String.valueOf(instanceConfig.getPort()));
-
-        // state storage configs
-        if (null != stateStorageServiceUrl
-            && instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
-            args.add("--state_storage_serviceurl");
-            args.add(stateStorageServiceUrl);
-        }
-        args.add("--expected_healthcheck_interval");
-        args.add(String.valueOf(expectedHealthCheckInterval));
-        return args;
+        this.expectedHealthCheckInterval = expectedHealthCheckInterval;
+        this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
+                authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval,
+                "java_instance_log4j2.yml");
     }
 
     /**
@@ -201,7 +104,7 @@ public void run() {
                                 instanceConfig.getInstanceId(), e);
                     }
                 }
-            }, 30000, 30000, TimeUnit.MILLISECONDS);
+            }, expectedHealthCheckInterval, expectedHealthCheckInterval, TimeUnit.SECONDS);
         }
     }
 
@@ -226,7 +129,7 @@ public void stop() {
     }
 
     @Override
-    public CompletableFuture<FunctionStatus> getFunctionStatus() {
+    public CompletableFuture<FunctionStatus> getFunctionStatus(int instanceId) {
         CompletableFuture<FunctionStatus> retval = new CompletableFuture<>();
         if (stub == null) {
             retval.completeExceptionally(new RuntimeException("Not alive"));
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index 7b910bc295..78b069cb69 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -92,6 +92,7 @@ public ProcessRuntimeFactory(String pulsarServiceUrl,
 
     @Override
     public ProcessRuntime createContainer(InstanceConfig instanceConfig, String codeFile,
+                                          String originalcodeFileName,
                                           Long expectedHealthCheckInterval) throws Exception {
         String instanceFile;
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
index ea992901f7..ac1eceda7e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
@@ -28,17 +28,17 @@
  */
 public interface Runtime {
 
-    void start();
+    void start() throws Exception;
 
     void join() throws Exception;
 
-    void stop();
+    void stop() throws Exception;
 
     boolean isAlive();
 
     Throwable getDeathException();
 
-    CompletableFuture<InstanceCommunication.FunctionStatus> getFunctionStatus();
+    CompletableFuture<InstanceCommunication.FunctionStatus> getFunctionStatus(int instanceId);
 
     CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics();
     
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index ef2ea9c569..fd8a7bebb7 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -33,9 +33,11 @@
      * @return function container to start/stop instance
      */
     Runtime createContainer(
-            InstanceConfig instanceConfig, String codeFile,
+            InstanceConfig instanceConfig, String codeFile, String originalCodeFileName,
             Long expectedHealthCheckInterval) throws Exception;
 
+    default boolean externallyManaged() { return false; }
+
     @Override
     void close();
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
index adb4578109..d4d61e0b1c 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeSpawner.java
@@ -41,9 +41,11 @@
 @Slf4j
 public class RuntimeSpawner implements AutoCloseable {
 
+    @Getter
     private final InstanceConfig instanceConfig;
     private final RuntimeFactory runtimeFactory;
     private final String codeFile;
+    private final String originalCodeFileName;
 
     @Getter
     private Runtime runtime;
@@ -55,10 +57,12 @@
 
     public RuntimeSpawner(InstanceConfig instanceConfig,
                           String codeFile,
+                          String originalCodeFileName,
                           RuntimeFactory containerFactory, long instanceLivenessCheckFreqMs) {
         this.instanceConfig = instanceConfig;
         this.runtimeFactory = containerFactory;
         this.codeFile = codeFile;
+        this.originalCodeFileName = originalCodeFileName;
         this.numRestarts = 0;
         this.instanceLivenessCheckFreqMs = instanceLivenessCheckFreqMs;
     }
@@ -68,12 +72,12 @@ public void start() throws Exception {
         log.info("{}/{}/{}-{} RuntimeSpawner starting function", details.getTenant(), details.getNamespace(),
                 details.getName(), this.instanceConfig.getInstanceId());
 
-        runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile,
+        runtime = runtimeFactory.createContainer(this.instanceConfig, codeFile, originalCodeFileName,
                 instanceLivenessCheckFreqMs * 1000);
         runtime.start();
 
         // monitor function runtime to make sure it is running.  If not, restart the function runtime
-        if (instanceLivenessCheckFreqMs > 0) {
+        if (!runtimeFactory.externallyManaged() && instanceLivenessCheckFreqMs > 0) {
             processLivenessCheckTimer = new Timer();
             processLivenessCheckTimer.scheduleAtFixedRate(new TimerTask() {
                 @Override
@@ -82,9 +86,14 @@ public void run() {
                         log.error("{}/{}/{}-{} Function Container is dead with exception.. restarting", details.getTenant(),
                                 details.getNamespace(), details.getName(), runtime.getDeathException());
                         // Just for the sake of sanity, just destroy the runtime
-                        runtime.stop();
-                        runtimeDeathException = runtime.getDeathException();
-                        runtime.start();
+                        try {
+                            runtime.stop();
+                            runtimeDeathException = runtime.getDeathException();
+                            runtime.start();
+                        } catch (Exception e) {
+                            log.error("{}/{}/{}-{} Function Restart failed", details.getTenant(),
+                                    details.getNamespace(), details.getName(), e);
+                        }
                         numRestarts++;
                     }
                 }
@@ -98,10 +107,10 @@ public void join() throws Exception {
         }
     }
 
-    public CompletableFuture<FunctionStatus> getFunctionStatus() {
-        return runtime.getFunctionStatus().thenApply(f -> {
-            FunctionStatus.Builder builder = FunctionStatus.newBuilder();
-            builder.mergeFrom(f).setNumRestarts(numRestarts).setInstanceId(instanceConfig.getInstanceName());
+    public CompletableFuture<FunctionStatus> getFunctionStatus(int instanceId) {
+        return runtime.getFunctionStatus(instanceId).thenApply(f -> {
+           FunctionStatus.Builder builder = FunctionStatus.newBuilder();
+           builder.mergeFrom(f).setNumRestarts(numRestarts).setInstanceId(String.valueOf(instanceId));
             if (!f.getRunning() && runtimeDeathException != null) {
                 builder.setFailureException(runtimeDeathException.getMessage());
             }
@@ -109,8 +118,8 @@ public void join() throws Exception {
         });
     }
 
-    public CompletableFuture<String> getFunctionStatusAsJson() {
-        return this.getFunctionStatus().thenApply(msg -> {
+    public CompletableFuture<String> getFunctionStatusAsJson(int instanceId) {
+        return this.getFunctionStatus(instanceId).thenApply(msg -> {
             try {
                 return Utils.printJson(msg);
             } catch (IOException e) {
@@ -123,7 +132,11 @@ public void join() throws Exception {
     @Override
     public void close() {
         if (null != runtime) {
-            runtime.stop();
+            try {
+                runtime.stop();
+            } catch (Exception e) {
+                // Ignore
+            }
             runtime = null;
         }
         if (processLivenessCheckTimer != null) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
new file mode 100644
index 0000000000..fe2a88ee8e
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.runtime;
+
+import com.google.protobuf.util.JsonFormat;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
+
+import java.util.*;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
+/**
+ * Helpers shared by the function runtimes in this package.
+ */
+@Slf4j
+class RuntimeUtils {
+
+    /**
+     * Builds the command line used to launch a function instance.
+     *
+     * <p>For a JAVA or PYTHON function the list starts with the matching interpreter invocation;
+     * for any other runtime nothing is prepended. The options common to both follow in a fixed
+     * order.
+     *
+     * @param instanceConfig              configuration of the instance to launch
+     * @param instanceFile                java instance jar (used as classpath) or python instance script
+     * @param logDirectory                directory the instance logs are written under
+     * @param originalCodeFileName        user code file passed via {@code --jar} or {@code --py}
+     * @param pulsarServiceUrl            value of {@code --pulsar_serviceurl}
+     * @param stateStorageServiceUrl      value of {@code --state_storage_serviceurl}; only emitted
+     *                                    for JAVA functions and when non-null
+     * @param authConfig                  client authentication/TLS settings, or null to emit none
+     * @param shardId                     value of {@code --instance_id}, also used in the java log file name
+     * @param grpcPort                    value of {@code --port}
+     * @param expectedHealthCheckInterval value of {@code --expected_healthcheck_interval}
+     * @param javaLog4jFileName           log4j configuration file name for JAVA functions
+     * @return the arguments in launch order
+     * @throws Exception propagated from printing the function details as JSON
+     */
+    public static List<String> composeArgs(InstanceConfig instanceConfig,
+                                           String instanceFile,
+                                           String logDirectory,
+                                           String originalCodeFileName,
+                                           String pulsarServiceUrl,
+                                           String stateStorageServiceUrl,
+                                           AuthenticationConfig authConfig,
+                                           String shardId,
+                                           Integer grpcPort,
+                                           Long expectedHealthCheckInterval,
+                                           String javaLog4jFileName) throws Exception {
+        final Function.FunctionDetails details = instanceConfig.getFunctionDetails();
+        final boolean javaRuntime = details.getRuntime() == Function.FunctionDetails.Runtime.JAVA;
+        final boolean pythonRuntime = details.getRuntime() == Function.FunctionDetails.Runtime.PYTHON;
+
+        final List<String> args = new LinkedList<>();
+        if (javaRuntime) {
+            Collections.addAll(args, "java", "-cp", instanceFile);
+
+            // The instance jar location is also exposed through a system property so that the
+            // child process can pick it up and add it to its classpath manually.
+            args.add(String.format("-D%s=%s", FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, instanceFile));
+            args.add("-Dlog4j.configurationFile=" + javaLog4jFileName);
+            args.add(String.format("-Dpulsar.function.log.dir=%s/%s",
+                    logDirectory,
+                    FunctionDetailsUtils.getFullyQualifiedName(details)));
+            args.add(String.format("-Dpulsar.function.log.file=%s-%s", details.getName(), shardId));
+
+            // A RAM setting of 0 means no heap limit is passed on.
+            final Function.Resources resources = details.getResources();
+            if (resources != null && resources.getRam() != 0) {
+                args.add("-Xmx" + resources.getRam());
+            }
+            Collections.addAll(args, JavaInstanceMain.class.getName(), "--jar", originalCodeFileName);
+        } else if (pythonRuntime) {
+            Collections.addAll(args,
+                    "python", instanceFile,
+                    "--py", originalCodeFileName,
+                    "--logging_directory", logDirectory,
+                    "--logging_file", details.getName());
+            // TODO:- Find a platform independent way of controlling memory for a python application
+        }
+
+        Collections.addAll(args,
+                "--instance_id", shardId,
+                "--function_id", instanceConfig.getFunctionId(),
+                "--function_version", instanceConfig.getFunctionVersion());
+
+        // NOTE(review): the JSON is wrapped in literal single quotes. A caller that hands these
+        // arguments to a process without going through a shell would forward the quote
+        // characters as part of the value; confirm every caller expects that.
+        final String detailsJson = JsonFormat.printer().omittingInsignificantWhitespace().print(details);
+        Collections.addAll(args, "--function_details", "'" + detailsJson + "'");
+
+        Collections.addAll(args, "--pulsar_serviceurl", pulsarServiceUrl);
+        if (authConfig != null) {
+            // Plugin and parameters are only emitted together.
+            final String authPlugin = authConfig.getClientAuthenticationPlugin();
+            final String authParams = authConfig.getClientAuthenticationParameters();
+            if (isNotBlank(authPlugin) && isNotBlank(authParams)) {
+                Collections.addAll(args,
+                        "--client_auth_plugin", authPlugin,
+                        "--client_auth_params", authParams);
+            }
+            Collections.addAll(args,
+                    "--use_tls", Boolean.toString(authConfig.isUseTls()),
+                    "--tls_allow_insecure", Boolean.toString(authConfig.isTlsAllowInsecureConnection()),
+                    "--hostname_verification_enabled",
+                    Boolean.toString(authConfig.isTlsHostnameVerificationEnable()));
+            final String trustCertsPath = authConfig.getTlsTrustCertsFilePath();
+            if (isNotBlank(trustCertsPath)) {
+                Collections.addAll(args, "--tls_trust_cert_path", trustCertsPath);
+            }
+        }
+
+        Collections.addAll(args,
+                "--max_buffered_tuples", String.valueOf(instanceConfig.getMaxBufferedTuples()),
+                "--port", String.valueOf(grpcPort));
+
+        // state storage configs
+        if (javaRuntime && null != stateStorageServiceUrl) {
+            Collections.addAll(args, "--state_storage_serviceurl", stateStorageServiceUrl);
+        }
+
+        Collections.addAll(args,
+                "--expected_healthcheck_interval", String.valueOf(expectedHealthCheckInterval));
+        return args;
+    }
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 6e8a393b8d..05cb87ff80 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -99,7 +99,7 @@ public void stop() {
     }
 
     @Override
-    public CompletableFuture<FunctionStatus> getFunctionStatus() {
+    public CompletableFuture<FunctionStatus> getFunctionStatus(int instanceId) {
         CompletableFuture<FunctionStatus> statsFuture = new CompletableFuture<>();
         if (!isAlive()) {
             FunctionStatus.Builder functionStatusBuilder = FunctionStatus.newBuilder();
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index 84d21af6ae..dfbbb64efa 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -52,7 +52,7 @@ public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, Str
     }
 
     @VisibleForTesting
-    ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl) {
+    public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl) {
         this.fnCache = new FunctionCacheManagerImpl();
         this.threadGroup = new ThreadGroup(threadGroupName);
         this.pulsarClient = pulsarClient;
@@ -82,6 +82,7 @@ private static PulsarClient createPulsarClient(String pulsarServiceUrl, Authenti
     
     @Override
     public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFile,
+                                         String originalCodeFileName,
                                          Long expectedHealthCheckInterval) {
         return new ThreadRuntime(
             instanceConfig,
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
new file mode 100644
index 0000000000..274d973659
--- /dev/null
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.runtime;
+
+import com.google.protobuf.util.JsonFormat;
+import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit test of {@link KubernetesRuntime}.
+ */
+public class KubernetesRuntimeTest {
+
+    private static final String TEST_TENANT = "tenant";
+    private static final String TEST_NAMESPACE = "namespace";
+    private static final String TEST_NAME = "container";
+    private static final Map<String, String> topicsToSerDeClassName = new HashMap<>();
+    private static final Map<String, ConsumerSpec> topicsToSchema = new HashMap<>();
+    static {
+        topicsToSerDeClassName.put("persistent://sample/standalone/ns1/test_src", "");
+        topicsToSchema.put("persistent://sample/standalone/ns1/test_src",
+                ConsumerSpec.newBuilder().setSerdeClassName("").setIsRegexPattern(false).build());
+    }
+
+    private final KubernetesRuntimeFactory factory;
+    private final String userJarFile;
+    private final String javaInstanceJarFile;
+    private final String pythonInstanceFile;
+    private final String pulsarServiceUrl;
+    private final String pulsarAdminUrl;
+    private final String stateStorageServiceUrl;
+    private final String logDirectory;
+
+    public KubernetesRuntimeTest() throws Exception {
+        this.userJarFile = "/Users/user/UserJar.jar";
+        this.javaInstanceJarFile = "/pulsar/instances/java-instance.jar";
+        this.pythonInstanceFile = "/pulsar/instances/python-instance/python_instance_main.py";
+        this.pulsarServiceUrl = "pulsar://localhost:6670";
+        this.pulsarAdminUrl = "http://localhost:8080";
+        this.stateStorageServiceUrl = "bk://localhost:4181";
+        this.logDirectory = "logs/functions";
+        this.factory = new KubernetesRuntimeFactory(null, null, null, null,
+            false, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null);
+    }
+
+    @AfterMethod
+    public void tearDown() {
+        this.factory.close();
+    }
+
+    FunctionDetails createFunctionDetails(FunctionDetails.Runtime runtime) {
+        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+        functionDetailsBuilder.setRuntime(runtime);
+        functionDetailsBuilder.setTenant(TEST_TENANT);
+        functionDetailsBuilder.setNamespace(TEST_NAMESPACE);
+        functionDetailsBuilder.setName(TEST_NAME);
+        functionDetailsBuilder.setClassName("org.apache.pulsar.functions.utils.functioncache.AddFunction");
+        functionDetailsBuilder.setSink(Function.SinkSpec.newBuilder()
+                .setTopic(TEST_NAME + "-output")
+                .setSerDeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer")
+                .setClassName("org.pulsar.pulsar.TestSink")
+                .setTypeClassName(String.class.getName())
+                .build());
+        functionDetailsBuilder.setLogTopic(TEST_NAME + "-log");
+        functionDetailsBuilder.setSource(Function.SourceSpec.newBuilder()
+                .setSubscriptionType(Function.SubscriptionType.FAILOVER)
+                .putAllInputSpecs(topicsToSchema)
+                .setClassName("org.pulsar.pulsar.TestSource")
+                .setTypeClassName(String.class.getName()));
+        return functionDetailsBuilder.build();
+    }
+
+    InstanceConfig createJavaInstanceConfig(FunctionDetails.Runtime runtime) {
+        InstanceConfig config = new InstanceConfig();
+
+        config.setFunctionDetails(createFunctionDetails(runtime));
+        config.setFunctionId(java.util.UUID.randomUUID().toString());
+        config.setFunctionVersion("1.0");
+        config.setInstanceId(0);
+        config.setMaxBufferedTuples(1024);
+
+        return config;
+    }
+
+    @Test
+    public void testJavaConstructor() throws Exception {
+        InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA);
+
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+        List<String> args = container.getProcessArgs();
+        assertEquals(args.size(), 28);
+        String expectedArgs = "java -cp " + javaInstanceJarFile
+                + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
+                + " -Dlog4j.configurationFile=conf/log4j2.yaml "
+                + "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionDetailsUtils.getFullyQualifiedName(config.getFunctionDetails())
+                + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
+                + " org.apache.pulsar.functions.runtime.JavaInstanceMain"
+                + " --jar " + userJarFile + " --instance_id "
+                + "$SHARD_ID" + " --function_id " + config.getFunctionId()
+                + " --function_version " + config.getFunctionVersion()
+                + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+                + "' --pulsar_serviceurl " + pulsarServiceUrl
+                + " --max_buffered_tuples 1024 --port " + args.get(23)
+                + " --state_storage_serviceurl " + stateStorageServiceUrl
+                + " --expected_healthcheck_interval -1";
+        assertEquals(String.join(" ", args), expectedArgs);
+    }
+
+    @Test
+    public void testPythonConstructor() throws Exception {
+        InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.PYTHON);
+
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+        List<String> args = container.getProcessArgs();
+        assertEquals(args.size(), 24);
+        String expectedArgs = "python " + pythonInstanceFile
+                + " --py " + userJarFile + " --logging_directory "
+                + logDirectory + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id "
+                + "$SHARD_ID" + " --function_id " + config.getFunctionId()
+                + " --function_version " + config.getFunctionVersion()
+                + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+                + "' --pulsar_serviceurl " + pulsarServiceUrl
+                + " --max_buffered_tuples 1024 --port " + args.get(21)
+                + " --expected_healthcheck_interval -1";
+        assertEquals(String.join(" ", args), expectedArgs);
+    }
+
+}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 23457834f6..b09b9af492 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -114,7 +114,7 @@ InstanceConfig createJavaInstanceConfig(FunctionDetails.Runtime runtime) {
     public void testJavaConstructor() throws Exception {
         InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA);
 
-        ProcessRuntime container = factory.createContainer(config, userJarFile, 30l);
+        ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
         List<String> args = container.getProcessArgs();
         assertEquals(args.size(), 28);
         String expectedArgs = "java -cp " + javaInstanceJarFile
@@ -126,8 +126,8 @@ public void testJavaConstructor() throws Exception {
                 + " --jar " + userJarFile + " --instance_id "
                 + config.getInstanceId() + " --function_id " + config.getFunctionId()
                 + " --function_version " + config.getFunctionVersion()
-                + " --function_details " + JsonFormat.printer().print(config.getFunctionDetails())
-                + " --pulsar_serviceurl " + pulsarServiceUrl
+                + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+                + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(23)
                 + " --state_storage_serviceurl " + stateStorageServiceUrl
                 + " --expected_healthcheck_interval 30";
@@ -138,7 +138,7 @@ public void testJavaConstructor() throws Exception {
     public void testPythonConstructor() throws Exception {
         InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.PYTHON);
 
-        ProcessRuntime container = factory.createContainer(config, userJarFile, 30l);
+        ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
         List<String> args = container.getProcessArgs();
         assertEquals(args.size(), 24);
         String expectedArgs = "python " + pythonInstanceFile
@@ -146,8 +146,8 @@ public void testPythonConstructor() throws Exception {
                 + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName() + " --instance_id "
                 + config.getInstanceId() + " --function_id " + config.getFunctionId()
                 + " --function_version " + config.getFunctionVersion()
-                + " --function_details " + JsonFormat.printer().print(config.getFunctionDetails())
-                + " --pulsar_serviceurl " + pulsarServiceUrl
+                + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+                + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(21)
                 + " --expected_healthcheck_interval 30";
         assertEquals(String.join(" ", args), expectedArgs);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index a3355b0200..52af6897c7 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -138,26 +138,31 @@ public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Except
         FunctionDetails.Builder functionDetails = FunctionDetails.newBuilder(functionMetaData.getFunctionDetails());
         log.info("{}/{}/{}-{} Starting function ...", functionDetails.getTenant(), functionDetails.getNamespace(),
                 functionDetails.getName(), instanceId);
-        File pkgFile = null;
+        String packageFile;
 
         String pkgLocation = functionMetaData.getPackageLocation().getPackagePath();
         boolean isPkgUrlProvided = isFunctionPackageUrlSupported(pkgLocation);
 
         if (isPkgUrlProvided && pkgLocation.startsWith(FILE)) {
             URL url = new URL(pkgLocation);
-            pkgFile = new File(url.toURI());
+            File pkgFile = new File(url.toURI());
+            packageFile = pkgFile.getAbsolutePath();
         } else if (isFunctionCodeBuiltin(functionDetails)) {
-            pkgFile = getBuiltinArchive(functionDetails);
+            File pkgFile = getBuiltinArchive(functionDetails);
+            packageFile = pkgFile.getAbsolutePath();
+        } else if (runtimeFactory.externallyManaged()) {
+            packageFile = pkgLocation;
         } else {
             File pkgDir = new File(
                     workerConfig.getDownloadDirectory(),
                     getDownloadPackagePath(functionMetaData, instanceId));
             pkgDir.mkdirs();
 
-            pkgFile = new File(
+            File pkgFile = new File(
                     pkgDir,
                     new File(FunctionDetailsUtils.getDownloadFileName(functionMetaData.getFunctionDetails(), functionMetaData.getPackageLocation())).getName());
             downloadFile(pkgFile, isPkgUrlProvided, functionMetaData, instanceId);
+            packageFile = pkgFile.getAbsolutePath();
         }
 
         InstanceConfig instanceConfig = new InstanceConfig();
@@ -172,7 +177,8 @@ public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Except
         log.info("{}/{}/{}-{} start process with instance config {}", functionDetails.getTenant(), functionDetails.getNamespace(),
                 functionDetails.getName(), instanceId, instanceConfig);
 
-        RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, pkgFile.getAbsolutePath(),
+        RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, packageFile,
+                functionMetaData.getPackageLocation().getOriginalFileName(),
                 runtimeFactory, workerConfig.getInstanceLivenessCheckFreqMs());
 
         functionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index e634130f5e..992e5db23c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.functions.worker;
 
-import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.HashMap;
@@ -32,31 +31,26 @@
 import java.util.stream.Collectors;
 
 import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.client.Client;
-import javax.ws.rs.client.ClientBuilder;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
-import org.apache.pulsar.functions.runtime.Runtime;
-import org.apache.pulsar.functions.runtime.RuntimeFactory;
-import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.*;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.runtime.Runtime;
 
 /**
  * This class managers all aspects of functions assignments and running of function assignments for this worker
@@ -85,6 +79,7 @@
 
     private FunctionActioner functionActioner;
 
+    @Getter
     private RuntimeFactory runtimeFactory;
 
     private MembershipManager membershipManager;
@@ -123,8 +118,19 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
                     workerConfig.getProcessContainerFactory().getJavaInstanceJarLocation(),
                     workerConfig.getProcessContainerFactory().getPythonInstanceLocation(),
                     workerConfig.getProcessContainerFactory().getLogDirectory());
+        } else if (workerConfig.getKubernetesContainerFactory() != null){
+            this.runtimeFactory = new KubernetesRuntimeFactory(
+                    workerConfig.getKubernetesContainerFactory().getK8Uri(),
+                    workerConfig.getKubernetesContainerFactory().getJobNamespace(),
+                    workerConfig.getKubernetesContainerFactory().getPulsarDockerImageName(),
+                    workerConfig.getKubernetesContainerFactory().getPulsarRootDir(),
+                    workerConfig.getKubernetesContainerFactory().getSubmittingInsidePod(),
+                    StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(),
+                    StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
+                    workerConfig.getStateStorageServiceUrl(),
+                    authConfig);
         } else {
-            throw new RuntimeException("Either Thread or Process Container Factory need to be set");
+            throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set");
         }
 
         this.actionQueue = new LinkedBlockingQueue<>();
@@ -236,7 +242,12 @@ public synchronized void removeAssignments(Collection<Assignment> assignments) {
      */
     public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String tenant, String namespace,
             String functionName, int instanceId, URI uri) {
-        Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
+        Assignment assignment;
+        if (runtimeFactory.externallyManaged()) {
+            assignment = this.findAssignment(tenant, namespace, functionName, -1);
+        } else {
+            assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
+        }
         final String assignedWorkerId = assignment.getWorkerId();
         final String workerId = this.workerConfig.getWorkerId();
         
@@ -257,7 +268,7 @@ public synchronized void removeAssignments(Collection<Assignment> assignments) {
             if (runtimeSpawner != null) {
                 try {
                     InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus
-                            .newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus().get());
+                            .newBuilder(functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get());
                     functionStatusBuilder.setWorkerId(assignedWorkerId);
                     functionStatus = functionStatusBuilder.build();
                 } catch (InterruptedException | ExecutionException e) {
@@ -302,6 +313,10 @@ public synchronized void removeAssignments(Collection<Assignment> assignments) {
 
     public Response stopFunctionInstance(String tenant, String namespace, String functionName, int instanceId,
             boolean restart, URI uri) throws Exception {
+        if (runtimeFactory.externallyManaged()) {
+            return Response.status(Status.NOT_IMPLEMENTED).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData("Externally managed schedulers can't do per instance stop")).build();
+        }
         Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
         final String fullFunctionName = String.format("%s/%s/%s/%s", tenant, namespace, functionName, instanceId);
         if (assignment == null) {
@@ -343,7 +358,8 @@ public Response stopFunctionInstances(String tenant, String namespace, String fu
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build();
         }
-        for (Assignment assignment : assignments) {
+        if (runtimeFactory.externallyManaged()) {
+            Assignment assignment = assignments.iterator().next();
             final String assignedWorkerId = assignment.getWorkerId();
             final String workerId = this.workerConfig.getWorkerId();
             String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
@@ -361,14 +377,43 @@ public Response stopFunctionInstances(String tenant, String namespace, String fu
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] has not been assigned yet", fullyQualifiedInstanceId);
                     }
-                    continue;
+                    return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                            .entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build();
                 }
                 if (restart) {
-                    this.functionAdmin.functions().restartFunction(tenant, namespace, functionName,
-                            assignment.getInstance().getInstanceId());
+                    this.functionAdmin.functions().restartFunction(tenant, namespace, functionName);
                 } else {
-                    this.functionAdmin.functions().stopFunction(tenant, namespace, functionName,
-                            assignment.getInstance().getInstanceId());
+                    this.functionAdmin.functions().stopFunction(tenant, namespace, functionName);
+                }
+            }
+        } else {
+            for (Assignment assignment : assignments) {
+                final String assignedWorkerId = assignment.getWorkerId();
+                final String workerId = this.workerConfig.getWorkerId();
+                String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
+                if (assignedWorkerId.equals(workerId)) {
+                    stopFunction(fullyQualifiedInstanceId, restart);
+                } else {
+                    List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
+                    WorkerInfo workerInfo = null;
+                    for (WorkerInfo entry : workerInfoList) {
+                        if (assignment.getWorkerId().equals(entry.getWorkerId())) {
+                            workerInfo = entry;
+                        }
+                    }
+                    if (workerInfo == null) {
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] has not been assigned yet", fullyQualifiedInstanceId);
+                        }
+                        continue;
+                    }
+                    if (restart) {
+                        this.functionAdmin.functions().restartFunction(tenant, namespace, functionName,
+                                assignment.getInstance().getInstanceId());
+                    } else {
+                        this.functionAdmin.functions().stopFunction(tenant, namespace, functionName,
+                                assignment.getInstance().getInstanceId());
+                    }
                 }
             }
         }
@@ -379,7 +424,11 @@ public Response stopFunctionInstances(String tenant, String namespace, String fu
      * It stops all functions instances owned by current worker
      * @throws Exception
      */
-    public void stopAllOwnedFunctions() throws Exception {
+    public void stopAllOwnedFunctions() {
+        if (runtimeFactory.externallyManaged()) {
+            log.warn("Will not stop any functions since they are externally managed");
+            return;
+        }
         final String workerId = this.workerConfig.getWorkerId();
         Map<String, Assignment> assignments = workerIdToAssignments.get(workerId);
         if (assignments != null) {
@@ -401,7 +450,7 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart) thro
             this.functionActioner.stopFunction(functionRuntimeInfo);
             try {
                 if(restart) {
-                    this.functionActioner.startFunction(functionRuntimeInfo);    
+                    this.functionActioner.startFunction(functionRuntimeInfo);
                 }
             } catch (Exception ex) {
                 log.info("{} Error re-starting function", fullyQualifiedInstanceId, ex);
@@ -429,24 +478,38 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart) thro
             return functionStatusListBuilder.build();
         }
 
-        for (Assignment assignment : assignments) {
+        if (runtimeFactory.externallyManaged()) {
+            Assignment assignment = assignments.iterator().next();
             boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
-            InstanceCommunication.FunctionStatus functionStatus = isOwner
-                    ? (getFunctionInstanceStatus(tenant, namespace, functionName,
-                            assignment.getInstance().getInstanceId(), null))
-                    : this.functionAdmin.functions().getFunctionStatus(
-                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
-                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
-                            assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
-                            assignment.getInstance().getInstanceId());
-            functionStatusListBuilder.addFunctionStatusList(functionStatus);
+            if (isOwner) {
+                int parallelism = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getParallelism();
+                for (int i = 0; i < parallelism; ++i) {
+                    InstanceCommunication.FunctionStatus functionStatus = getFunctionInstanceStatus(tenant, namespace,
+                            functionName, i, null);
+                    functionStatusListBuilder.addFunctionStatusList(functionStatus);
+                }
+            } else {
+                return this.functionAdmin.functions().getFunctionStatus(tenant, namespace, functionName);
+            }
+        } else {
+            for (Assignment assignment : assignments) {
+                boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
+                InstanceCommunication.FunctionStatus functionStatus = isOwner
+                        ? (getFunctionInstanceStatus(tenant, namespace, functionName,
+                        assignment.getInstance().getInstanceId(), null))
+                        : this.functionAdmin.functions().getFunctionStatus(
+                        assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
+                        assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
+                        assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
+                        assignment.getInstance().getInstanceId());
+                functionStatusListBuilder.addFunctionStatusList(functionStatus);
+            }
         }
         return functionStatusListBuilder.build();
     }
 
     /**
      * Process an assignment update from the assignment topic
-     * @param messageId the message id of the update assignment
      * @param newAssignment the assignment
      */
     public synchronized void processAssignment(Assignment newAssignment) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
index b18fd12881..03b9aead87 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java
@@ -221,7 +221,7 @@ public void checkFailures(FunctionMetaDataManager functionMetaDataManager,
                     .map(assignment -> assignment.getInstance())
                     .collect(Collectors.toSet());
 
-            Set<Function.Instance> instances = new HashSet<>(SchedulerManager.computeInstances(functionMetaData));
+            Set<Function.Instance> instances = new HashSet<>(SchedulerManager.computeInstances(functionMetaData, functionRuntimeManager.getRuntimeFactory().externallyManaged()));
 
             for (Function.Instance instance : instances) {
                 if (!assignedInstances.contains(instance)) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index db7785a55b..608b2fcd4a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -126,7 +126,7 @@ public void invokeScheduler() {
                 .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toList());
 
         List<FunctionMetaData> allFunctions = this.functionMetaDataManager.getAllFunctionMetaData();
-        Map<String, Function.Instance> allInstances = computeAllInstances(allFunctions);
+        Map<String, Function.Instance> allInstances = computeAllInstances(allFunctions, functionRuntimeManager.getRuntimeFactory().externallyManaged());
         Map<String, Map<String, Assignment>> workerIdToAssignments = this.functionRuntimeManager
                 .getCurrentAssignments();
         //delete assignments of functions and instances that don't exist anymore
@@ -207,23 +207,32 @@ private void publishNewAssignment(Assignment assignment, boolean deleted) {
         }
     }
 
-    public static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions) {
+    public static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions,
+                                                                     boolean externallyManagedRuntime) {
         Map<String, Function.Instance> functionInstances = new HashMap<>();
         for (FunctionMetaData functionMetaData : allFunctions) {
-            for (Function.Instance instance : computeInstances(functionMetaData)) {
+            for (Function.Instance instance : computeInstances(functionMetaData, externallyManagedRuntime)) {
                 functionInstances.put(Utils.getFullyQualifiedInstanceId(instance), instance);
             }
         }
         return functionInstances;
     }
 
-    public static List<Function.Instance> computeInstances(FunctionMetaData functionMetaData) {
+    public static List<Function.Instance> computeInstances(FunctionMetaData functionMetaData,
+                                                           boolean externallyManagedRuntime) {
         List<Function.Instance> functionInstances = new LinkedList<>();
-        int instances = functionMetaData.getFunctionDetails().getParallelism();
-        for (int i = 0; i < instances; i++) {
+        if (!externallyManagedRuntime) {
+            int instances = functionMetaData.getFunctionDetails().getParallelism();
+            for (int i = 0; i < instances; i++) {
+                functionInstances.add(Function.Instance.newBuilder()
+                        .setFunctionMetaData(functionMetaData)
+                        .setInstanceId(i)
+                        .build());
+            }
+        } else {
             functionInstances.add(Function.Instance.newBuilder()
                     .setFunctionMetaData(functionMetaData)
-                    .setInstanceId(i)
+                    .setInstanceId(-1)
                     .build());
         }
         return functionInstances;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index a2524c6d97..81e9fd22ca 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -125,6 +125,22 @@
     }
     private ProcessContainerFactory processContainerFactory;
 
+    @Data
+    @Setter
+    @Getter
+    @EqualsAndHashCode
+    @ToString
+    public static class KubernetesContainerFactory {
+        private String k8Uri;
+        private String jobNamespace;
+        private String pulsarDockerImageName;
+        private String pulsarRootDir;
+        private Boolean submittingInsidePod;
+        private String pulsarServiceUrl;
+        private String pulsarAdminUrl;
+    }
+    private KubernetesContainerFactory kubernetesContainerFactory;
+
     public String getFunctionMetadataTopic() {
         return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName);
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index aca3b4a9b7..b92ae27474 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -369,6 +369,13 @@ public Response getFunctionInstanceStatus(final String tenant, final String name
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
         }
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
+        int instanceIdInt = Integer.parseInt(instanceId);
+        if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) {
+            log.error("instanceId in getFunctionStatus out of bounds @ /{}/{}/{}", tenant, namespace, functionName);
+            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("Invalid InstanceId"))).build();
+        }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         FunctionStatus functionStatus = null;
@@ -751,7 +758,7 @@ public Response uploadFunction(final InputStream uploadedInputStream, final Stri
         try {
             log.info("Uploading function package to {}", path);
 
-            Utils.uploadToBookeeper(worker().getDlogNamespace(), uploadedInputStream, Codec.encode(path));
+            Utils.uploadToBookeeper(worker().getDlogNamespace(), uploadedInputStream, path);
         } catch (IOException e) {
             log.error("Error uploading file {}", path, e);
             return Response.serverError().type(MediaType.APPLICATION_JSON).entity(new ErrorData(e.getMessage()))
@@ -778,7 +785,7 @@ public void write(final OutputStream output) throws IOException {
                         throw new IllegalArgumentException("invalid file url path: " + path);
                     }
                 } else {
-                    Utils.downloadFromBookkeeper(worker().getDlogNamespace(), output, Codec.encode(path));
+                    Utils.downloadFromBookkeeper(worker().getDlogNamespace(), output, path);
                 }
             }
         }).build();
@@ -799,7 +806,6 @@ private void validateGetFunctionInstanceRequestParams(String tenant, String name
         validateGetFunctionRequestParams(tenant, namespace, functionName);
         if (instanceId == null) {
             throw new IllegalArgumentException("Function Instance Id is not provided");
-
         }
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index 05e3422d39..bf4b4aafa5 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -103,7 +103,7 @@ public void testStartFunctionWithPkgUrl() throws Exception {
 
         RuntimeFactory factory = mock(RuntimeFactory.class);
         Runtime runtime = mock(Runtime.class);
-        doReturn(runtime).when(factory).createContainer(any(), any(), any());
+        doReturn(runtime).when(factory).createContainer(any(), any(), any(), any());
         doNothing().when(runtime).start();
         Namespace dlogNamespace = mock(Namespace.class);
         final String exceptionMsg = "dl namespace not-found";
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 00e4b6e0e1..649cd3ef73 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -54,6 +54,7 @@
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.proto.Request;
+import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
 import org.mockito.Mockito;
 import org.mockito.invocation.Invocation;
@@ -153,6 +154,9 @@ public void testSchedule() throws Exception {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
+
         // set assignments
         Function.Assignment assignment1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -240,6 +244,9 @@ public void testAddingFunctions() throws Exception {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
+
         // set assignments
         Function.Assignment assignment1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -297,6 +304,9 @@ public void testDeletingFunctions() throws Exception {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
+
         // set assignments
         Function.Assignment assignment1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -359,6 +369,9 @@ public void testScalingUp() throws Exception {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
+
         // set assignments
         Function.Assignment assignment1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -464,6 +477,9 @@ public void testScalingDown() throws Exception {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
+
         // set assignments
         Function.Assignment assignment1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")
@@ -586,6 +602,9 @@ public void testHeartbeatFunction() throws Exception {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
+
         Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
         Map<String, Function.Assignment> assignmentEntry1 = new HashMap<>();
 
@@ -637,6 +656,9 @@ public void testUpdate() throws Exception {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
+
         // set assignments
         Function.Assignment assignment1 = Function.Assignment.newBuilder()
                 .setWorkerId("worker-1")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services