You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/03/19 17:42:16 UTC

[pulsar] branch master updated: Implementing authentication for Pulsar Functions (#3735)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b50760c  Implementing authentication for Pulsar Functions (#3735)
b50760c is described below

commit b50760ca42d97ca16e7fd33329e5f74e0775d926
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue Mar 19 11:42:11 2019 -0600

    Implementing authentication for Pulsar Functions (#3735)
    
    * Implementing authentication for Pulsar Functions
    
    * delete unnecessary changes
    
    * cleaning up
    
    * improving implementation
    
    * fixing tests
    
    * cleaning up
    
    * add no op implementation
    
    * cleaning up unnecessary changes
    
    * refactoring based on comments
    
    * adding comments
    
    * change data from string type to bytes
    
    * add proto file
    
    * addressing comments
    
    * up merging
    
    * refactoring get token code
    
    * cleaning up
    
    * fix bugs and add tests
    
    * add tests
    
    * remove service account creation
    
    * cleanup unused imports
    
    * add field for auth provider
    
    * adding comments
---
 .../AuthenticationProviderToken.java               |   4 +-
 .../pulsar/broker/admin/impl/FunctionsBase.java    |   2 +-
 .../apache/pulsar/broker/admin/impl/SinkBase.java  |   2 +-
 .../pulsar/broker/admin/impl/SourceBase.java       |   2 +-
 .../pulsar/functions/instance/InstanceConfig.java  |   2 +
 .../proto/src/main/proto/Function.proto            |  15 ++
 pulsar-functions/runtime/pom.xml                   |   6 +
 .../auth/ClearTextFunctionTokenAuthProvider.java   |  55 +++++
 .../pulsar/functions/auth/FunctionAuthData.java}   |  43 ++--
 .../functions/auth/FunctionAuthProvider.java       |  59 +++++
 .../pulsar/functions/auth/FunctionAuthUtils.java   |  28 +++
 .../KubernetesFunctionAuthProvider.java}           |  40 +---
 .../auth/KubernetesSecretsTokenAuthProvider.java   | 248 +++++++++++++++++++++
 .../functions/auth/NoOpFunctionAuthProvider.java   |  43 ++++
 .../functions/runtime/KubernetesRuntime.java       |  83 +++++--
 .../runtime/KubernetesRuntimeFactory.java          |  35 ++-
 .../functions/runtime/ProcessRuntimeFactory.java   |   8 +
 .../apache/pulsar/functions/runtime/Runtime.java   |   4 +
 .../pulsar/functions/runtime/RuntimeFactory.java   |   7 +
 .../ClearTextFunctionTokenAuthProviderTest.java    |  63 ++++++
 .../KubernetesSecretsTokenAuthProviderTest.java    | 107 +++++++++
 .../runtime/KubernetesRuntimeFactoryTest.java      |   2 -
 .../pulsar/functions/worker/FunctionActioner.java  |  60 +++--
 .../functions/worker/rest/FunctionApiResource.java |   5 +
 .../functions/worker/rest/api/ComponentImpl.java   |  40 +++-
 .../functions/worker/rest/api/FunctionsImplV2.java |   2 +-
 .../worker/rest/api/v3/FunctionApiV3Resource.java  |   2 +-
 .../worker/rest/api/v3/SinkApiV3Resource.java      |   4 +-
 .../worker/rest/api/v3/SourceApiV3Resource.java    |   2 +-
 .../worker/FunctionRuntimeManagerTest.java         |  15 +-
 .../rest/api/v3/FunctionApiV3ResourceTest.java     |   8 +-
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  |   6 +-
 .../rest/api/v3/SourceApiV3ResourceTest.java       |   6 +-
 33 files changed, 872 insertions(+), 136 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
index 2af43f2..f5ee57c 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
@@ -76,7 +76,7 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
         return parseToken(token);
     }
 
-    private String getToken(AuthenticationDataSource authData) throws AuthenticationException {
+    public static String getToken(AuthenticationDataSource authData) throws AuthenticationException {
         if (authData.hasDataFromCommand()) {
             // Authenticate Pulsar binary connection
             return authData.getCommandData();
@@ -96,7 +96,7 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
         }
     }
 
-    private String validateToken(final String token) throws AuthenticationException {
+    private static String validateToken(final String token) throws AuthenticationException {
         if (StringUtils.isNotBlank(token)) {
             return token;
         } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 9b88f29..c4fd33f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -82,7 +82,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
                                  final @FormDataParam("functionConfig") String functionConfigJson) {
 
         functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-            functionPkgUrl, null, functionConfigJson, clientAppId());
+            functionPkgUrl, null, functionConfigJson, clientAppId(), clientAuthData());
     }
 
     @PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
index 2bd22a4..ffd4dd7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
@@ -79,7 +79,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
                              final @FormDataParam("sinkConfig") String sinkConfigJson) {
 
         sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, sinkConfigJson, clientAppId());
+                functionPkgUrl, null, sinkConfigJson, clientAppId(), clientAuthData());
     }
 
     @PUT
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
index 0e8348f..6078ede 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
@@ -79,7 +79,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
                                final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
         source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-            functionPkgUrl, null, sourceConfigJson, clientAppId());
