You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/03/28 16:26:18 UTC

[pulsar] branch master updated: [ISSUE 3763] - Implementing Function authorization (#3874)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 14d1eaa  [ISSUE 3763] - Implementing Function authorization (#3874)
14d1eaa is described below

commit 14d1eaa73e1479e403042da87ad34c7a35a304e2
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Mar 28 11:26:12 2019 -0500

    [ISSUE 3763] - Implementing Function authorization (#3874)
    
    * Implementing Function Authorization
---
 conf/functions_worker.yml                          |  13 +-
 .../authorization/AuthorizationProvider.java       |  10 +
 .../broker/authorization/AuthorizationService.java |   6 +-
 .../authorization/PulsarAuthorizationProvider.java |  42 ++
 .../org/apache/pulsar/PulsarBrokerStarter.java     |   6 +
 .../java/org/apache/pulsar/PulsarStandalone.java   |   7 +
 .../org/apache/pulsar/broker/PulsarService.java    |  10 +-
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  32 +-
 .../apache/pulsar/broker/admin/impl/SinkBase.java  |  22 +-
 .../pulsar/broker/admin/impl/SourceBase.java       |  22 +-
 .../api/AuthorizationProducerConsumerTest.java     |   5 +
 .../worker/PulsarFunctionE2ESecurityTest.java      | 516 +++++++++++++++++++++
 .../worker/PulsarWorkerAssignmentTest.java         |   4 +-
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    |   4 +
 .../pulsar/common/policies/data/AuthAction.java    |   3 +
 .../org/apache/pulsar/functions/worker/Worker.java |  68 ++-
 .../pulsar/functions/worker/WorkerConfig.java      |  22 +
 .../pulsar/functions/worker/WorkerService.java     |  19 +-
 .../functions/worker/rest/api/ComponentImpl.java   | 269 +++++++++--
 .../functions/worker/rest/api/FunctionsImpl.java   |  13 +-
 .../functions/worker/rest/api/FunctionsImplV2.java |  24 +-
 .../pulsar/functions/worker/rest/api/SinkImpl.java |  13 +-
 .../functions/worker/rest/api/SourceImpl.java      |  12 +-
 .../worker/rest/api/v3/FunctionApiV3Resource.java  |  30 +-
 .../worker/rest/api/v3/SinkApiV3Resource.java      |  22 +-
 .../worker/rest/api/v3/SourceApiV3Resource.java    |  22 +-
 .../worker/rest/api/FunctionsImplTest.java         |   2 +-
 .../rest/api/v3/FunctionApiV3ResourceTest.java     |  20 +-
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  |  16 +-
 .../rest/api/v3/SourceApiV3ResourceTest.java       |  16 +-
 30 files changed, 1091 insertions(+), 179 deletions(-)

diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 0c1125d..776d4cd 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -26,6 +26,13 @@ workerHostname: localhost
 workerPort: 6750
 workerPortTls: 6751
 
+# Configuration Store connection string
+configurationStoreServers: localhost:2181
+# ZooKeeper session timeout in milliseconds
+zooKeeperSessionTimeoutMillis: 30000
+# ZooKeeper operation timeout in seconds
+zooKeeperOperationTimeoutSeconds: 30
+
 ################################
 # Function package management
 ################################
@@ -129,8 +136,10 @@ processContainerFactory:
 authenticationEnabled: false
 # Enforce authorization on accessing functions api
 authorizationEnabled: false
-# Set of autentication provider name list, which is a list of class names
-authenticationProviders: 
+# Set of authentication provider name list, which is a list of class names
+authenticationProviders:
+# Authorization provider fully qualified class-name
+authorizationProvider: org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
 # Set of role names that are treated as "super-user", meaning they will be able to access any admin-api
 superUserRoles: 
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index b25c789..9787eae 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -96,6 +96,16 @@ public interface AuthorizationProvider extends Closeable {
             AuthenticationDataSource authenticationData);
 
     /**
+     * Allow all function operations with in this namespace
+     * @param namespaceName The namespace that the function operations can be executed in
+     * @param role The role to check
+     * @param authenticationData authentication data related to the role
+     * @return a boolean to determine whether authorized or not
+     */
+    CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
+                                                     AuthenticationDataSource authenticationData);
+
+    /**
      *
      * Grant authorization-action permission on a namespace to the given client
      *
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index de6f799..95ff764 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -32,8 +32,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.function.Function;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
 
@@ -307,4 +305,8 @@ public class AuthorizationService {
         return finalResult;
     }
 
+    public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
+                                                       AuthenticationDataSource authenticationData) {
+        return provider.allowFunctionOpsAsync(namespaceName, role, authenticationData);
+    }
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 7914168..8dd04d4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -210,6 +210,48 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
     }
 
     @Override
+    public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+        CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
+        try {
+            configCache.policiesCache().getAsync(POLICY_ROOT + namespaceName.toString()).thenAccept(policies -> {
+                if (!policies.isPresent()) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Policies node couldn't be found for namespace : {}", namespaceName);
+                    }
+                } else {
+                    Map<String, Set<AuthAction>> namespaceRoles = policies.get().auth_policies.namespace_auth;
+                    Set<AuthAction> namespaceActions = namespaceRoles.get(role);
+                    if (namespaceActions != null && namespaceActions.contains(AuthAction.functions)) {
+                        // The role has namespace level permission
+                        permissionFuture.complete(true);
+                        return;
+                    }
+
+                    // Using wildcard
+                    if (conf.isAuthorizationAllowWildcardsMatching()) {
+                        if (checkWildcardPermission(role, AuthAction.functions, namespaceRoles)) {
+                            // The role has namespace level permission by wildcard match
+                            permissionFuture.complete(true);
+                            return;
+                        }
+                    }
+                }
+                permissionFuture.complete(false);
+            }).exceptionally(ex -> {
+                log.warn("Client  with Role - {} failed to get permissions for namespace - {}. {}", role, namespaceName,
+                        ex.getMessage());
+                permissionFuture.completeExceptionally(ex);
+                return null;
+            });
+        } catch (Exception e) {
+            log.warn("Client  with Role - {} failed to get permissions for namespace - {}. {}", role, namespaceName,
+                    e.getMessage());
+            permissionFuture.completeExceptionally(e);
+        }
+        return permissionFuture;
+    }
+
+    @Override
     public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions,
             String role, String authDataJson) {
         return grantPermissionAsync(topicName.getNamespaceObject(), actions, role, authDataJson);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index d9acecc..346a3eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -166,6 +166,12 @@ public class PulsarBrokerStarter {
                     "c-" + brokerConfig.getClusterName()
                         + "-fw-" + hostname
                         + "-" + workerConfig.getWorkerPort());
+                // inherit broker authorization setting
+                workerConfig.setAuthorizationEnabled(brokerConfig.isAuthorizationEnabled());
+                workerConfig.setAuthorizationProvider(brokerConfig.getAuthorizationProvider());
+                workerConfig.setConfigurationStoreServers(brokerConfig.getConfigurationStoreServers());
+                workerConfig.setZooKeeperSessionTimeoutMillis(brokerConfig.getZooKeeperSessionTimeoutMillis());
+                workerConfig.setZooKeeperOperationTimeoutSeconds(brokerConfig.getZooKeeperOperationTimeoutSeconds());
                 functionsWorkerService = new WorkerService(workerConfig);
             } else {
                 functionsWorkerService = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index feabe02..4acbc44 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -297,6 +297,13 @@ public class PulsarStandalone implements AutoCloseable {
                 "c-" + config.getClusterName()
                     + "-fw-" + hostname
                     + "-" + workerConfig.getWorkerPort());
+            // inherit broker authorization setting
+            workerConfig.setAuthorizationEnabled(config.isAuthorizationEnabled());
+            workerConfig.setAuthorizationProvider(config.getAuthorizationProvider());
+            workerConfig.setConfigurationStoreServers(config.getConfigurationStoreServers());
+            workerConfig.setZooKeeperSessionTimeoutMillis(config.getZooKeeperSessionTimeoutMillis());
+            workerConfig.setZooKeeperOperationTimeoutSeconds(config.getZooKeeperOperationTimeoutSeconds());
+
             fnWorkerService = new WorkerService(workerConfig);
         }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ed4d928..400d207 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -57,6 +57,8 @@ import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
 import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
@@ -438,7 +440,7 @@ public class PulsarService implements AutoCloseable {
             acquireSLANamespace();
 
             // start function worker service if necessary
-            this.startWorkerService();
+            this.startWorkerService(brokerService.getAuthenticationService(), brokerService.getAuthorizationService());
 
             LOG.info("messaging service is ready, bootstrap service on port={}, broker url={}, cluster={}, configs={}",
                     config.getWebServicePort().get(), brokerServiceUrl, config.getClusterName(),
@@ -945,7 +947,9 @@ public class PulsarService implements AutoCloseable {
         return schemaRegistryService;
     }
 
-    private void startWorkerService() throws InterruptedException, IOException, KeeperException {
+    private void startWorkerService(AuthenticationService authenticationService,
+                                    AuthorizationService authorizationService)
+            throws InterruptedException, IOException, KeeperException {
         if (functionWorkerService.isPresent()) {
             LOG.info("Starting function worker service");
             String namespace = functionWorkerService.get()
@@ -1041,7 +1045,7 @@ public class PulsarService implements AutoCloseable {
                 throw ioe;
             }
             LOG.info("Function worker service setup completed");
-            functionWorkerService.get().start(dlogURI);
+            functionWorkerService.get().start(dlogURI, authenticationService, authorizationService);
             LOG.info("Function worker service started");
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index c4fd33f..e70e5cb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -103,7 +103,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
                                final @FormDataParam("functionConfig") String functionConfigJson) {
 
         functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, functionConfigJson, clientAppId());
+                functionPkgUrl, null, functionConfigJson, clientAppId(), clientAuthData());
     }
 
 
@@ -120,7 +120,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     public void deregisterFunction(final @PathParam("tenant") String tenant,
                                    final @PathParam("namespace") String namespace,
                                    final @PathParam("functionName") String functionName) {
-        functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+        functions.deregisterFunction(tenant, namespace, functionName, clientAppId(), clientAuthData());
     }
 
     @GET
@@ -138,7 +138,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     public FunctionConfig getFunctionInfo(final @PathParam("tenant") String tenant,
                                           final @PathParam("namespace") String namespace,
                                           final @PathParam("functionName") String functionName) throws IOException {
-         return functions.getFunctionInfo(tenant, namespace, functionName);
+         return functions.getFunctionInfo(tenant, namespace, functionName, clientAppId(), clientAuthData());
     }
 
     @GET
@@ -158,7 +158,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
             final @PathParam("namespace") String namespace,
             final @PathParam("functionName") String functionName,
             final @PathParam("instanceId") String instanceId) throws IOException {
-        return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+        return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
@@ -177,7 +177,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
             final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace,
             final @PathParam("functionName") String functionName) throws IOException {
-        return functions.getFunctionStatus(tenant, namespace, functionName, uri.getRequestUri());
+        return functions.getFunctionStatus(tenant, namespace, functionName, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
@@ -195,7 +195,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
                                         final @PathParam("namespace") String namespace,
                                         final @PathParam("functionName") String functionName) throws IOException {
-        return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri());
+        return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
@@ -215,7 +215,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
             final @PathParam("namespace") String namespace,
             final @PathParam("functionName") String functionName,
             final @PathParam("instanceId") String instanceId) throws IOException {
-        return functions.getFunctionsInstanceStats(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+        return functions.getFunctionsInstanceStats(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
@@ -231,7 +231,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     @Path("/{tenant}/{namespace}")
     public List<String> listFunctions(final @PathParam("tenant") String tenant,
                                       final @PathParam("namespace") String namespace) {
-        return functions.listFunctions(tenant, namespace);
+        return functions.listFunctions(tenant, namespace, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -253,7 +253,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
                                   final @FormDataParam("data") String triggerValue,
                                   final @FormDataParam("dataStream") InputStream triggerStream,
                                   final @FormDataParam("topic") String topic) {
-        return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
+        return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic, clientAppId(), clientAuthData());
     }
 
     @GET
@@ -272,7 +272,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
                                           final @PathParam("namespace") String namespace,
                                           final @PathParam("functionName") String functionName,
                                           final @PathParam("key") String key) {
-        return functions.getFunctionState(tenant, namespace, functionName, key);
+        return functions.getFunctionState(tenant, namespace, functionName, key, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -288,7 +288,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("functionName") String functionName,
                                     final @PathParam("instanceId") String instanceId) {
-        functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+        functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -303,7 +303,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     public void restartFunction(final @PathParam("tenant") String tenant,
                                 final @PathParam("namespace") String namespace,
                                 final @PathParam("functionName") String functionName) {
-        functions.restartFunctionInstances(tenant, namespace, functionName);
+        functions.restartFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -319,7 +319,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
                              final @PathParam("namespace") String namespace,
                              final @PathParam("functionName") String functionName,
                              final @PathParam("instanceId") String instanceId) {
-        functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+        functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -334,7 +334,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     public void stopFunction(final @PathParam("tenant") String tenant,
                              final @PathParam("namespace") String namespace,
                              final @PathParam("functionName") String functionName) {
-        functions.stopFunctionInstances(tenant, namespace, functionName);
+        functions.stopFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -350,7 +350,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
                               final @PathParam("namespace") String namespace,
                               final @PathParam("functionName") String functionName,
                               final @PathParam("instanceId") String instanceId) {
-        functions.startFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+        functions.startFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -365,7 +365,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     public void startFunction(final @PathParam("tenant") String tenant,
                               final @PathParam("namespace") String namespace,
                               final @PathParam("functionName") String functionName) {
-        functions.startFunctionInstances(tenant, namespace, functionName);
+        functions.startFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
index ffd4dd7..e034950 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
@@ -100,7 +100,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
                            final @FormDataParam("sinkConfig") String sinkConfigJson) {
 
          sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, sinkConfigJson, clientAppId());
+                functionPkgUrl, null, sinkConfigJson, clientAppId(), clientAuthData());
 
     }
 
@@ -118,7 +118,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
     public void deregisterSink(final @PathParam("tenant") String tenant,
                                final @PathParam("namespace") String namespace,
                                final @PathParam("sinkName") String sinkName) {
-        sink.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+        sink.deregisterFunction(tenant, namespace, sinkName, clientAppId(), clientAuthData());
     }
 
     @GET
@@ -157,7 +157,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
             final @PathParam("sinkName") String sinkName,
             final @PathParam("instanceId") String instanceId) throws IOException {
         return sink.getSinkInstanceStatus(
-            tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+            tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
@@ -175,7 +175,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
     public SinkStatus getSinkStatus(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("sinkName") String sinkName) throws IOException {
-        return sink.getSinkStatus(tenant, namespace, sinkName, uri.getRequestUri());
+        return sink.getSinkStatus(tenant, namespace, sinkName, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
@@ -191,7 +191,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
     @Path("/{tenant}/{namespace}")
     public List<String> listSinks(final @PathParam("tenant") String tenant,
                                   final @PathParam("namespace") String namespace) {
-        return sink.listFunctions(tenant, namespace);
+        return sink.listFunctions(tenant, namespace, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -207,7 +207,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
                             final @PathParam("namespace") String namespace,
                             final @PathParam("sinkName") String sinkName,
                             final @PathParam("instanceId") String instanceId) {
-        sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+        sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -222,7 +222,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
     public void restartSink(final @PathParam("tenant") String tenant,
                             final @PathParam("namespace") String namespace,
                             final @PathParam("sinkName") String sinkName) {
-        sink.restartFunctionInstances(tenant, namespace, sinkName);
+        sink.restartFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -238,7 +238,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
                          final @PathParam("namespace") String namespace,
                          final @PathParam("sinkName") String sinkName,
                          final @PathParam("instanceId") String instanceId) {
-        sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+        sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -253,7 +253,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
     public void stopSink(final @PathParam("tenant") String tenant,
                          final @PathParam("namespace") String namespace,
                          final @PathParam("sinkName") String sinkName) {
-        sink.stopFunctionInstances(tenant, namespace, sinkName);
+        sink.stopFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -269,7 +269,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
                           final @PathParam("namespace") String namespace,
                           final @PathParam("sinkName") String sinkName,
                           final @PathParam("instanceId") String instanceId) {
-        sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+        sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -284,7 +284,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
     public void startSink(final @PathParam("tenant") String tenant,
                           final @PathParam("namespace") String namespace,
                           final @PathParam("sinkName") String sinkName) {
-        sink.startFunctionInstances(tenant, namespace, sinkName);
+        sink.startFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
index 6078ede..ef70b67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
@@ -100,7 +100,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
                              final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
         source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-            functionPkgUrl, null, sourceConfigJson, clientAppId());
+            functionPkgUrl, null, sourceConfigJson, clientAppId(), clientAuthData());
     }
 
 
@@ -117,7 +117,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
     public void deregisterSource(final @PathParam("tenant") String tenant,
                                        final @PathParam("namespace") String namespace,
                                        final @PathParam("sourceName") String sourceName) {
-        source.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+        source.deregisterFunction(tenant, namespace, sourceName, clientAppId(), clientAuthData());
     }
 
     @GET
@@ -156,7 +156,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
             final @PathParam("sourceName") String sourceName,
             final @PathParam("instanceId") String instanceId) throws IOException {
         return source.getSourceInstanceStatus(
-            tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+            tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
@@ -174,7 +174,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
     public SourceStatus getSourceStatus(final @PathParam("tenant") String tenant,
                                         final @PathParam("namespace") String namespace,
                                         final @PathParam("sourceName") String sourceName) throws IOException {
-        return source.getSourceStatus(tenant, namespace, sourceName, uri.getRequestUri());
+        return source.getSourceStatus(tenant, namespace, sourceName, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
@@ -190,7 +190,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
     @Path("/{tenant}/{namespace}")
     public List<String> listSources(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace) {
-        return source.listFunctions(tenant, namespace);
+        return source.listFunctions(tenant, namespace, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -205,7 +205,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
                               final @PathParam("namespace") String namespace,
                               final @PathParam("sourceName") String sourceName,
                               final @PathParam("instanceId") String instanceId) {
-        source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+        source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -219,7 +219,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
     public void restartSource(final @PathParam("tenant") String tenant,
                               final @PathParam("namespace") String namespace,
                               final @PathParam("sourceName") String sourceName) {
-        source.restartFunctionInstances(tenant, namespace, sourceName);
+        source.restartFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -234,7 +234,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
                            final @PathParam("namespace") String namespace,
                            final @PathParam("sourceName") String sourceName,
                            final @PathParam("instanceId") String instanceId) {
-        source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+        source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -248,7 +248,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
     public void stopSource(final @PathParam("tenant") String tenant,
                            final @PathParam("namespace") String namespace,
                            final @PathParam("sourceName") String sourceName) {
-        source.stopFunctionInstances(tenant, namespace, sourceName);
+        source.stopFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -263,7 +263,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
                             final @PathParam("namespace") String namespace,
                             final @PathParam("sourceName") String sourceName,
                             final @PathParam("instanceId") String instanceId) {
-        source.startFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+        source.startFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -277,7 +277,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
     public void startSource(final @PathParam("tenant") String tenant,
                             final @PathParam("namespace") String namespace,
                             final @PathParam("sourceName") String sourceName) {
-        source.startFunctionInstances(tenant, namespace, sourceName);
+        source.startFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData());
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 72838e8..d8fbc91 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -440,6 +440,11 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
         }
 
         @Override
+        public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+            return null;
+        }
+
+        @Override
         public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
                 String role, String authenticationData) {
             return CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
new file mode 100644
index 0000000..b139457
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -0,0 +1,516 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.io.PulsarFunctionE2ETest;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import javax.crypto.SecretKey;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.fail;
+
+public class PulsarFunctionE2ESecurityTest {
+
+    LocalBookkeeperEnsemble bkEnsemble;
+
+    ServiceConfiguration config;
+    WorkerConfig workerConfig;
+    URL brokerWebServiceUrl;
+    PulsarService pulsar;
+    PulsarAdmin superUserAdmin;
+    PulsarClient pulsarClient;
+    BrokerStats brokerStatsClient;
+    WorkerService functionsWorkerService;
+    final String TENANT = "external-repl-prop";
+    final String NAMESPACE = "test-ns";
+    String pulsarFunctionsNamespace = TENANT + "/use/pulsar-function-admin";
+    String primaryHost;
+    String workerId;
+
+    private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+    private final int brokerWebServicePort = PortManager.nextFreePort();
+    private final int brokerServicePort = PortManager.nextFreePort();
+    private final int workerServicePort = PortManager.nextFreePort();
+    private SecretKey secretKey;
+
+    private static final String SUBJECT = "my-test-subject";
+    private static final String ADMIN_SUBJECT = "superUser";
+
+    private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
+    private String adminToken;
+    private String brokerServiceUrl;
+
+    @DataProvider(name = "validRoleName")
+    public Object[][] validRoleName() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+
+    @BeforeMethod
+    void setup(Method method) throws Exception {
+
+        log.info("--- Setting up method {} ---", method.getName());
+
+        // Start local bookkeeper ensemble
+        bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
+        bkEnsemble.start();
+
+        brokerServiceUrl = "http://127.0.0.1:" + brokerWebServicePort;
+
+        config = spy(new ServiceConfiguration());
+        config.setClusterName("use");
+        Set<String> superUsers = Sets.newHashSet(ADMIN_SUBJECT);
+        config.setSuperUserRoles(superUsers);
+        config.setWebServicePort(brokerWebServicePort);
+        config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+        config.setBrokerServicePort(brokerServicePort);
+        config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
+        config.setAdvertisedAddress("localhost");
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        config.setAuthenticationEnabled(true);
+        config.setAuthenticationProviders(providers);
+        config.setAuthorizationEnabled(true);
+        config.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+        secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey",
+                AuthTokenUtils.encodeKeyBase64(secretKey));
+        config.setProperties(properties);
+
+        adminToken = AuthTokenUtils.createToken(secretKey, ADMIN_SUBJECT, Optional.empty());
+
+        config.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        config.setBrokerClientAuthenticationParameters(
+                "token:" +  adminToken);
+        functionsWorkerService = createPulsarFunctionWorker(config);
+        brokerWebServiceUrl = new URL(brokerServiceUrl);
+        Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
+        pulsar = new PulsarService(config, functionWorkerService);
+        pulsar.start();
+
+        AuthenticationToken authToken = new AuthenticationToken();
+        authToken.configure("token:" +  adminToken);
+
+        superUserAdmin = spy(
+                PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).authentication(authToken).build());
+
+        brokerStatsClient = superUserAdmin.brokerStats();
+        primaryHost = String.format("http://%s:%d", "localhost", brokerWebServicePort);
+
+        // update cluster metadata
+        ClusterData clusterData = new ClusterData(brokerWebServiceUrl.toString());
+        superUserAdmin.clusters().updateCluster(config.getClusterName(), clusterData);
+
+        ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
+        if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
+                && isNotBlank(workerConfig.getClientAuthenticationParameters())) {
+            clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
+                    workerConfig.getClientAuthenticationParameters());
+        }
+        pulsarClient = clientBuilder.build();
+
+        TenantInfo propAdmin = new TenantInfo();
+        propAdmin.getAdminRoles().add(ADMIN_SUBJECT);
+        propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
+        superUserAdmin.tenants().updateTenant(TENANT, propAdmin);
+
+        final String replNamespace = TENANT + "/" + NAMESPACE;
+        superUserAdmin.namespaces().createNamespace(replNamespace);
+        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+        superUserAdmin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+        System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, "");
+
+        Thread.sleep(100);
+    }
+
+    @AfterMethod
+    void shutdown() throws Exception {
+        log.info("--- Shutting down ---");
+        pulsarClient.close();
+        superUserAdmin.close();
+        functionsWorkerService.stop();
+        pulsar.close();
+        bkEnsemble.stop();
+    }
+
+    private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
+
+        workerConfig = new WorkerConfig();
+        workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
+        workerConfig.setSchedulerClassName(
+                org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
+        workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+        // worker talks to local broker
+        workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get());
+        workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort().get());
+        workerConfig.setFailureCheckFreqMs(100);
+        workerConfig.setNumFunctionPackageReplicas(1);
+        workerConfig.setClusterCoordinationTopicName("coordinate");
+        workerConfig.setFunctionAssignmentTopicName("assignment");
+        workerConfig.setFunctionMetadataTopicName("metadata");
+        workerConfig.setInstanceLivenessCheckFreqMs(100);
+        workerConfig.setWorkerPort(workerServicePort);
+        workerConfig.setPulsarFunctionsCluster(config.getClusterName());
+        String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+        this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort();
+        workerConfig.setWorkerHostname(hostname);
+        workerConfig.setWorkerId(workerId);
+
+        workerConfig.setClientAuthenticationPlugin(AuthenticationToken.class.getName());
+        workerConfig.setClientAuthenticationParameters(
+                String.format("token:%s", adminToken));
+
+        workerConfig.setAuthenticationEnabled(config.isAuthenticationEnabled());
+        workerConfig.setAuthenticationProviders(config.getAuthenticationProviders());
+        workerConfig.setAuthorizationEnabled(config.isAuthorizationEnabled());
+        workerConfig.setAuthorizationProvider(config.getAuthorizationProvider());
+
+        return new WorkerService(workerConfig);
+    }
+
+    protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
+
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(tenant);
+        functionConfig.setNamespace(namespace);
+        functionConfig.setName(functionName);
+        functionConfig.setParallelism(1);
+        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+        functionConfig.setSubName(subscriptionName);
+        functionConfig.setInputs(Collections.singleton(sourceTopic));
+        functionConfig.setAutoAck(true);
+        functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setOutput(sinkTopic);
+        functionConfig.setCleanupSubscription(true);
+        return functionConfig;
+    }
+
+    @Test
+    public void testAuthorization() throws Exception {
+        String token1 = AuthTokenUtils.createToken(secretKey, SUBJECT, Optional.empty());
+        String token2 = AuthTokenUtils.createToken(secretKey, "wrong-subject", Optional.empty());
+
+        final String replNamespace = TENANT + "/" + NAMESPACE;
+        final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
+        final String sinkTopic = "persistent://" + replNamespace + "/output";
+        final String propertyKey = "key";
+        final String propertyValue = "value";
+        final String functionName = "PulsarFunction-test";
+        final String subscriptionName = "test-sub";
+
+
+        // create user admin client
+        AuthenticationToken authToken1 = new AuthenticationToken();
+        authToken1.configure("token:" +  token1);
+
+        AuthenticationToken authToken2 = new AuthenticationToken();
+        authToken2.configure("token:" +  token2);
+
+        try(PulsarAdmin admin1 = spy(
+                PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).authentication(authToken1).build());
+            PulsarAdmin admin2 = spy(
+                    PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).authentication(authToken2).build())
+        ) {
+
+            String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+
+            FunctionConfig functionConfig = createFunctionConfig(TENANT, NAMESPACE, functionName,
+                    sourceTopic, sinkTopic, subscriptionName);
+
+            // creating function should fail since admin1 doesn't have permissions granted yet
+            try {
+                admin1.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+                fail("client admin shouldn't have permissions to create function");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+
+            // grant permissions to admin1
+            Set<AuthAction> actions = new HashSet<>();
+            actions.add(AuthAction.functions);
+            actions.add(AuthAction.produce);
+            actions.add(AuthAction.consume);
+            superUserAdmin.namespaces().grantPermissionOnNamespace(replNamespace, SUBJECT, actions);
+
+            // user should be able to create function now
+            admin1.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+            // admin2 should still fail
+            try {
+                admin2.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+                fail("client admin shouldn't have permissions to create function");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+
+            retryStrategically((test) -> {
+                try {
+                    return admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName).getNumRunning() == 1
+                            && admin1.topics().getStats(sourceTopic).subscriptions.size() == 1;
+                } catch (PulsarAdminException e) {
+                    return false;
+                }
+            }, 5, 150);
+            // validate pulsar sink consumer has started on the topic
+            assertEquals(admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName).getNumRunning(), 1);
+            assertEquals(admin1.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+            // create a producer that creates a topic at broker
+            try(Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+            Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe()) {
+
+                int totalMsgs = 5;
+                for (int i = 0; i < totalMsgs; i++) {
+                    String data = "my-message-" + i;
+                    producer.newMessage().property(propertyKey, propertyValue).value(data).send();
+                }
+                retryStrategically((test) -> {
+                    try {
+                        SubscriptionStats subStats = admin1.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+                        return subStats.unackedMessages == 0;
+                    } catch (PulsarAdminException e) {
+                        return false;
+                    }
+                }, 5, 150);
+
+                Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+                String receivedPropertyValue = msg.getProperty(propertyKey);
+                assertEquals(propertyValue, receivedPropertyValue);
+
+
+                // validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked
+                // messages
+                // due to publish failure
+                assertNotEquals(admin1.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+                        totalMsgs);
+            }
+
+            // test update functions
+            functionConfig.setParallelism(2);
+            // admin2 should still fail
+            try {
+                admin2.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
+                fail("client admin shouldn't have permissions to update function");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+
+            admin1.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+            retryStrategically((test) -> {
+                try {
+                    return admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName).getNumRunning() == 2;
+                } catch (PulsarAdminException e) {
+                    return false;
+                }
+            }, 5, 150);
+
+            assertEquals(admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName).getNumRunning(), 2);
+
+            // test getFunctionInfo
+            try {
+                admin2.functions().getFunction(TENANT, NAMESPACE, functionName);
+                fail("client admin shouldn't have permissions to get function");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+            admin1.functions().getFunction(TENANT, NAMESPACE, functionName);
+
+            // test getFunctionInstanceStatus
+            try {
+                admin2.functions().getFunctionStatus(TENANT, NAMESPACE, functionName, 0);
+                fail("client admin shouldn't have permissions to get function status");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+            admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName, 0);
+
+            // test getFunctionStatus
+            try {
+                admin2.functions().getFunctionStatus(TENANT, NAMESPACE, functionName);
+                fail("client admin shouldn't have permissions to get function status");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+            admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName);
+
+            // test getFunctionStats
+            try {
+                admin2.functions().getFunctionStats(TENANT, NAMESPACE, functionName);
+                fail("client admin shouldn't have permissions to get function stats");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+            admin1.functions().getFunctionStats(TENANT, NAMESPACE, functionName);
+
+            // test getFunctionInstanceStats
+            try {
+                admin2.functions().getFunctionStats(TENANT, NAMESPACE, functionName, 0);
+                fail("client admin shouldn't have permissions to get function stats");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+            admin1.functions().getFunctionStats(TENANT, NAMESPACE, functionName, 0);
+
+            // test listFunctions
+            try {
+                admin2.functions().getFunctions(TENANT, NAMESPACE);
+                fail("client admin shouldn't have permissions to list functions");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+            admin1.functions().getFunctions(TENANT, NAMESPACE);
+
+            // test triggerFunction
+            try {
+                admin2.functions().triggerFunction(TENANT, NAMESPACE, functionName, sourceTopic, "foo", null);
+                fail("client admin shouldn't have permissions to trigger function");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+            admin1.functions().triggerFunction(TENANT, NAMESPACE, functionName, sourceTopic, "foo", null);
+
+            // test restartFunctionInstance
+            try {
+                admin2.functions().restartFunction(TENANT, NAMESPACE, functionName, 0);
+                fail("client admin shouldn't have permissions to restart function instance");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+            admin1.functions().restartFunction(TENANT, NAMESPACE, functionName, 0);
+
+            // test restartFunctionInstances
+            try {
+                admin2.functions().restartFunction(TENANT, NAMESPACE, functionName);
+                fail("client admin shouldn't have permissions to restart function");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+            admin1.functions().restartFunction(TENANT, NAMESPACE, functionName);
+
+            // test stopFunction instance
+            try {
+                admin2.functions().stopFunction(TENANT, NAMESPACE, functionName, 0);
+                fail("client admin shouldn't have permissions to stop function");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+            admin1.functions().stopFunction(TENANT, NAMESPACE, functionName, 0);
+
+            // test stopFunction all instance
+            try {
+                admin2.functions().stopFunction(TENANT, NAMESPACE, functionName);
+                fail("client admin shouldn't have permissions to restart function");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+            admin1.functions().stopFunction(TENANT, NAMESPACE, functionName);
+
+            // test startFunction instance
+            try {
+                admin2.functions().startFunction(TENANT, NAMESPACE, functionName);
+                fail("client admin shouldn't have permissions to restart function");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+            admin1.functions().restartFunction(TENANT, NAMESPACE, functionName);
+
+            // test startFunction all instances
+            try {
+                admin2.functions().restartFunction(TENANT, NAMESPACE, functionName);
+                fail("client admin shouldn't have permissions to restart function");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+            admin1.functions().restartFunction(TENANT, NAMESPACE, functionName);
+
+            // delete functions
+            // admin2 should still fail
+            try {
+                admin2.functions().deleteFunction(TENANT, NAMESPACE, functionName);
+                fail("client admin shouldn't have permissions to delete function");
+            } catch (PulsarAdminException.NotAuthorizedException e) {
+
+            }
+
+            admin1.functions().deleteFunction(TENANT, NAMESPACE, functionName);
+
+            retryStrategically((test) -> {
+                try {
+                    return admin1.topics().getStats(sourceTopic).subscriptions.size() == 0;
+                } catch (PulsarAdminException e) {
+                    return false;
+                }
+            }, 5, 150);
+
+            // make sure subscriptions are cleanup
+            assertEquals(admin1.topics().getStats(sourceTopic).subscriptions.size(), 0);
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 91a5910..febdadd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -25,12 +25,14 @@ import org.apache.bookkeeper.test.PortManager;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
 import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -289,7 +291,7 @@ public class PulsarWorkerAssignmentTest {
         final URI dlUri = functionsWorkerService.getDlogUri();
         functionsWorkerService.stop();
         functionsWorkerService = new WorkerService(workerConfig);
-        functionsWorkerService.start(dlUri);
+        functionsWorkerService.start(dlUri, new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig)), null);
         final FunctionRuntimeManager runtimeManager2 = functionsWorkerService.getFunctionRuntimeManager();
         retryStrategically((test) -> {
             try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 253d2b6..422e9ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
 import org.apache.pulsar.client.admin.BrokerStats;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -157,6 +158,9 @@ public class PulsarFunctionE2ETest {
         config.setAuthenticationEnabled(true);
         config.setAuthenticationProviders(providers);
 
+        config.setAuthorizationEnabled(true);
+        config.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+
         config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
         config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java
index 1fd2301..faef870 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java
@@ -27,4 +27,7 @@ public enum AuthAction {
 
     /** Permission to consume messages */
     consume,
+
+    /** Permissions for functions ops **/
+    functions,
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 65dbc40..db84462 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -18,18 +18,30 @@
  */
 package org.apache.pulsar.functions.worker;
 
+import io.netty.util.concurrent.DefaultThreadFactory;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.conf.InternalConfigurationData;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.functions.worker.rest.WorkerServer;
+import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
 
 import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.net.URI;
 import java.util.HashSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 @Slf4j
 public class Worker {
@@ -38,6 +50,13 @@ public class Worker {
     private final WorkerService workerService;
     private WorkerServer server;
 
+    private ZooKeeperClientFactory zkClientFactory = null;
+    private final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(8).name("zk-cache-ordered").build();
+    private final ScheduledExecutorService cacheExecutor = Executors.newScheduledThreadPool(10,
+            new DefaultThreadFactory("zk-cache-callback"));
+    private GlobalZooKeeperCache globalZkCache;
+    private ConfigurationCacheService configurationCacheService;
+
     public Worker(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
         this.workerService = new WorkerService(workerConfig);
@@ -46,7 +65,7 @@ public class Worker {
     protected void start() throws Exception {
         URI dlogUri = initialize(this.workerConfig);
 
-        workerService.start(dlogUri);
+        workerService.start(dlogUri, getAuthenticationService(), getAuthorizationService());
         this.server = new WorkerServer(workerService);
         this.server.start();
         log.info("Start worker server on port {}...", this.workerConfig.getWorkerPort());
@@ -132,15 +151,60 @@ public class Worker {
         }
     }
 
+    private AuthorizationService getAuthorizationService() throws PulsarServerException {
+
+        if (this.workerConfig.isAuthorizationEnabled()) {
+
+            log.info("starting configuration cache service");
+
+            this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
+                    (int) workerConfig.getZooKeeperSessionTimeoutMillis(),
+                    workerConfig.getZooKeeperOperationTimeoutSeconds(),
+                    workerConfig.getConfigurationStoreServers(),
+                    orderedExecutor, cacheExecutor);
+            try {
+                this.globalZkCache.start();
+            } catch (IOException e) {
+                throw new PulsarServerException(e);
+            }
+
+            this.configurationCacheService = new ConfigurationCacheService(this.globalZkCache, this.workerConfig.getPulsarFunctionsCluster());
+                return new AuthorizationService(PulsarConfigurationLoader.convertFrom(workerConfig), this.configurationCacheService);
+            }
+        return null;
+    }
+
+    private AuthenticationService getAuthenticationService() throws PulsarServerException {
+        return new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig));
+    }
+
+    public ZooKeeperClientFactory getZooKeeperClientFactory() {
+        if (zkClientFactory == null) {
+            zkClientFactory = new ZookeeperBkClientFactoryImpl(orderedExecutor);
+        }
+        // Return default factory
+        return zkClientFactory;
+    }
+
     protected void stop() {
         try {
             if (null != this.server) {
                 this.server.stop();
             }
             workerService.stop();    
-        }catch(Exception e) {
+        } catch(Exception e) {
             log.warn("Failed to gracefully stop worker service ", e);
         }
+
+        if (this.globalZkCache != null) {
+            try {
+                this.globalZkCache.close();
+            } catch (IOException e) {
+                log.warn("Failed to close global zk cache ", e);
+            }
+        }
+
+
         
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index df7f14d..6c42a24 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -33,6 +33,7 @@ import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.common.configuration.Category;
 import org.apache.pulsar.common.configuration.FieldContext;
 import org.apache.pulsar.common.configuration.PulsarConfiguration;
@@ -105,6 +106,22 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     )
     private int numHttpServerThreads = 8;
     @FieldContext(
+            category = CATEGORY_WORKER,
+            required = false,
+            doc = "Configuration store connection string (as a comma-separated list)"
+    )
+    private String configurationStoreServers;
+    @FieldContext(
+            category = CATEGORY_WORKER,
+            doc = "ZooKeeper session timeout in milliseconds"
+    )
+    private long zooKeeperSessionTimeoutMillis = 30000;
+    @FieldContext(
+            category = CATEGORY_WORKER,
+            doc = "ZooKeeper operation timeout in seconds"
+    )
+    private int zooKeeperOperationTimeoutSeconds = 30;
+    @FieldContext(
         category = CATEGORY_CONNECTORS,
         doc = "The path to the location to locate builtin connectors"
     )
@@ -274,6 +291,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     )
     private boolean authorizationEnabled = false;
     @FieldContext(
+            category = CATEGORY_WORKER_SECURITY,
+            doc = "Authorization provider fully qualified class-name"
+    )
+    private String authorizationProvider = PulsarAuthorizationProvider.class.getName();
+    @FieldContext(
         category = CATEGORY_WORKER_SECURITY,
         doc = "Role names that are treated as `super-user`, meaning they will be able to access any admin-api"
     )
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index d06053b..b035fb8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
 
 import io.netty.util.concurrent.DefaultThreadFactory;
 
+import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -36,16 +37,24 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.admin.StorageAdminClient;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
+import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
 
 /**
  * A service component contains everything to run a worker except rest server.
@@ -69,6 +78,7 @@ public class WorkerService {
     private boolean isInitialized = false;
     private final ScheduledExecutorService statsUpdater;
     private AuthenticationService authenticationService;
+    private AuthorizationService authorizationService;
     private ConnectorsManager connectorsManager;
     private PulsarAdmin brokerAdmin;
     private PulsarAdmin functionAdmin;
@@ -85,7 +95,10 @@ public class WorkerService {
         this.metricsGenerator = new MetricsGenerator(this.statsUpdater, workerConfig);
     }
 
-    public void start(URI dlogUri) throws InterruptedException {
+
+    public void start(URI dlogUri,
+                      AuthenticationService authenticationService,
+                      AuthorizationService authorizationService) throws InterruptedException {
         log.info("Starting worker {}...", workerConfig.getWorkerId());
 
         this.brokerAdmin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
@@ -175,7 +188,9 @@ public class WorkerService {
             // initialize function runtime manager
             this.functionRuntimeManager.initialize();
 
-            authenticationService = new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig));
+            this.authenticationService = authenticationService;
+
+            this.authorizationService = authorizationService;
 
             // Starting cluster services
             log.info("Start cluster services...");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 23f479a..0ed9fd1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.worker.rest.api;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 import static org.apache.pulsar.functions.utils.Reflections.createInstance;
 
@@ -79,6 +80,7 @@ import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -91,6 +93,7 @@ import org.apache.pulsar.common.functions.WorkerInfo;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -313,6 +316,17 @@ public abstract class ComponentImpl {
         }
 
         try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
+                        componentName, clientRole, componentType);
+                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
+        try {
             // Check tenant exists
             final TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
 
@@ -334,17 +348,6 @@ public abstract class ComponentImpl {
             throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
 
-        try {
-            if (!isAuthorizedRole(tenant, clientRole)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
-                        componentName, clientRole, componentType);
-                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
-            }
-        } catch (PulsarAdminException e) {
-            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
-            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
-        }
-
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
 
         if (functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
@@ -483,7 +486,8 @@ public abstract class ComponentImpl {
                                final String functionPkgUrl,
                                final String functionDetailsJson,
                                final String componentConfigJson,
-                               final String clientRole) {
+                               final String clientRole,
+                               AuthenticationDataHttps clientAuthenticationDataHttps) {
 
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
@@ -500,7 +504,7 @@ public abstract class ComponentImpl {
         }
 
         try {
-            if (!isAuthorizedRole(tenant, clientRole)) {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
                         componentName, clientRole, componentType);
                 throw new RestException(Status.UNAUTHORIZED, componentType + "client is not authorize to perform operation");
@@ -624,14 +628,15 @@ public abstract class ComponentImpl {
     public void deregisterFunction(final String tenant,
                                    final String namespace,
                                    final String componentName,
-                                   final String clientRole) {
+                                   final String clientRole,
+                                   AuthenticationDataHttps clientAuthenticationDataHttps) {
 
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
 
         try {
-            if (!isAuthorizedRole(tenant, clientRole)) {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister {}", tenant, namespace,
                         componentName, clientRole, componentType);
                 throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
@@ -696,12 +701,25 @@ public abstract class ComponentImpl {
 
     public FunctionConfig getFunctionInfo(final String tenant,
                                           final String namespace,
-                                          final String componentName) {
+                                          final String componentName,
+                                          final String clientRole,
+                                          final AuthenticationDataSource clientAuthenticationDataHttps) {
 
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
 
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to get {}", tenant, namespace,
+                        componentName, clientRole, componentType);
+                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
         // validate parameters
         try {
             validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
@@ -728,16 +746,20 @@ public abstract class ComponentImpl {
                                      final String namespace,
                                      final String componentName,
                                      final String instanceId,
-                                     final URI uri) {
-        changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, false, uri);
+                                     final URI uri,
+                                     final String clientRole,
+                                     final AuthenticationDataSource clientAuthenticationDataHttps) {
+        changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, false, uri, clientRole, clientAuthenticationDataHttps);
     }
 
     public void startFunctionInstance(final String tenant,
                                       final String namespace,
                                       final String componentName,
                                       final String instanceId,
-                                      final URI uri) {
-        changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, true, uri);
+                                      final URI uri,
+                                      final String clientRole,
+                                      final AuthenticationDataSource clientAuthenticationDataHttps) {
+        changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, true, uri, clientRole, clientAuthenticationDataHttps);
     }
 
     public void changeFunctionInstanceStatus(final String tenant,
@@ -745,12 +767,25 @@ public abstract class ComponentImpl {
                                              final String componentName,
                                              final String instanceId,
                                              final boolean start,
-                                             final URI uri) {
+                                             final URI uri,
+                                             final String clientRole,
+                                             final AuthenticationDataSource clientAuthenticationDataHttps) {
 
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
 
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to start/stop {}", tenant, namespace,
+                        componentName, clientRole, componentType);
+                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
         // validate parameters
         try {
             validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
@@ -791,11 +826,24 @@ public abstract class ComponentImpl {
                                         final String namespace,
                                         final String componentName,
                                         final String instanceId,
-                                        final URI uri) {
+                                        final URI uri,
+                                        final String clientRole,
+                                        final AuthenticationDataSource clientAuthenticationDataHttps) {
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
 
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to restart {}", tenant, namespace,
+                        componentName, clientRole, componentType);
+                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
         // validate parameters
         try {
             validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
@@ -830,25 +878,42 @@ public abstract class ComponentImpl {
 
     public void stopFunctionInstances(final String tenant,
                                       final String namespace,
-                                      final String componentName) {
-        changeFunctionStatusAllInstances(tenant, namespace, componentName, false);
+                                      final String componentName,
+                                      final String clientRole,
+                                      final AuthenticationDataSource clientAuthenticationDataHttps) {
+        changeFunctionStatusAllInstances(tenant, namespace, componentName, false, clientRole, clientAuthenticationDataHttps);
     }
 
     public void startFunctionInstances(final String tenant,
                                        final String namespace,
-                                       final String componentName) {
-        changeFunctionStatusAllInstances(tenant, namespace, componentName, true);
+                                       final String componentName,
+                                       final String clientRole,
+                                       final AuthenticationDataSource clientAuthenticationDataHttps) {
+        changeFunctionStatusAllInstances(tenant, namespace, componentName, true, clientRole, clientAuthenticationDataHttps);
     }
 
     public void changeFunctionStatusAllInstances(final String tenant,
                                                  final String namespace,
                                                  final String componentName,
-                                                 final boolean start) {
+                                                 final boolean start,
+                                                 final String clientRole,
+                                                 final AuthenticationDataSource clientAuthenticationDataHttps) {
 
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
 
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to start/stop {}", tenant, namespace,
+                        componentName, clientRole, componentType);
+                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
         // validate parameters
         try {
             validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
@@ -886,11 +951,24 @@ public abstract class ComponentImpl {
 
     public void restartFunctionInstances(final String tenant,
                                          final String namespace,
-                                         final String componentName) {
+                                         final String componentName,
+                                         final String clientRole,
+                                         final AuthenticationDataSource clientAuthenticationDataHttps) {
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
 
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to restart {}", tenant, namespace,
+                        componentName, clientRole, componentType);
+                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
         // validate parameters
         try {
             validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
@@ -925,9 +1003,22 @@ public abstract class ComponentImpl {
     public FunctionStats getFunctionStats(final String tenant,
                                           final String namespace,
                                           final String componentName,
-                                          final URI uri) {
+                                          final URI uri,
+                                          final String clientRole,
+                                          final AuthenticationDataSource clientAuthenticationDataHttps) {
         if (!isWorkerServiceAvailable()) {
-            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
+            throwUnavailableException();
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to get stats for {}", tenant, namespace,
+                        componentName, clientRole, componentType);
+                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
 
         // validate parameters
@@ -968,9 +1059,22 @@ public abstract class ComponentImpl {
                                                                                                    final String namespace,
                                                                                                    final String componentName,
                                                                                                    final String instanceId,
-                                                                                                   final URI uri) {
+                                                                                                   final URI uri,
+                                                                                                   final String clientRole,
+                                                                                                   final AuthenticationDataSource clientAuthenticationDataHttps) {
         if (!isWorkerServiceAvailable()) {
-            throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
+            throwUnavailableException();
+        }
+
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to get stats for {}", tenant, namespace,
+                        componentName, clientRole, componentType);
+                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
 
         // validate parameters
@@ -1014,12 +1118,25 @@ public abstract class ComponentImpl {
         return functionInstanceStatsData;
     }
 
-    public List<String> listFunctions(final String tenant, final String namespace) {
+    public List<String> listFunctions(final String tenant,
+                                      final String namespace,
+                                      final String clientRole,
+                                      final AuthenticationDataSource clientAuthenticationDataHttps) {
 
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
 
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{} Client [{}] is not admin and authorized to list {}", tenant, namespace, clientRole, componentType);
+                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{} Failed to authorize [{}]", tenant, namespace, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
         // validate parameters
         try {
             validateListFunctionRequestParams(tenant, namespace);
@@ -1074,12 +1191,25 @@ public abstract class ComponentImpl {
                                   final String functionName,
                                   final String input,
                                   final InputStream uploadedInputStream,
-                                  final String topic) {
+                                  final String topic,
+                                  final String clientRole,
+                                  final AuthenticationDataSource clientAuthenticationDataHttps) {
 
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
 
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to trigger {}", tenant, namespace,
+                        functionName, clientRole, componentType);
+                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
         // validate parameters
         try {
             validateTriggerRequestParams(tenant, namespace, functionName, topic, input, uploadedInputStream);
@@ -1174,12 +1304,25 @@ public abstract class ComponentImpl {
     public FunctionState getFunctionState(final String tenant,
                                           final String namespace,
                                           final String functionName,
-                                          final String key) {
+                                          final String key,
+                                          final String clientRole,
+                                          final AuthenticationDataSource clientAuthenticationDataHttps) {
 
         if (!isWorkerServiceAvailable()) {
             throwUnavailableException();
         }
 
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to get state for {}", tenant, namespace,
+                        functionName, clientRole, componentType);
+                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
         if (null == worker().getStateStoreAdminClient()) {
             throwStateStoreUnvailableResponse();
         }
@@ -1689,21 +1832,33 @@ public abstract class ComponentImpl {
                 Utils.getUniquePackageName(Codec.encode(fileName)));
     }
 
-    public boolean isAuthorizedRole(String tenant, String clientRole) throws PulsarAdminException {
+    public boolean isAuthorizedRole(String tenant, String namespace, String clientRole,
+                                    AuthenticationDataSource authenticationData) throws PulsarAdminException {
         if (worker().getWorkerConfig().isAuthorizationEnabled()) {
             // skip authorization if client role is super-user
             if (isSuperUser(clientRole)) {
                 return true;
             }
             TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
-            return clientRole != null && (tenantInfo.getAdminRoles() == null || tenantInfo.getAdminRoles().isEmpty()
-                    || tenantInfo.getAdminRoles().contains(clientRole));
+            if (clientRole != null && (tenantInfo.getAdminRoles() == null || tenantInfo.getAdminRoles().isEmpty()
+                    || tenantInfo.getAdminRoles().contains(clientRole))) {
+                return true;
+            }
+
+            // check if role has permissions granted
+            if (clientRole != null && authenticationData != null) {
+                return allowFunctionOps(NamespaceName.get(tenant, namespace), clientRole, authenticationData);
+            } else {
+                return false;
+            }
         }
         return true;
     }
 
     public boolean isSuperUser(String clientRole) {
-        return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
+        return clientRole != null
+                && worker().getWorkerConfig().getSuperUserRoles() != null
+                && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
     }
 
     public ComponentType calculateSubjectType(FunctionMetaData functionMetaData) {
@@ -1725,11 +1880,24 @@ public abstract class ComponentImpl {
         return SINK;
     }
 
-    protected void componentStatusRequestValidate (final String tenant, final String namespace, final String componentName) {
+    protected void componentStatusRequestValidate (final String tenant, final String namespace, final String componentName,
+                                                   final String clientRole,
+                                                   final AuthenticationDataSource clientAuthenticationDataHttps) {
         if (!isWorkerServiceAvailable()) {
             throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
         }
 
+        try {
+            if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+                log.error("{}/{}/{} Client [{}] is not admin and authorized get status for {}", tenant, namespace,
+                        componentName, clientRole, componentType);
+                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+            }
+        } catch (PulsarAdminException e) {
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+
         // validate parameters
         try {
             validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
@@ -1754,8 +1922,10 @@ public abstract class ComponentImpl {
     protected void componentInstanceStatusRequestValidate (final String tenant,
                                                            final String namespace,
                                                            final String componentName,
-                                                           final int instanceId) {
-        componentStatusRequestValidate(tenant, namespace, componentName);
+                                                           final int instanceId,
+                                                           final String clientRole,
+                                                           final AuthenticationDataSource clientAuthenticationDataHttps) {
+        componentStatusRequestValidate(tenant, namespace, componentName, clientRole, clientAuthenticationDataHttps);
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
@@ -1766,4 +1936,19 @@ public abstract class ComponentImpl {
                     String.format("%s %s doesn't have instance with id %s", componentType, componentName, instanceId));
         }
     }
+
+    public boolean allowFunctionOps(NamespaceName namespaceName, String role,
+                                    AuthenticationDataSource authenticationData) {
+        try {
+            return worker().getAuthorizationService().allowFunctionOpsAsync(
+                    namespaceName, role, authenticationData).get(worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), SECONDS);
+        } catch (InterruptedException e) {
+            log.warn("Time-out {} sec while checking function authorization on {} ", worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds(), namespaceName);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        } catch (Exception e) {
+            log.warn("Admin-client with Role - {} failed to get function permissions for namespace - {}. {}", role, namespaceName,
+                    e.getMessage(), e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+        }
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 1c5a090..04628d6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker.rest.api;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
@@ -211,10 +212,12 @@ public class FunctionsImpl extends ComponentImpl {
                                                                                                       final String namespace,
                                                                                                       final String componentName,
                                                                                                       final String instanceId,
-                                                                                                      final URI uri) {
+                                                                                                      final URI uri,
+                                                                                                      final String clientRole,
+                                                                                                      final AuthenticationDataSource clientAuthenticationDataHttps) {
 
         // validate parameters
-        componentInstanceStatusRequestValidate(tenant, namespace, componentName, Integer.parseInt(instanceId));
+        componentInstanceStatusRequestValidate(tenant, namespace, componentName, Integer.parseInt(instanceId), clientRole, clientAuthenticationDataHttps);
 
         FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData;
         try {
@@ -241,10 +244,12 @@ public class FunctionsImpl extends ComponentImpl {
     public FunctionStatus getFunctionStatus(final String tenant,
                                             final String namespace,
                                             final String componentName,
-                                            final URI uri) {
+                                            final URI uri,
+                                            final String clientRole,
+                                            final AuthenticationDataSource clientAuthenticationDataHttps) {
 
         // validate parameters
-        componentStatusRequestValidate(tenant, namespace, componentName);
+        componentStatusRequestValidate(tenant, namespace, componentName, clientRole, clientAuthenticationDataHttps);
 
         FunctionStatus functionStatus;
         try {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index 45fbbf2..49d1156 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -55,7 +55,7 @@ public class FunctionsImplV2 {
             throws IOException {
 
         // run just for parameter checks
-        delegate.getFunctionInfo(tenant, namespace, functionName);
+        delegate.getFunctionInfo(tenant, namespace, functionName, null, null);
 
         FunctionMetaDataManager functionMetaDataManager = delegate.worker().getFunctionMetaDataManager();
 
@@ -69,7 +69,7 @@ public class FunctionsImplV2 {
                                               final String instanceId, URI uri) throws IOException {
 
         org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
-                functionInstanceStatus = delegate.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri);
+                functionInstanceStatus = delegate.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri, null, null);
 
         String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(toProto(functionInstanceStatus, instanceId));
         return Response.status(Response.Status.OK).entity(jsonResponse).build();
@@ -77,7 +77,7 @@ public class FunctionsImplV2 {
 
     public Response getFunctionStatusV2(String tenant, String namespace, String functionName, URI requestUri) throws
             IOException {
-        FunctionStatus functionStatus = delegate.getFunctionStatus(tenant, namespace, functionName, requestUri);
+        FunctionStatus functionStatus = delegate.getFunctionStatus(tenant, namespace, functionName, requestUri, null, null);
         InstanceCommunication.FunctionStatusList.Builder functionStatusList = InstanceCommunication.FunctionStatusList.newBuilder();
         functionStatus.instances.forEach(functionInstanceStatus -> functionStatusList.addFunctionStatusList(
                 toProto(functionInstanceStatus.getStatus(),
@@ -98,29 +98,29 @@ public class FunctionsImplV2 {
                                    FormDataContentDisposition fileDetail, String functionPkgUrl, String
                                            functionDetailsJson, String functionConfigJson, String clientAppId) {
         delegate.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId);
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId, null);
         return Response.ok().build();
     }
 
     public Response deregisterFunction(String tenant, String namespace, String functionName, String clientAppId) {
-        delegate.deregisterFunction(tenant, namespace, functionName, clientAppId);
+        delegate.deregisterFunction(tenant, namespace, functionName, clientAppId, null);
         return Response.ok().build();
     }
 
     public Response listFunctions(String tenant, String namespace) {
-        Collection<String> functionStateList = delegate.listFunctions( tenant, namespace);
+        Collection<String> functionStateList = delegate.listFunctions( tenant, namespace, null, null);
         return Response.status(Response.Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
     }
 
     public Response triggerFunction(String tenant, String namespace, String functionName, String triggerValue,
                                     InputStream triggerStream, String topic) {
-        String result = delegate.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
+        String result = delegate.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic, null, null);
         return Response.status(Response.Status.OK).entity(result).build();
     }
 
     public Response getFunctionState(String tenant, String namespace, String functionName, String key) {
         FunctionState functionState = delegate.getFunctionState(
-                tenant, namespace, functionName, key);
+                tenant, namespace, functionName, key, null, null);
 
         String value;
         if (functionState.getNumberValue() != null) {
@@ -135,23 +135,23 @@ public class FunctionsImplV2 {
 
     public Response restartFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI
             uri) {
-        delegate.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri);
+        delegate.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri, null, null);
         return Response.ok().build();
     }
 
     public Response restartFunctionInstances(String tenant, String namespace, String functionName) {
-        delegate.restartFunctionInstances(tenant, namespace, functionName);
+        delegate.restartFunctionInstances(tenant, namespace, functionName, null, null);
         return Response.ok().build();
     }
 
     public Response stopFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI
             uri) {
-        delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri);
+        delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri, null ,null);
         return Response.ok().build();
     }
 
     public Response stopFunctionInstances(String tenant, String namespace, String functionName) {
-        delegate.stopFunctionInstances(tenant, namespace, functionName);
+        delegate.stopFunctionInstances(tenant, namespace, functionName, null, null);
         return Response.ok().build();
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 39828a0..acd924b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker.rest.api;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
@@ -210,10 +211,12 @@ public class SinkImpl extends ComponentImpl {
                                                                                       final String namespace,
                                                                                       final String sinkName,
                                                                                       final String instanceId,
-                                                                                      final URI uri) {
+                                                                                      final URI uri,
+                                                                                      final String clientRole,
+                                                                                      final AuthenticationDataSource clientAuthenticationDataHttps) {
 
         // validate parameters
-        componentInstanceStatusRequestValidate(tenant, namespace, sinkName, Integer.parseInt(instanceId));
+        componentInstanceStatusRequestValidate(tenant, namespace, sinkName, Integer.parseInt(instanceId), clientRole, clientAuthenticationDataHttps);
 
 
         SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData;
@@ -232,10 +235,12 @@ public class SinkImpl extends ComponentImpl {
     public SinkStatus getSinkStatus(final String tenant,
                                     final String namespace,
                                     final String componentName,
-                                    final URI uri) {
+                                    final URI uri,
+                                    final String clientRole,
+                                    final AuthenticationDataSource clientAuthenticationDataHttps) {
 
         // validate parameters
-        componentStatusRequestValidate(tenant, namespace, componentName);
+        componentStatusRequestValidate(tenant, namespace, componentName, clientRole, clientAuthenticationDataHttps);
 
         SinkStatus sinkStatus;
         try {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index ec7ba30..e5ed28e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.worker.rest.api;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
@@ -211,9 +212,10 @@ public class SourceImpl extends ComponentImpl {
     public SourceStatus getSourceStatus(final String tenant,
                                         final String namespace,
                                         final String componentName,
-                                        final URI uri) {
+                                        final URI uri, final String clientRole,
+                                        final AuthenticationDataSource clientAuthenticationDataHttps) {
         // validate parameters
-        componentStatusRequestValidate(tenant, namespace, componentName);
+        componentStatusRequestValidate(tenant, namespace, componentName, clientRole, clientAuthenticationDataHttps);
 
         SourceStatus sourceStatus;
         try {
@@ -232,9 +234,11 @@ public class SourceImpl extends ComponentImpl {
                                                                                               final String namespace,
                                                                                               final String sourceName,
                                                                                               final String instanceId,
-                                                                                              final URI uri) {
+                                                                                              final URI uri,
+                                                                                              final String clientRole,
+                                                                                              final AuthenticationDataSource clientAuthenticationDataHttps) {
         // validate parameters
-        componentInstanceStatusRequestValidate(tenant, namespace, sourceName, Integer.parseInt(instanceId));
+        componentInstanceStatusRequestValidate(tenant, namespace, sourceName, Integer.parseInt(instanceId), clientRole, clientAuthenticationDataHttps);
 
         SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData;
         try {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
index 0415be2..f31ecb3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
@@ -85,7 +85,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
                                final @FormDataParam("functionConfig") String functionConfigJson) {
 
         functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, functionConfigJson, clientAppId());
+                functionPkgUrl, null, functionConfigJson, clientAppId(), clientAuthData());
 
     }
 
@@ -94,7 +94,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
     public void deregisterFunction(final @PathParam("tenant") String tenant,
                                    final @PathParam("namespace") String namespace,
                                    final @PathParam("functionName") String functionName) {
-        functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+        functions.deregisterFunction(tenant, namespace, functionName, clientAppId(), clientAuthData());
     }
 
     @GET
@@ -102,7 +102,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
     public FunctionConfig getFunctionInfo(final @PathParam("tenant") String tenant,
                                           final @PathParam("namespace") String namespace,
                                           final @PathParam("functionName") String functionName) {
-        return functions.getFunctionInfo(tenant, namespace, functionName);
+        return functions.getFunctionInfo(tenant, namespace, functionName, clientAppId(), clientAuthData());
     }
 
     @GET
@@ -123,7 +123,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
             final @PathParam("functionName") String functionName,
             final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionInstanceStatus(
-                tenant, namespace, functionName, instanceId, uri.getRequestUri());
+                tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
@@ -143,7 +143,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
             final @PathParam("namespace") String namespace,
             final @PathParam("functionName") String functionName) throws IOException {
         return functions.getFunctionStatus(
-                tenant, namespace, functionName, uri.getRequestUri());
+                tenant, namespace, functionName, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
@@ -161,7 +161,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
     public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
                                           final @PathParam("namespace") String namespace,
                                           final @PathParam("functionName") String functionName) throws IOException {
-        return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri());
+        return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
@@ -182,7 +182,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
             final @PathParam("functionName") String functionName,
             final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionsInstanceStats(
-                tenant, namespace, functionName, instanceId, uri.getRequestUri());
+                tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -194,7 +194,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
                                   final @FormDataParam("data") String input,
                                   final @FormDataParam("dataStream") InputStream uploadedInputStream,
                                   final @FormDataParam("topic") String topic) {
-        return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream, topic);
+        return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream, topic, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -210,7 +210,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
                                 final @PathParam("namespace") String namespace,
                                 final @PathParam("functionName") String functionName,
                                 final @PathParam("instanceId") String instanceId) {
-        functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+        functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -225,7 +225,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
     public void restartFunction(final @PathParam("tenant") String tenant,
                                 final @PathParam("namespace") String namespace,
                                 final @PathParam("functionName") String functionName) {
-        functions.restartFunctionInstances(tenant, namespace, functionName);
+        functions.restartFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -241,7 +241,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
                              final @PathParam("namespace") String namespace,
                              final @PathParam("functionName") String functionName,
                              final @PathParam("instanceId") String instanceId) {
-        functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+        functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -256,7 +256,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
     public void stopFunction(final @PathParam("tenant") String tenant,
                              final @PathParam("namespace") String namespace,
                              final @PathParam("functionName") String functionName) {
-        functions.stopFunctionInstances(tenant, namespace, functionName);
+        functions.stopFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -272,7 +272,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
                               final @PathParam("namespace") String namespace,
                               final @PathParam("functionName") String functionName,
                               final @PathParam("instanceId") String instanceId) {
-        functions.startFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+        functions.startFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -287,7 +287,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
     public void startFunction(final @PathParam("tenant") String tenant,
                               final @PathParam("namespace") String namespace,
                               final @PathParam("functionName") String functionName) {
-        functions.startFunctionInstances(tenant, namespace, functionName);
+        functions.startFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -316,6 +316,6 @@ public class FunctionApiV3Resource extends FunctionApiResource {
                                           final @PathParam("namespace") String namespace,
                                           final @PathParam("functionName") String functionName,
                                           final @PathParam("key") String key) throws IOException {
-        return functions.getFunctionState(tenant, namespace, functionName, key);
+        return functions.getFunctionState(tenant, namespace, functionName, key, clientAppId(), clientAuthData());
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
index e5d257f..462914d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
@@ -75,7 +75,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
                            final @FormDataParam("sinkConfig") String sinkConfigJson) {
 
         sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, sinkConfigJson, clientAppId());
+                functionPkgUrl, null, sinkConfigJson, clientAppId(), clientAuthData());
     }
 
     @DELETE
@@ -83,7 +83,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
     public void deregisterSink(final @PathParam("tenant") String tenant,
                                final @PathParam("namespace") String namespace,
                                final @PathParam("sinkName") String sinkName) {
-        sink.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+        sink.deregisterFunction(tenant, namespace, sinkName, clientAppId(), clientAuthData());
     }
 
     @GET
@@ -112,7 +112,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
             final @PathParam("namespace") String namespace,
             final @PathParam("sinkName") String sinkName,
             final @PathParam("instanceId") String instanceId) throws IOException {
-        return sink.getSinkInstanceStatus(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+        return sink.getSinkInstanceStatus(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
@@ -130,14 +130,14 @@ public class SinkApiV3Resource extends FunctionApiResource {
     public SinkStatus getSinkStatus(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("sinkName") String sinkName) throws IOException {
-        return sink.getSinkStatus(tenant, namespace, sinkName, uri.getRequestUri());
+        return sink.getSinkStatus(tenant, namespace, sinkName, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
     @Path("/{tenant}/{namespace}")
     public List<String> listSink(final @PathParam("tenant") String tenant,
                                  final @PathParam("namespace") String namespace) {
-        return sink.listFunctions(tenant, namespace);
+        return sink.listFunctions(tenant, namespace, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -151,7 +151,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
                             final @PathParam("namespace") String namespace,
                             final @PathParam("sinkName") String sinkName,
                             final @PathParam("instanceId") String instanceId) {
-        sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+        sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -163,7 +163,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
     public void restartSink(final @PathParam("tenant") String tenant,
                             final @PathParam("namespace") String namespace,
                             final @PathParam("sinkName") String sinkName) {
-        sink.restartFunctionInstances(tenant, namespace, sinkName);
+        sink.restartFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -177,7 +177,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
                          final @PathParam("namespace") String namespace,
                          final @PathParam("sinkName") String sinkName,
                          final @PathParam("instanceId") String instanceId) {
-        sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+        sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -190,7 +190,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
     public void stopSink(final @PathParam("tenant") String tenant,
                          final @PathParam("namespace") String namespace,
                          final @PathParam("sinkName") String sinkName) {
-        sink.stopFunctionInstances(tenant, namespace, sinkName);
+        sink.stopFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -204,7 +204,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
                           final @PathParam("namespace") String namespace,
                           final @PathParam("sinkName") String sinkName,
                           final @PathParam("instanceId") String instanceId) {
-        sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+        sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -217,7 +217,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
     public void startSink(final @PathParam("tenant") String tenant,
                           final @PathParam("namespace") String namespace,
                           final @PathParam("sinkName") String sinkName) {
-        sink.startFunctionInstances(tenant, namespace, sinkName);
+        sink.startFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
     }
 
     @GET
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
index e07bfe0..4ed3505 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
@@ -76,7 +76,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
                              final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
         source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, sourceConfigJson, clientAppId());
+                functionPkgUrl, null, sourceConfigJson, clientAppId(), clientAuthData());
     }
 
 
@@ -85,7 +85,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
     public void deregisterSource(final @PathParam("tenant") String tenant,
                                  final @PathParam("namespace") String namespace,
                                  final @PathParam("sourceName") String sourceName) {
-        source.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+        source.deregisterFunction(tenant, namespace, sourceName, clientAppId(), clientAuthData());
     }
 
     @GET
@@ -115,7 +115,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
             final @PathParam("sourceName") String sourceName,
             final @PathParam("instanceId") String instanceId) throws IOException {
         return source.getSourceInstanceStatus(
-            tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+            tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
@@ -133,14 +133,14 @@ public class SourceApiV3Resource extends FunctionApiResource {
     public SourceStatus getSourceStatus(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("sourceName") String sourceName) throws IOException {
-        return source.getSourceStatus(tenant, namespace, sourceName, uri.getRequestUri());
+        return source.getSourceStatus(tenant, namespace, sourceName, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @GET
     @Path("/{tenant}/{namespace}")
     public List<String> listSources(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace) {
-        return source.listFunctions(tenant, namespace);
+        return source.listFunctions(tenant, namespace, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -154,7 +154,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
                               final @PathParam("namespace") String namespace,
                               final @PathParam("sourceName") String sourceName,
                               final @PathParam("instanceId") String instanceId) {
-        source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+        source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -167,7 +167,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
     public void restartSource(final @PathParam("tenant") String tenant,
                               final @PathParam("namespace") String namespace,
                               final @PathParam("sourceName") String sourceName) {
-        source.restartFunctionInstances(tenant, namespace, sourceName);
+        source.restartFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -181,7 +181,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
                            final @PathParam("namespace") String namespace,
                            final @PathParam("sourceName") String sourceName,
                            final @PathParam("instanceId") String instanceId) {
-        source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+        source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -194,7 +194,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
     public void stopSource(final @PathParam("tenant") String tenant,
                            final @PathParam("namespace") String namespace,
                            final @PathParam("sourceName") String sourceName) {
-        source.stopFunctionInstances(tenant, namespace, sourceName);
+        source.stopFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData());
     }
 
     @POST
@@ -208,7 +208,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
                             final @PathParam("namespace") String namespace,
                             final @PathParam("sourceName") String sourceName,
                             final @PathParam("instanceId") String instanceId) {
-        source.startFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+        source.startFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
     @POST
@@ -221,7 +221,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
     public void startSource(final @PathParam("tenant") String tenant,
                             final @PathParam("namespace") String namespace,
                             final @PathParam("sourceName") String sourceName) {
-        source.startFunctionInstances(tenant, namespace, sourceName);
+        source.startFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData());
     }
 
     @GET
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 660775d..f774180 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -175,7 +175,7 @@ public class FunctionsImplTest {
 
     @Test
     public void testStatusEmpty() {
-        Assert.assertTrue(this.resource.getFunctionInstanceStatus(tenant, namespace, function, "0", null) !=null);
+        Assert.assertTrue(this.resource.getFunctionInstanceStatus(tenant, namespace, function, "0", null, null, null) !=null);
     }
 
     @Test
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index e02b253..871ff5f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -903,7 +903,7 @@ public class FunctionApiV3ResourceTest {
             null,
             null,
             new Gson().toJson(functionConfig),
-                null);
+                null, null);
 
     }
 
@@ -928,7 +928,7 @@ public class FunctionApiV3ResourceTest {
             null,
             null,
             new Gson().toJson(functionConfig),
-                null);
+                null, null);
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist")
@@ -1015,7 +1015,7 @@ public class FunctionApiV3ResourceTest {
             filePackageUrl,
             null,
             new Gson().toJson(functionConfig),
-                null);
+                null, null);
 
     }
 
@@ -1123,7 +1123,7 @@ public class FunctionApiV3ResourceTest {
             tenant,
             namespace,
             function,
-                null);
+                null, null);
     }
 
     private void deregisterDefaultFunction() {
@@ -1131,7 +1131,7 @@ public class FunctionApiV3ResourceTest {
             tenant,
             namespace,
             function,
-                null);
+                null, null);
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist")
@@ -1250,7 +1250,7 @@ public class FunctionApiV3ResourceTest {
         resource.getFunctionInfo(
             tenant,
             namespace,
-            function
+            function,null,null
         );
 
     }
@@ -1259,7 +1259,9 @@ public class FunctionApiV3ResourceTest {
         return resource.getFunctionInfo(
             tenant,
             namespace,
-            function
+            function,
+                null,
+                null
         );
     }
 
@@ -1341,7 +1343,7 @@ public class FunctionApiV3ResourceTest {
     ) {
         resource.listFunctions(
             tenant,
-            namespace
+            namespace,null,null
         );
 
     }
@@ -1349,7 +1351,7 @@ public class FunctionApiV3ResourceTest {
     private List<String> listDefaultFunctions() {
         return resource.listFunctions(
             tenant,
-            namespace
+            namespace,null,null
         );
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 1b475ed..192fd1a 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -797,7 +797,7 @@ public class SinkApiV3ResourceTest {
             null,
             null,
             new Gson().toJson(sinkConfig),
-                null);
+                null, null);
 
     }
 
@@ -837,7 +837,7 @@ public class SinkApiV3ResourceTest {
             null,
             null,
             new Gson().toJson(sinkConfig),
-                null);
+                null, null);
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink test-sink doesn't exist")
@@ -938,7 +938,7 @@ public class SinkApiV3ResourceTest {
             filePackageUrl,
             null,
             new Gson().toJson(sinkConfig),
-                null);
+                null, null);
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "sink failed to register")
@@ -1044,7 +1044,7 @@ public class SinkApiV3ResourceTest {
             tenant,
             namespace,
             sink,
-                null);
+                null, null);
 
     }
 
@@ -1053,7 +1053,7 @@ public class SinkApiV3ResourceTest {
             tenant,
             namespace,
                 sink,
-                null);
+                null, null);
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink test-sink doesn't exist")
@@ -1167,7 +1167,7 @@ public class SinkApiV3ResourceTest {
         resource.getFunctionInfo(
             tenant,
             namespace,
-            sink
+            sink, null, null
         );
 
     }
@@ -1265,7 +1265,7 @@ public class SinkApiV3ResourceTest {
     ) {
         resource.listFunctions(
             tenant,
-            namespace
+            namespace, null, null
         );
 
     }
@@ -1273,7 +1273,7 @@ public class SinkApiV3ResourceTest {
     private List<String> listDefaultSinks() {
         return resource.listFunctions(
             tenant,
-            namespace
+            namespace, null, null
         );
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 43f3303..25a0845 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -825,7 +825,7 @@ public class SourceApiV3ResourceTest {
             null,
             null,
             new Gson().toJson(sourceConfig),
-                null);
+                null, null);
 
     }
 
@@ -862,7 +862,7 @@ public class SourceApiV3ResourceTest {
             null,
             null,
             new Gson().toJson(sourceConfig),
-                null);
+                null, null);
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source test-source doesn't exist")
@@ -960,7 +960,7 @@ public class SourceApiV3ResourceTest {
             filePackageUrl,
             null,
             new Gson().toJson(sourceConfig),
-                null);
+                null, null);
 
     }
 
@@ -1067,7 +1067,7 @@ public class SourceApiV3ResourceTest {
             tenant,
             namespace,
             function,
-                null);
+                null, null);
 
     }
 
@@ -1076,7 +1076,7 @@ public class SourceApiV3ResourceTest {
             tenant,
             namespace,
                 source,
-                null);
+                null, null);
     }
 
     @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp= "Source test-source doesn't exist")
@@ -1191,7 +1191,7 @@ public class SourceApiV3ResourceTest {
         resource.getFunctionInfo(
             tenant,
             namespace,
-            source
+            source, null, null
         );
     }
 
@@ -1281,14 +1281,14 @@ public class SourceApiV3ResourceTest {
     ) {
         resource.listFunctions(
             tenant,
-            namespace
+            namespace, null, null
         );
     }
 
     private List<String> listDefaultSources() {
         return resource.listFunctions(
             tenant,
-            namespace);
+            namespace, null, null);
     }
 
     @Test