+            functionPkgUrl, null, sourceConfigJson, clientAppId(), clientAuthData());
     }
 
     @PUT
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 023f6a2..f823728 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -23,6 +23,7 @@ import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
+import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 
 /**
@@ -40,6 +41,7 @@ public class InstanceConfig {
     private String functionVersion;
     private FunctionDetails functionDetails;
     private int maxBufferedTuples;
+    private Function.FunctionAuthenticationSpec functionAuthenticationSpec;
     private int port;
     private String clusterName;
 
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto b/pulsar-functions/proto/src/main/proto/Function.proto
index 8c93b3a..fba37e4 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -141,6 +141,21 @@ message FunctionMetaData {
     uint64 version = 3;
     uint64 createTime = 4;
     map<int32, FunctionState> instanceStates = 5;
+    FunctionAuthenticationSpec functionAuthSpec = 6;
+}
+
+message FunctionAuthenticationSpec {
+    /**
+     * function authentication related data that the function authentication provider
+     * needs to cache/distribute to all workers support function authentication.
+     * Depending on the function authentication provider implementation, this can be the actual auth credentials
+     * or a pointer to the auth credentials that this function should use
+     */
+    bytes data = 1;
+    /**
+     * classname of the function auth provicer this data is relevant to
+     */
+    string provider = 2;
 }
 
 message Instance {
diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml
index f3176a7..ca363c8 100644
--- a/pulsar-functions/runtime/pom.xml
+++ b/pulsar-functions/runtime/pom.xml
@@ -63,6 +63,12 @@
       </exclusions>
     </dependency>
 
+      <dependency>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-broker-common</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+
   </dependencies>
 
 </project>
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java
new file mode 100644
index 0000000..7adf5ed
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProvider.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+
+import java.util.Optional;
+
+import static org.apache.pulsar.broker.authentication.AuthenticationProviderToken.getToken;
+
+public class ClearTextFunctionTokenAuthProvider implements FunctionAuthProvider {
+    @Override
+    public void configureAuthenticationConfig(AuthenticationConfig authConfig, FunctionAuthData functionAuthData) {
+        authConfig.setClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        authConfig.setClientAuthenticationParameters("token:" + new String(functionAuthData.getData()));
+    }
+
+    @Override
+    public Optional<FunctionAuthData> cacheAuthData(String tenant, String namespace, String name, AuthenticationDataSource authenticationDataSource) throws Exception {
+        String token = null;
+        try {
+            token = getToken(authenticationDataSource);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        if (token != null) {
+            return Optional.of(FunctionAuthData.builder().data(token.getBytes()).build());
+        }
+        return null;
+    }
+
+    @Override
+    public void cleanUpAuthData(String tenant, String namespace, String name, FunctionAuthData functionAuthData) throws Exception {
+        //no-op
+    }
+}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthData.java
similarity index 50%
copy from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
copy to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthData.java
index 023f6a2..be17668 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthData.java
@@ -16,39 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.instance;
+package org.apache.pulsar.functions.auth;
 
+import lombok.Builder;
 import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 
+@Data
+@Builder
 /**
- * This is the config passed to the Java Instance. Contains all the information
- * passed to run functions.
+ * A wrapper for authentication data for functions
  */
-@Data
-@Getter
-@Setter
-@EqualsAndHashCode
-@ToString
-public class InstanceConfig {
-    private int instanceId;
-    private String functionId;
-    private String functionVersion;
-    private FunctionDetails functionDetails;
-    private int maxBufferedTuples;
-    private int port;
-    private String clusterName;
-
+public class FunctionAuthData {
+    /**
+     * function authentication related data that the function authentication provider
+     * needs to cache/distribute to all workers support function authentication.
+     * Depending on the function authentication provider implementation, this can be the actual auth credentials
+     * or a pointer to the auth credentials that this function should use
+     */
+    private byte[] data;
     /**
-     * Get the string representation of {@link #getInstanceId()}.
-     *
-     * @return the string representation of {@link #getInstanceId()}.
+     * classname of the function auth provicer this data is relevant to
      */
-    public String getInstanceName() {
-        return "" + instanceId;
-    }
+    private String provider;
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java
new file mode 100644
index 0000000..e30e9ad
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+
+import java.util.Optional;
+
+/**
+ * This is a generic interface that functions can use to cache and distribute appropriate authentication
+ * data that is needed to configure the runtime of functions to support appropriate authentication of function instances
+ */
+public interface FunctionAuthProvider {
+
+    /**
+     * Set authentication configs for function instance based on the data in FunctionAuthenticationSpec
+     * @param authConfig authentication configs passed to the function instance
+     * @param functionAuthData function authentication data that is provider specific
+     */
+    void configureAuthenticationConfig(AuthenticationConfig authConfig, FunctionAuthData functionAuthData);
+
+    /**
+     * Cache auth data in as part of function metadata for function that runtime may need to configure authentication
+     * @param tenant tenant that the function is running under
+     * @param namespace namespace that is the function is running under
+     * @param name name of the function
+     * @param authenticationDataSource auth data
+     * @return
+     * @throws Exception
+     */
+    Optional<FunctionAuthData> cacheAuthData(String tenant, String namespace, String name, AuthenticationDataSource authenticationDataSource) throws Exception;
+
+    /**
+     * Clean up operation for auth when function is terminated
+     * @param tenant tenant that the function is running under
+     * @param namespace namespace that is the function is running under
+     * @param name name of the function
+     * @param functionAuthData function auth data
+     * @throws Exception
+     */
+    void cleanUpAuthData(String tenant, String namespace, String name, FunctionAuthData functionAuthData) throws Exception;
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthUtils.java
new file mode 100644
index 0000000..3356706
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthUtils.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import org.apache.pulsar.functions.proto.Function;
+
+public final class FunctionAuthUtils {
+
+    public static final FunctionAuthData getFunctionAuthData(Function.FunctionAuthenticationSpec functionAuthenticationSpec) {
+        return FunctionAuthData.builder().data(functionAuthenticationSpec.getData().toByteArray()).build();
+    }
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
similarity index 50%
copy from pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
copy to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
index 19a5fc9..8b42c64 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
@@ -16,36 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.pulsar.functions.auth;
 
-package org.apache.pulsar.functions.runtime;
-
-import org.apache.pulsar.functions.proto.InstanceCommunication;
-
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
+import io.kubernetes.client.models.V1ServiceAccount;
+import io.kubernetes.client.models.V1StatefulSet;
 
 /**
- * A function container is an environment for invoking functions.
+ * Kubernetes runtime specific functions authentication provider
  */
-public interface Runtime {
-
-    void start() throws Exception;
-
-    void join() throws Exception;
-
-    void stop() throws Exception;
-
-    boolean isAlive();
-
-    Throwable getDeathException();
-
-    CompletableFuture<InstanceCommunication.FunctionStatus> getFunctionStatus(int instanceId);
-
-    CompletableFuture<InstanceCommunication.MetricsData> getAndResetMetrics();
-    
-    CompletableFuture<Void> resetMetrics();
-    
-    CompletableFuture<InstanceCommunication.MetricsData> getMetrics(int instanceId);
-
-    String getPrometheusMetrics() throws IOException;
+public interface KubernetesFunctionAuthProvider extends FunctionAuthProvider {
+
+    /**
+     * Configure function statefulset spec based on function auth data
+     * @param statefulSet statefulset spec for function
+     * @param functionAuthData function auth data
+     */
+    void configureAuthDataStatefulSet(V1StatefulSet statefulSet, FunctionAuthData functionAuthData);
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
new file mode 100644
index 0000000..ff2cd57
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProvider.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import io.kubernetes.client.ApiException;
+import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.models.V1DeleteOptions;
+import io.kubernetes.client.models.V1ObjectMeta;
+import io.kubernetes.client.models.V1PodSpec;
+import io.kubernetes.client.models.V1Secret;
+import io.kubernetes.client.models.V1SecretVolumeSource;
+import io.kubernetes.client.models.V1StatefulSet;
+import io.kubernetes.client.models.V1Volume;
+import io.kubernetes.client.models.V1VolumeMount;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.apache.pulsar.functions.utils.Actions;
+import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static java.net.HttpURLConnection.HTTP_CONFLICT;
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static org.apache.pulsar.broker.authentication.AuthenticationProviderToken.getToken;
+
+@Slf4j
+public class KubernetesSecretsTokenAuthProvider implements KubernetesFunctionAuthProvider {
+
+    private static final int NUM_RETRIES = 5;
+    private static final long SLEEP_BETWEEN_RETRIES_MS = 500;
+    private static final String SECRET_NAME = "function-auth";
+    private static final String DEFAULT_SECRET_MOUNT_DIR = "/etc/auth";
+    private static final String FUNCTION_AUTH_TOKEN = "token";
+
+
+    private final CoreV1Api coreClient;
+    private final String kubeNamespace;
+
+    public KubernetesSecretsTokenAuthProvider(CoreV1Api coreClient, String kubeNamespace) {
+        this.coreClient = coreClient;
+        this.kubeNamespace = kubeNamespace;
+    }
+
+    @Override
+    public void configureAuthDataStatefulSet(V1StatefulSet statefulSet, FunctionAuthData functionAuthData) {
+
+        V1PodSpec podSpec = statefulSet.getSpec().getTemplate().getSpec();
+
+        // configure pod mount secret with auth token
+        podSpec.setVolumes(Collections.singletonList(
+                new V1Volume()
+                        .name(SECRET_NAME)
+                        .secret(
+                                new V1SecretVolumeSource()
+                                        .secretName(getSecretName(new String(functionAuthData.getData())))
+                                        .defaultMode(256))));
+
+        podSpec.getContainers().forEach(container -> container.setVolumeMounts(Collections.singletonList(
+                new V1VolumeMount()
+                        .name(SECRET_NAME)
+                        .mountPath(DEFAULT_SECRET_MOUNT_DIR)
+                        .readOnly(true))));
+
+    }
+
+    @Override
+    public void configureAuthenticationConfig(AuthenticationConfig authConfig, FunctionAuthData functionAuthData) {
+        authConfig.setClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        authConfig.setClientAuthenticationParameters(String.format("file://%s/%s", DEFAULT_SECRET_MOUNT_DIR, FUNCTION_AUTH_TOKEN));
+    }
+
+
+    @Override
+    public Optional<FunctionAuthData> cacheAuthData(String tenant, String namespace, String name,
+                                                    AuthenticationDataSource authenticationDataSource) {
+        String id = null;
+        try {
+            String token = getToken(authenticationDataSource);
+            if (token != null) {
+                id = createSecret(token, tenant, namespace, name);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        if (id != null) {
+            return Optional.of(FunctionAuthData.builder().data(id.getBytes()).build());
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public void cleanUpAuthData(String tenant, String namespace, String name, FunctionAuthData functionAuthData) throws Exception {
+        String fqfn = FunctionDetailsUtils.getFullyQualifiedName(tenant, namespace, name);
+
+        String secretName = new String(functionAuthData.getData());
+        Actions.Action deleteSecrets = Actions.Action.builder()
+                .actionName(String.format("Deleting secrets for function %s", fqfn))
+                .numRetries(NUM_RETRIES)
+                .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+                .supplier(() -> {
+                    try {
+                        V1DeleteOptions v1DeleteOptions = new V1DeleteOptions();
+                        v1DeleteOptions.setGracePeriodSeconds(0L);
+                        v1DeleteOptions.setPropagationPolicy("Foreground");
+
+                        coreClient.deleteNamespacedSecret(secretName,
+                                kubeNamespace, v1DeleteOptions, "true",
+                                null, null, null);
+                    } catch (ApiException e) {
+                        // if already deleted
+                        if (e.getCode() == HTTP_NOT_FOUND) {
+                            log.warn("Secrets for function {} does not exist", fqfn);
+                            return Actions.ActionResult.builder().success(true).build();
+                        }
+
+                        String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
+                        return Actions.ActionResult.builder()
+                                .success(false)
+                                .errorMsg(errorMsg)
+                                .build();
+                    }
+                    return Actions.ActionResult.builder().success(true).build();
+                })
+                .build();
+
+        Actions.Action waitForSecretsDeletion = Actions.Action.builder()
+                .actionName(String.format("Waiting for secrets for function %s to complete deletion", fqfn))
+                .numRetries(NUM_RETRIES)
+                .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+                .supplier(() -> {
+                    try {
+                        coreClient.readNamespacedSecret(secretName, kubeNamespace,
+                                null, null, null);
+
+                    } catch (ApiException e) {
+                        // statefulset is gone
+                        if (e.getCode() == HTTP_NOT_FOUND) {
+                            return Actions.ActionResult.builder().success(true).build();
+                        }
+                        String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
+                        return Actions.ActionResult.builder()
+                                .success(false)
+                                .errorMsg(errorMsg)
+                                .build();
+                    }
+                    return Actions.ActionResult.builder()
+                            .success(false)
+                            .build();
+                })
+                .build();
+
+        AtomicBoolean success = new AtomicBoolean(false);
+        Actions.newBuilder()
+                .addAction(deleteSecrets.toBuilder()
+                        .continueOn(true)
+                        .build())
+                .addAction(waitForSecretsDeletion.toBuilder()
+                        .continueOn(false)
+                        .onSuccess(ignore -> success.set(true))
+                        .build())
+                .addAction(deleteSecrets.toBuilder()
+                        .continueOn(true)
+                        .build())
+                .addAction(waitForSecretsDeletion.toBuilder()
+                        .onSuccess(ignore -> success.set(true))
+                        .build())
+                .run();
+
+        if (!success.get()) {
+            throw new RuntimeException(String.format("Failed to delete secrets for function %s", fqfn));
+        }
+    }
+
+    private String createSecret(String token, String tenant, String namespace, String name) throws ApiException, InterruptedException {
+
+        StringBuilder sb = new StringBuilder();
+        Actions.Action createAuthSecret = Actions.Action.builder()
+                .actionName(String.format("Creating authentication secret for function %s/%s/%s", tenant, namespace, name))
+                .numRetries(NUM_RETRIES)
+                .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+                .supplier(() -> {
+                    String id =  RandomStringUtils.random(5, true, true).toLowerCase();
+                    V1Secret v1Secret = new V1Secret()
+                            .metadata(new V1ObjectMeta().name(getSecretName(id)))
+                            .data(Collections.singletonMap(FUNCTION_AUTH_TOKEN, token.getBytes()));
+                    try {
+                        coreClient.createNamespacedSecret(kubeNamespace, v1Secret, "true");
+                    } catch (ApiException e) {
+                        // already exists
+                        if (e.getCode() == HTTP_CONFLICT) {
+                            return Actions.ActionResult.builder()
+                                    .errorMsg(String.format("Secret %s already present", id))
+                                    .success(false)
+                                    .build();
+                        }
+
+                        String errorMsg = e.getResponseBody() != null ? e.getResponseBody() : e.getMessage();
+                        return Actions.ActionResult.builder()
+                                .success(false)
+                                .errorMsg(errorMsg)
+                                .build();
+                    }
+
+                    sb.append(id.toCharArray());
+                    return Actions.ActionResult.builder().success(true).build();
+                })
+                .build();
+
+        AtomicBoolean success = new AtomicBoolean(false);
+        Actions.newBuilder()
+                .addAction(createAuthSecret.toBuilder()
+                        .onSuccess(ignore -> success.set(true))
+                        .build())
+                .run();
+
+        if (!success.get()) {
+            throw new RuntimeException(String.format("Failed to create authentication secret for function %s/%s/%s", tenant, namespace, name));
+        }
+
+        return sb.toString();
+    }
+
+    private String getSecretName(String id) {
+        return "pf-secret-" + id;
+    }
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/NoOpFunctionAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/NoOpFunctionAuthProvider.java
new file mode 100644
index 0000000..5b4a0ab
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/NoOpFunctionAuthProvider.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+
+import java.util.Optional;
+
+public class NoOpFunctionAuthProvider implements FunctionAuthProvider{
+    @Override
+    public void configureAuthenticationConfig(AuthenticationConfig authConfig, FunctionAuthData functionAuthData) {
+
+    }
+
+    @Override
+    public Optional<FunctionAuthData> cacheAuthData(String tenant, String namespace, String name,
+                                                    AuthenticationDataSource authenticationDataSource)
+            throws Exception {
+        return Optional.empty();
+    }
+
+    @Override
+    public void cleanUpAuthData(String tenant, String namespace, String name, FunctionAuthData functionAuthData) throws Exception {
+
+    }
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 140de1c..fb51b36 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -52,6 +52,7 @@ import io.kubernetes.client.models.V1StatefulSetSpec;
 import io.kubernetes.client.models.V1Toleration;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.instance.InstanceUtils;
@@ -80,6 +81,8 @@ import java.util.regex.Pattern;
 
 import static java.net.HttpURLConnection.HTTP_CONFLICT;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
 
 /**
  * Kubernetes based runtime for running functions.
@@ -94,9 +97,6 @@ import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 @VisibleForTesting
 public class KubernetesRuntime implements Runtime {
 
-    private static int NUM_RETRIES = 5;
-    private static long SLEEP_BETWEEN_RETRIES_MS = 500;
-
     private static final String ENV_SHARD_ID = "SHARD_ID";
     private static final int maxJobNameSize = 55;
     private static final Integer GRPC_PORT = 9093;
@@ -134,7 +134,8 @@ public class KubernetesRuntime implements Runtime {
     private final String pulsarAdminUrl;
     private final SecretsProviderConfigurator secretsProviderConfigurator;
     private int percentMemoryPadding;
-
+    private final KubernetesFunctionAuthProvider functionAuthDataCacheProvider;
+    private final AuthenticationConfig authConfig;
 
     KubernetesRuntime(AppsV1Api appsClient,
                       CoreV1Api coreClient,
@@ -158,7 +159,8 @@ public class KubernetesRuntime implements Runtime {
                       AuthenticationConfig authConfig,
                       SecretsProviderConfigurator secretsProviderConfigurator,
                       Integer expectedMetricsCollectionInterval,
-                      int percentMemoryPadding) throws Exception {
+                      int percentMemoryPadding,
+                      KubernetesFunctionAuthProvider functionAuthDataCacheProvider) throws Exception {
         this.appsClient = appsClient;
         this.coreClient = coreClient;
         this.instanceConfig = instanceConfig;
@@ -187,11 +189,16 @@ public class KubernetesRuntime implements Runtime {
                 break;
         }
 
+        this.authConfig = authConfig;
+
+        this.functionAuthDataCacheProvider = functionAuthDataCacheProvider;
+
         this.processArgs = new LinkedList<>();
         this.processArgs.addAll(RuntimeUtils.getArgsBeforeCmd(instanceConfig, extraDependenciesDir));
         // use exec to to launch function so that it gets launched in the foreground with the same PID as shell
         // so that when we kill the pod, the signal will get propagated to the function code
         this.processArgs.add("exec");
+
         this.processArgs.addAll(
                 RuntimeUtils.getCmd(
                         instanceConfig,
@@ -221,16 +228,20 @@ public class KubernetesRuntime implements Runtime {
      */
     @Override
     public void start() throws Exception {
-        submitService();
+
         try {
+            submitService();
             submitStatefulSet();
+
         } catch (Exception e) {
-            log.error("Could not submit statefulset for {}/{}/{}, deleting service as well",
+            log.error("Failed start function {}/{}/{} in Kubernetes",
                     instanceConfig.getFunctionDetails().getTenant(),
                     instanceConfig.getFunctionDetails().getNamespace(),
                     instanceConfig.getFunctionDetails().getName(), e);
-            deleteService();
+            stop();
+            throw e;
         }
+
         if (channel == null && stub == null) {
             channel = new ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()];
             stub = new InstanceControlGrpc.InstanceControlFutureStub[instanceConfig.getFunctionDetails().getParallelism()];
@@ -363,8 +374,8 @@ public class KubernetesRuntime implements Runtime {
 
         Actions.Action createService = Actions.Action.builder()
                 .actionName(String.format("Submitting service for function %s", fqfn))
-                .numRetries(NUM_RETRIES)
-                .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+                .numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
+                .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS)
                 .supplier(() -> {
                     final V1Service response;
                     try {
@@ -428,6 +439,11 @@ public class KubernetesRuntime implements Runtime {
 
     private void submitStatefulSet() throws Exception {
         final V1StatefulSet statefulSet = createStatefulSet();
+        // Configure function authentication if needed
+        if (instanceConfig.getFunctionAuthenticationSpec() != null) {
+            functionAuthDataCacheProvider.configureAuthDataStatefulSet(
+                    statefulSet, getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec()));
+        }
 
         log.info("Submitting the following spec to k8 {}", appsClient.getApiClient().getJSON().serialize(statefulSet));
 
@@ -435,8 +451,8 @@ public class KubernetesRuntime implements Runtime {
 
         Actions.Action createStatefulSet = Actions.Action.builder()
                 .actionName(String.format("Submitting statefulset for function %s", fqfn))
-                .numRetries(NUM_RETRIES)
-                .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+                .numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
+                .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS)
                 .supplier(() -> {
                     final V1StatefulSet response;
                     try {
@@ -482,8 +498,8 @@ public class KubernetesRuntime implements Runtime {
         String fqfn = FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails());
         Actions.Action deleteStatefulSet = Actions.Action.builder()
                 .actionName(String.format("Deleting statefulset for function %s", fqfn))
-                .numRetries(NUM_RETRIES)
-                .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+                .numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
+                .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS)
                 .supplier(() -> {
                     Response response;
                     try {
@@ -531,8 +547,8 @@ public class KubernetesRuntime implements Runtime {
         Actions.Action waitForStatefulSetDeletion = Actions.Action.builder()
                 .actionName(String.format("Waiting for statefulset for function %s to complete deletion", fqfn))
                 // set retry period to be about 2x the graceshutdown time
-                .numRetries(NUM_RETRIES * 2)
-                .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS* 2)
+                .numRetries(KubernetesRuntimeFactory.NUM_RETRIES * 2)
+                .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS* 2)
                 .supplier(() -> {
                     V1StatefulSet response;
                     try {
@@ -560,8 +576,8 @@ public class KubernetesRuntime implements Runtime {
         // Need to wait for all pods to die so we can cleanup subscriptions.
         Actions.Action waitForStatefulPodsToTerminate = Actions.Action.builder()
                 .actionName(String.format("Waiting for pods for function %s to terminate", fqfn))
-                .numRetries(NUM_RETRIES * 2)
-                .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS * 2)
+                .numRetries(KubernetesRuntimeFactory.NUM_RETRIES * 2)
+                .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS * 2)
                 .supplier(() -> {
                     String labels = String.format("tenant=%s,namespace=%s,name=%s",
                             instanceConfig.getFunctionDetails().getTenant(),
@@ -633,8 +649,8 @@ public class KubernetesRuntime implements Runtime {
 
         Actions.Action deleteService = Actions.Action.builder()
                 .actionName(String.format("Deleting service for function %s", fqfn))
-                .numRetries(NUM_RETRIES)
-                .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+                .numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
+                .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS)
                 .supplier(() -> {
                     final Response response;
                     try {
@@ -679,8 +695,8 @@ public class KubernetesRuntime implements Runtime {
 
         Actions.Action waitForServiceDeletion = Actions.Action.builder()
                 .actionName(String.format("Waiting for statefulset for function %s to complete deletion", fqfn))
-                .numRetries(NUM_RETRIES)
-                .sleepBetweenInvocationsMs(SLEEP_BETWEEN_RETRIES_MS)
+                .numRetries(KubernetesRuntimeFactory.NUM_RETRIES)
+                .sleepBetweenInvocationsMs(KubernetesRuntimeFactory.SLEEP_BETWEEN_RETRIES_MS)
                 .supplier(() -> {
                     V1Service response;
                     try {
@@ -738,6 +754,28 @@ public class KubernetesRuntime implements Runtime {
     }
 
     private List<String> getDownloadCommand(String bkPath, String userCodeFilePath) {
+
+        // add auth plugin and parameters if necessary
+        if (authConfig != null) {
+            if (isNotBlank(authConfig.getClientAuthenticationPlugin())
+                    && isNotBlank(authConfig.getClientAuthenticationParameters())) {
+                return Arrays.asList(
+                        pulsarRootDir + "/bin/pulsar-admin",
+                        "--auth-plugin",
+                        authConfig.getClientAuthenticationPlugin(),
+                        "--auth-params",
+                        authConfig.getClientAuthenticationParameters(),
+                        "--admin-url",
+                        pulsarAdminUrl,
+                        "functions",
+                        "download",
+                        "--path",
+                        bkPath,
+                        "--destination-file",
+                        userCodeFilePath);
+            }
+        }
+
         return Arrays.asList(
                 pulsarRootDir + "/bin/pulsar-admin",
                 "--admin-url",
@@ -795,6 +833,7 @@ public class KubernetesRuntime implements Runtime {
 
         statefulSet.spec(statefulSetSpec);
 
+
         return statefulSet;
     }
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index 6a551c2..7fa67ce 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -27,12 +27,15 @@ import io.kubernetes.client.apis.CoreV1Api;
 import io.kubernetes.client.models.V1ConfigMap;
 import io.kubernetes.client.util.Config;
 import java.nio.file.Paths;
+
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.functions.Resources;
+import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
+import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
@@ -44,6 +47,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
 
 /**
  * Kubernetes based function container factory implementation.
@@ -51,6 +55,9 @@ import static org.apache.commons.lang3.StringUtils.isEmpty;
 @Slf4j
 public class KubernetesRuntimeFactory implements RuntimeFactory {
 
+    static int NUM_RETRIES = 5;
+    static long SLEEP_BETWEEN_RETRIES_MS = 500;
+
     @Getter
     @Setter
     @NoArgsConstructor
@@ -158,6 +165,12 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
         this.expectedMetricsCollectionInterval = expectedMetricsCollectionInterval == null ? -1 : expectedMetricsCollectionInterval;
         this.secretsProviderConfigurator = secretsProviderConfigurator;
         this.functionInstanceMinResources = functionInstanceMinResources;
+        try {
+            setupClient();
+        } catch (Exception e) {
+            log.error("Failed to setup client", e);
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
@@ -169,7 +182,6 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
     public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String codePkgUrl,
                                              String originalCodeFileName,
                                              Long expectedHealthCheckInterval) throws Exception {
-        setupClient();
         String instanceFile;
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
             case JAVA:
@@ -181,6 +193,12 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
             default:
                 throw new RuntimeException("Unsupported Runtime " + instanceConfig.getFunctionDetails().getRuntime());
         }
+
+        // adjust the auth config to support auth
+        if (instanceConfig.getFunctionAuthenticationSpec() != null) {
+            getAuthProvider().configureAuthenticationConfig(authConfig, getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec()));
+        }
+
         return new KubernetesRuntime(
             appsClient,
             coreClient,
@@ -204,7 +222,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
             authConfig,
             secretsProviderConfigurator,
             expectedMetricsCollectionInterval,
-            this.kubernetesInfo.getPercentMemoryPadding());
+            this.kubernetesInfo.getPercentMemoryPadding(),
+            getAuthProvider());
     }
 
     @Override
@@ -215,16 +234,11 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
     public void doAdmissionChecks(Function.FunctionDetails functionDetails) {
         KubernetesRuntime.doChecks(functionDetails);
         validateMinResourcesRequired(functionDetails);
-        try {
-            setupClient();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
         secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient, kubernetesInfo.getJobNamespace(), functionDetails);
     }
 
     @VisibleForTesting
-    void setupClient() throws Exception {
+    public void setupClient() throws Exception {
         if (appsClient == null) {
             if (this.kubernetesInfo.getK8Uri() == null) {
                 log.info("k8Uri is null thus going by defaults");
@@ -309,4 +323,9 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
             }
         }
     }
+
+    @Override
+    public KubernetesFunctionAuthProvider getAuthProvider() {
+        return new KubernetesSecretsTokenAuthProvider(coreClient, kubernetesInfo.jobNamespace);
+    }
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index 2b85815..7090967 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -29,6 +29,8 @@ import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
 
 import java.nio.file.Paths;
 
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+
 /**
  * Thread based function container factory implementation.
  */
@@ -125,6 +127,12 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
             default:
                 throw new RuntimeException("Unsupported Runtime " + instanceConfig.getFunctionDetails().getRuntime());
         }
+
+        // configure auth if necessary
+        if (instanceConfig.getFunctionAuthenticationSpec() != null) {
+            getAuthProvider().configureAuthenticationConfig(authConfig, getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec()));
+        }
+
         return new ProcessRuntime(
             instanceConfig,
             instanceFile,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
index 19a5fc9..77f660f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/Runtime.java
@@ -35,6 +35,10 @@ public interface Runtime {
 
     void stop() throws Exception;
 
+    default void terminate() throws Exception {
+        stop();
+    }
+
     boolean isAlive();
 
     Throwable getDeathException();
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 248afbb..4384f00 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -19,6 +19,8 @@
 
 package org.apache.pulsar.functions.runtime;
 
+import org.apache.pulsar.functions.auth.FunctionAuthProvider;
+import org.apache.pulsar.functions.auth.NoOpFunctionAuthProvider;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 
@@ -43,7 +45,12 @@ public interface RuntimeFactory extends AutoCloseable {
 
     default void doAdmissionChecks(Function.FunctionDetails functionDetails) { }
 
+    default FunctionAuthProvider getAuthProvider() throws IllegalAccessException, InstantiationException {
+        return NoOpFunctionAuthProvider.class.newInstance();
+    }
+
     @Override
     void close();
 
 }
+ 
\ No newline at end of file
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java
new file mode 100644
index 0000000..3dde48b
--- /dev/null
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/ClearTextFunctionTokenAuthProviderTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Optional;
+
+public class ClearTextFunctionTokenAuthProviderTest {
+
+    @Test
+    public void testClearTextAuth() throws Exception {
+
+        ClearTextFunctionTokenAuthProvider clearTextFunctionTokenAuthProvider = new ClearTextFunctionTokenAuthProvider();
+
+        Optional<FunctionAuthData> functionAuthData = clearTextFunctionTokenAuthProvider.cacheAuthData("test-tenant",
+                "test-ns", "test-func", new AuthenticationDataSource() {
+                    @Override
+                    public boolean hasDataFromCommand() {
+                        return true;
+                    }
+
+                    @Override
+                    public String getCommandData() {
+                        return "test-token";
+                    }
+        });
+
+        Assert.assertTrue(functionAuthData.isPresent());
+        Assert.assertEquals(functionAuthData.get().getData(), "test-token".getBytes());
+
+        AuthenticationConfig authenticationConfig = AuthenticationConfig.builder().build();
+        clearTextFunctionTokenAuthProvider.configureAuthenticationConfig(authenticationConfig, functionAuthData.get());
+
+        Assert.assertEquals(authenticationConfig.getClientAuthenticationPlugin(), AuthenticationToken.class.getName());
+        Assert.assertEquals(authenticationConfig.getClientAuthenticationParameters(), "token:test-token");
+
+
+        AuthenticationToken authenticationToken = new AuthenticationToken();
+        authenticationToken.configure(authenticationConfig.getClientAuthenticationParameters());
+        Assert.assertEquals(authenticationToken.getAuthData().getCommandData(), "test-token");
+    }
+}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
new file mode 100644
index 0000000..eac937f
--- /dev/null
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/auth/KubernetesSecretsTokenAuthProviderTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.auth;
+
+import io.kubernetes.client.ApiException;
+import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.models.V1Container;
+import io.kubernetes.client.models.V1PodSpec;
+import io.kubernetes.client.models.V1PodTemplateSpec;
+import io.kubernetes.client.models.V1Secret;
+import io.kubernetes.client.models.V1ServiceAccount;
+import io.kubernetes.client.models.V1StatefulSet;
+import io.kubernetes.client.models.V1StatefulSetSpec;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class KubernetesSecretsTokenAuthProviderTest {
+
+    @Test
+    public void testConfigureAuthDataStatefulSet() {
+
+        CoreV1Api coreV1Api = mock(CoreV1Api.class);
+        KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider(coreV1Api, "default");
+
+
+        V1StatefulSet statefulSet = new V1StatefulSet();
+        statefulSet.setSpec(
+                new V1StatefulSetSpec().template(
+                        new V1PodTemplateSpec().spec(
+                                new V1PodSpec().containers(
+                                        Collections.singletonList(new V1Container())))));
+        FunctionAuthData functionAuthData = FunctionAuthData.builder().data("foo".getBytes()).build();
+        kubernetesSecretsTokenAuthProvider.configureAuthDataStatefulSet(statefulSet, functionAuthData);
+
+        Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getVolumes().size(), 1);
+        Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getVolumes().get(0).getName(), "function-auth");
+        Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getVolumes().get(0).getSecret().getSecretName(), "pf-secret-foo");
+
+        Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().size(), 1);
+        Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().size(), 1);
+        Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().get(0).getName(), "function-auth");
+        Assert.assertEquals(statefulSet.getSpec().getTemplate().getSpec().getContainers().get(0).getVolumeMounts().get(0).getMountPath(), "/etc/auth");
+    }
+
+    @Test
+    public void testCacheAuthData() throws ApiException {
+        CoreV1Api coreV1Api = mock(CoreV1Api.class);
+        doReturn(new V1Secret()).when(coreV1Api).createNamespacedSecret(anyString(), any(), anyString());
+        KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider(coreV1Api, "default");
+        Optional<FunctionAuthData> functionAuthData = kubernetesSecretsTokenAuthProvider.cacheAuthData("test-tenant",
+                "test-ns", "test-func", new AuthenticationDataSource() {
+                    @Override
+                    public boolean hasDataFromCommand() {
+                        return true;
+                    }
+
+                    @Override
+                    public String getCommandData() {
+                        return "test-token";
+                    }
+                });
+
+        Assert.assertTrue(functionAuthData.isPresent());
+        Assert.assertTrue(StringUtils.isNotBlank(new String(functionAuthData.get().getData())));
+    }
+
+    @Test
+    public void configureAuthenticationConfig() {
+        CoreV1Api coreV1Api = mock(CoreV1Api.class);
+        KubernetesSecretsTokenAuthProvider kubernetesSecretsTokenAuthProvider = new KubernetesSecretsTokenAuthProvider(coreV1Api, "default");
+        AuthenticationConfig authenticationConfig = AuthenticationConfig.builder().build();
+        FunctionAuthData functionAuthData = FunctionAuthData.builder().data("foo".getBytes()).build();
+        kubernetesSecretsTokenAuthProvider.configureAuthenticationConfig(authenticationConfig, functionAuthData);
+
+        Assert.assertEquals(authenticationConfig.getClientAuthenticationPlugin(), AuthenticationToken.class.getName());
+        Assert.assertEquals(authenticationConfig.getClientAuthenticationParameters(), "file:///etc/auth/token");
+    }
+}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
index c97cdb6..8f1172e 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
@@ -165,8 +165,6 @@ public class KubernetesRuntimeFactoryTest {
         factory = createKubernetesRuntimeFactory(null, null);
         FunctionDetails functionDetails = createFunctionDetails();
         factory.doAdmissionChecks(functionDetails);
-        verify(factory, times(1)).setupClient();
-
     }
 
     @Test
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index ae006cc..246834e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -68,6 +68,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.pulsar.common.functions.Utils.FILE;
 import static org.apache.pulsar.common.functions.Utils.HTTP;
 import static org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
 import static org.apache.pulsar.functions.utils.Utils.getSinkType;
 import static org.apache.pulsar.functions.utils.Utils.getSourceType;
 
@@ -154,6 +155,7 @@ public class FunctionActioner {
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(functionMetaData.getFunctionDetails());
 
         InstanceConfig instanceConfig = createInstanceConfig(functionDetailsBuilder.build(),
+                instance.getFunctionMetaData().getFunctionAuthSpec(),
                 instanceId, workerConfig.getPulsarFunctionsCluster());
 
         RuntimeSpawner runtimeSpawner = new RuntimeSpawner(instanceConfig, packageFile,
@@ -164,7 +166,8 @@ public class FunctionActioner {
     }
 
 
-    InstanceConfig createInstanceConfig(FunctionDetails functionDetails, int instanceId, String clusterName) {
+    InstanceConfig createInstanceConfig(FunctionDetails functionDetails, Function.FunctionAuthenticationSpec
+            functionAuthSpec, int instanceId, String clusterName) {
         InstanceConfig instanceConfig = new InstanceConfig();
         instanceConfig.setFunctionDetails(functionDetails);
         // TODO: set correct function id and version when features implemented
@@ -174,6 +177,7 @@ public class FunctionActioner {
         instanceConfig.setMaxBufferedTuples(1024);
         instanceConfig.setPort(org.apache.pulsar.functions.utils.Utils.findAvailablePort());
         instanceConfig.setClusterName(clusterName);
+        instanceConfig.setFunctionAuthenticationSpec(functionAuthSpec);
         return instanceConfig;
     }
 
@@ -231,17 +235,9 @@ public class FunctionActioner {
         }
     }
 
-    public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
+    private void cleanupFunctionFiles(FunctionRuntimeInfo functionRuntimeInfo) {
         Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
         FunctionMetaData functionMetaData = instance.getFunctionMetaData();
-        FunctionDetails details = functionMetaData.getFunctionDetails();
-        log.info("{}/{}/{}-{} Stopping function...", details.getTenant(), details.getNamespace(), details.getName(),
-                instance.getInstanceId());
-        if (functionRuntimeInfo.getRuntimeSpawner() != null) {
-            functionRuntimeInfo.getRuntimeSpawner().close();
-            functionRuntimeInfo.setRuntimeSpawner(null);
-        }
-
         // clean up function package
         File pkgDir = new File(
                 workerConfig.getDownloadDirectory(),
@@ -250,7 +246,7 @@ public class FunctionActioner {
         if (pkgDir.exists()) {
             try {
                 MoreFiles.deleteRecursively(
-                    Paths.get(pkgDir.toURI()), RecursiveDeleteOption.ALLOW_INSECURE);
+                        Paths.get(pkgDir.toURI()), RecursiveDeleteOption.ALLOW_INSECURE);
             } catch (IOException e) {
                 log.warn("Failed to delete package for function: {}",
                         FunctionDetailsUtils.getFullyQualifiedName(functionMetaData.getFunctionDetails()), e);
@@ -258,14 +254,42 @@ public class FunctionActioner {
         }
     }
 
+    public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
+        Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
+        FunctionMetaData functionMetaData = instance.getFunctionMetaData();
+        FunctionDetails details = functionMetaData.getFunctionDetails();
+        log.info("{}/{}/{}-{} Stopping function...", details.getTenant(), details.getNamespace(), details.getName(),
+                instance.getInstanceId());
+        if (functionRuntimeInfo.getRuntimeSpawner() != null) {
+            functionRuntimeInfo.getRuntimeSpawner().close();
+            functionRuntimeInfo.setRuntimeSpawner(null);
+        }
+
+        cleanupFunctionFiles(functionRuntimeInfo);
+    }
+
     public void terminateFunction(FunctionRuntimeInfo functionRuntimeInfo) {
-        FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData()
-                .getFunctionDetails();
-        log.info("{}/{}/{}-{} Terminating function...", details.getTenant(), details.getNamespace(), details.getName(),
-                functionRuntimeInfo.getFunctionInstance().getInstanceId());
+        FunctionDetails details = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionDetails();
         String fqfn = FunctionDetailsUtils.getFullyQualifiedName(details);
+        log.info("{}-{} Terminating function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId());
+
+        if (functionRuntimeInfo.getRuntimeSpawner() != null) {
+            functionRuntimeInfo.getRuntimeSpawner().close();
+            // cleanup any auth data cached
+            try {
+                functionRuntimeInfo.getRuntimeSpawner()
+                        .getRuntimeFactory().getAuthProvider()
+                        .cleanUpAuthData(
+                                details.getTenant(), details.getNamespace(), details.getName(),
+                                getFunctionAuthData(functionRuntimeInfo.getFunctionInstance().getFunctionMetaData().getFunctionAuthSpec()));
+            } catch (Exception e) {
+                log.error("Failed to cleanup auth data for function: {}", fqfn, e);
+            }
+            functionRuntimeInfo.setRuntimeSpawner(null);
+        }
+
+        cleanupFunctionFiles(functionRuntimeInfo);
 
-        stopFunction(functionRuntimeInfo);
         //cleanup subscriptions
         if (details.getSource().getCleanupSubscription()) {
             Map<String, Function.ConsumerSpec> consumerSpecMap = details.getSource().getInputSpecsMap();
@@ -309,8 +333,8 @@ public class FunctionActioner {
                                                                 SubscriptionStats sub = stats.subscriptions.get(InstanceUtils.getDefaultSubscriptionName(details));
                                                                 if (sub != null) {
                                                                     existingConsumers = sub.consumers.stream()
-                                                                    .map(consumerStats -> consumerStats.metadata)
-                                                                    .collect(Collectors.toList());
+                                                                            .map(consumerStats -> consumerStats.metadata)
+                                                                            .collect(Collectors.toList());
                                                                 }
                                                             } catch (PulsarAdminException e1) {
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
index 532d852..18ac5b2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/FunctionApiResource.java
@@ -24,6 +24,7 @@ import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.UriInfo;
 
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.broker.web.AuthenticationFilter;
 import org.apache.pulsar.functions.worker.WorkerService;
 
@@ -52,4 +53,8 @@ public class FunctionApiResource implements Supplier<WorkerService> {
                 ? (String) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedRoleAttributeName)
                 : null;
     }
+
+    public AuthenticationDataHttps clientAuthData() {
+        return (AuthenticationDataHttps) httpRequest.getAttribute(AuthenticationFilter.AuthenticatedDataAttributeName);
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 2bae728..23f479a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -26,6 +26,7 @@ import static org.apache.pulsar.functions.utils.Reflections.createInstance;
 
 import com.google.gson.Gson;
 
+import com.google.protobuf.ByteString;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
@@ -43,6 +44,7 @@ import java.util.Base64;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -50,7 +52,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 import javax.ws.rs.WebApplicationException;
-import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
@@ -77,6 +78,7 @@ import static org.apache.pulsar.functions.utils.Utils.ComponentType.SINK;
 import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -93,6 +95,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.functions.auth.FunctionAuthData;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
@@ -292,7 +295,8 @@ public abstract class ComponentImpl {
                                  final String functionPkgUrl,
                                  final String functionDetailsJson,
                                  final String componentConfigJson,
-                                 final String clientRole) {
+                                 final String clientRole,
+                                 AuthenticationDataHttps clientAuthenticationDataHttps) {
 
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
@@ -377,8 +381,30 @@ public abstract class ComponentImpl {
 
         // function state
         FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder()
-                .setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0);
+                .setFunctionDetails(functionDetails)
+                .setCreateTime(System.currentTimeMillis())
+                .setVersion(0);
 
+        // cache auth if need
+        if (clientAuthenticationDataHttps != null) {
+            try {
+                Optional<FunctionAuthData> functionAuthData = worker().getFunctionRuntimeManager()
+                        .getRuntimeFactory()
+                        .getAuthProvider()
+                        .cacheAuthData(tenant, namespace, componentName, clientAuthenticationDataHttps);
+
+                if (functionAuthData.isPresent()) {
+                    functionMetaDataBuilder.setFunctionAuthSpec(
+                            Function.FunctionAuthenticationSpec.newBuilder()
+                                    .setData(ByteString.copyFrom(functionAuthData.get().getData()))
+                                    .build());
+                }
+            } catch (Exception e) {
+                log.error("Error caching authentication data for {} {}/{}/{}", componentType, tenant, namespace, componentName, e);
+
+                throw new RestException(Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", componentType, componentName, e.getMessage()));
+            }
+        }
 
         PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
         try {
@@ -494,6 +520,7 @@ public abstract class ComponentImpl {
         String existingComponentConfigJson;
 
         FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+
         if (componentType.equals(FUNCTION)) {
             FunctionConfig existingFunctionConfig = FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
             existingComponentConfigJson = new Gson().toJson(existingFunctionConfig);
@@ -573,9 +600,9 @@ public abstract class ComponentImpl {
             throw new RestException(Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()));
         }
 
-        // function state
-        FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder()
-                .setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0);
+        // merge from existing metadata
+        FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder().mergeFrom(existingComponent)
+                .setFunctionDetails(functionDetails);
 
         PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
         if (isNotBlank(functionPkgUrl) || uploadedInputStreamAsFile != null) {
@@ -590,6 +617,7 @@ public abstract class ComponentImpl {
         }
 
         functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+
         updateRequest(functionMetaDataBuilder.build());
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index c71e1c2..45fbbf2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -90,7 +90,7 @@ public class FunctionsImplV2 {
             uploadedInputStream, FormDataContentDisposition fileDetail, String functionPkgUrl, String
                                              functionDetailsJson, String functionConfigJson, String clientAppId) {
         delegate.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId);
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId, null);
         return Response.ok().build();
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
index 4d27134..0415be2 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
@@ -69,7 +69,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
                                  final @FormDataParam("functionConfig") String functionConfigJson) {
 
         functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, functionConfigJson, clientAppId());
+                functionPkgUrl, null, functionConfigJson, clientAppId(), clientAuthData());
 
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
index ee7d1a4..e5d257f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
@@ -25,7 +25,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
 import org.apache.pulsar.functions.worker.rest.api.SinkImpl;
@@ -34,7 +33,6 @@ import org.glassfish.jersey.media.multipart.FormDataParam;
 
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -62,7 +60,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
                              final @FormDataParam("sinkConfig") String sinkConfigJson) {
 
         sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, sinkConfigJson, clientAppId());
+                functionPkgUrl, null, sinkConfigJson, clientAppId(), clientAuthData());
     }
 
     @PUT
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
index f2b1936..e07bfe0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
@@ -60,7 +60,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
                                    final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
         source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, sourceConfigJson, clientAppId());
+                functionPkgUrl, null, sourceConfigJson, clientAppId(), clientAuthData());
 
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index a66ba90..91fec23 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -48,7 +48,14 @@ import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 @Slf4j
 public class FunctionRuntimeManagerTest {
@@ -663,7 +670,9 @@ public class FunctionRuntimeManagerTest {
     public void testExternallyManagedRuntimeUpdate() throws Exception {
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
-        workerConfig.setKubernetesContainerFactory(new WorkerConfig.KubernetesContainerFactory());
+        workerConfig.setKubernetesContainerFactory(
+                new WorkerConfig.KubernetesContainerFactory()
+                        .setSubmittingInsidePod(false));
         workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
         workerConfig.setStateStorageServiceUrl("foo");
         workerConfig.setPulsarFunctionsCluster("cluster");
@@ -679,8 +688,8 @@ public class FunctionRuntimeManagerTest {
         doReturn(pulsarClient).when(workerService).getClient();
         doReturn(mock(PulsarAdmin.class)).when(workerService).getFunctionAdmin();
 
-
         KubernetesRuntimeFactory kubernetesRuntimeFactory = mock(KubernetesRuntimeFactory.class);
+        doNothing().when(kubernetesRuntimeFactory).setupClient();
         doReturn(true).when(kubernetesRuntimeFactory).externallyManaged();
 
         doReturn(mock(KubernetesRuntime.class)).when(kubernetesRuntimeFactory).createContainer(any(), any(), any(), any());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index bdfb50e..e02b253 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -487,7 +487,7 @@ public class FunctionApiV3ResourceTest {
                 functionPkgUrl,
                 null,
                 new Gson().toJson(functionConfig),
-                null);
+                null, null);
 
     }
 
@@ -502,7 +502,7 @@ public class FunctionApiV3ResourceTest {
             null,
             null,
             new Gson().toJson(functionConfig),
-                null);
+                null, null);
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function already exists")
@@ -1446,7 +1446,7 @@ public class FunctionApiV3ResourceTest {
         functionConfig.setOutput(outputTopic);
         functionConfig.setOutputSerdeClassName(outputSerdeClassName);
         resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl,
-                null, new Gson().toJson(functionConfig), null);
+                null, new Gson().toJson(functionConfig), null, null);
 
     }
 
@@ -1478,7 +1478,7 @@ public class FunctionApiV3ResourceTest {
         functionConfig.setOutput(outputTopic);
         functionConfig.setOutputSerdeClassName(outputSerdeClassName);
         resource.registerFunction(actualTenant, actualNamespace, actualName, null, null, filePackageUrl,
-                null, new Gson().toJson(functionConfig), null);
+                null, new Gson().toJson(functionConfig), null, null);
     }
 
     public static FunctionConfig createDefaultFunctionConfig() {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 9e55800..1b475ed 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -420,7 +420,7 @@ public class SinkApiV3ResourceTest {
                 pkgUrl,
                 null,
                 new Gson().toJson(sinkConfig),
-                null);
+                null, null);
 
     }
 
@@ -435,7 +435,7 @@ public class SinkApiV3ResourceTest {
             null,
             null,
             new Gson().toJson(sinkConfig),
-                null);
+                null, null);
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink test-sink already exists")
@@ -529,7 +529,7 @@ public class SinkApiV3ResourceTest {
                 null,
                 null,
                 new Gson().toJson(sinkConfig),
-                null);
+                null, null);
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "sink failed to register")
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 6ba8914..43f3303 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -389,7 +389,7 @@ public class SourceApiV3ResourceTest {
                 pkgUrl,
                 null,
                 new Gson().toJson(sourceConfig),
-                null);
+                null, null);
 
     }
 
@@ -404,7 +404,7 @@ public class SourceApiV3ResourceTest {
             null,
             null,
             new Gson().toJson(sourceConfig),
-                null);
+                null, null);
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source test-source already exists")
@@ -498,7 +498,7 @@ public class SourceApiV3ResourceTest {
                 null,
                 null,
                 new Gson().toJson(sourceConfig),
-                null);
+                null, null);
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "source failed to register")