You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/03/28 16:26:18 UTC
[pulsar] branch master updated: [ISSUE 3763] - Implementing Function authorization (#3874)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 14d1eaa [ISSUE 3763] - Implementing Function authorization (#3874)
14d1eaa is described below
commit 14d1eaa73e1479e403042da87ad34c7a35a304e2
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Thu Mar 28 11:26:12 2019 -0500
[ISSUE 3763] - Implementing Function authorization (#3874)
* Implementing Function Authorization
---
conf/functions_worker.yml | 13 +-
.../authorization/AuthorizationProvider.java | 10 +
.../broker/authorization/AuthorizationService.java | 6 +-
.../authorization/PulsarAuthorizationProvider.java | 42 ++
.../org/apache/pulsar/PulsarBrokerStarter.java | 6 +
.../java/org/apache/pulsar/PulsarStandalone.java | 7 +
.../org/apache/pulsar/broker/PulsarService.java | 10 +-
.../pulsar/broker/admin/impl/FunctionsBase.java | 32 +-
.../apache/pulsar/broker/admin/impl/SinkBase.java | 22 +-
.../pulsar/broker/admin/impl/SourceBase.java | 22 +-
.../api/AuthorizationProducerConsumerTest.java | 5 +
.../worker/PulsarFunctionE2ESecurityTest.java | 516 +++++++++++++++++++++
.../worker/PulsarWorkerAssignmentTest.java | 4 +-
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 4 +
.../pulsar/common/policies/data/AuthAction.java | 3 +
.../org/apache/pulsar/functions/worker/Worker.java | 68 ++-
.../pulsar/functions/worker/WorkerConfig.java | 22 +
.../pulsar/functions/worker/WorkerService.java | 19 +-
.../functions/worker/rest/api/ComponentImpl.java | 269 +++++++++--
.../functions/worker/rest/api/FunctionsImpl.java | 13 +-
.../functions/worker/rest/api/FunctionsImplV2.java | 24 +-
.../pulsar/functions/worker/rest/api/SinkImpl.java | 13 +-
.../functions/worker/rest/api/SourceImpl.java | 12 +-
.../worker/rest/api/v3/FunctionApiV3Resource.java | 30 +-
.../worker/rest/api/v3/SinkApiV3Resource.java | 22 +-
.../worker/rest/api/v3/SourceApiV3Resource.java | 22 +-
.../worker/rest/api/FunctionsImplTest.java | 2 +-
.../rest/api/v3/FunctionApiV3ResourceTest.java | 20 +-
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 16 +-
.../rest/api/v3/SourceApiV3ResourceTest.java | 16 +-
30 files changed, 1091 insertions(+), 179 deletions(-)
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index 0c1125d..776d4cd 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -26,6 +26,13 @@ workerHostname: localhost
workerPort: 6750
workerPortTls: 6751
+# Configuration Store connection string
+configurationStoreServers: localhost:2181
+# ZooKeeper session timeout in milliseconds
+zooKeeperSessionTimeoutMillis: 30000
+# ZooKeeper operation timeout in seconds
+zooKeeperOperationTimeoutSeconds: 30
+
################################
# Function package management
################################
@@ -129,8 +136,10 @@ processContainerFactory:
authenticationEnabled: false
# Enforce authorization on accessing functions api
authorizationEnabled: false
-# Set of autentication provider name list, which is a list of class names
-authenticationProviders:
+# Set of authentication provider name list, which is a list of class names
+authenticationProviders:
+# Authorization provider fully qualified class-name
+authorizationProvider: org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
# Set of role names that are treated as "super-user", meaning they will be able to access any admin-api
superUserRoles:
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
index b25c789..9787eae 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationProvider.java
@@ -96,6 +96,16 @@ public interface AuthorizationProvider extends Closeable {
AuthenticationDataSource authenticationData);
/**
+ * Check whether the given role is allowed to perform function operations within this namespace
+ * @param namespaceName The namespace that the function operations can be executed in
+ * @param role The role to check
+ * @param authenticationData authentication data related to the role
+ * @return a boolean to determine whether authorized or not
+ */
+ CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
+ AuthenticationDataSource authenticationData);
+
+ /**
*
* Grant authorization-action permission on a namespace to the given client
*
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
index de6f799..95ff764 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/AuthorizationService.java
@@ -32,8 +32,6 @@ import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.function.Function;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -307,4 +305,8 @@ public class AuthorizationService {
return finalResult;
}
+ public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role,
+ AuthenticationDataSource authenticationData) {
+ return provider.allowFunctionOpsAsync(namespaceName, role, authenticationData);
+ }
}
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
index 7914168..8dd04d4 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authorization/PulsarAuthorizationProvider.java
@@ -210,6 +210,48 @@ public class PulsarAuthorizationProvider implements AuthorizationProvider {
}
@Override
+ public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+ CompletableFuture<Boolean> permissionFuture = new CompletableFuture<>();
+ try {
+ configCache.policiesCache().getAsync(POLICY_ROOT + namespaceName.toString()).thenAccept(policies -> {
+ if (!policies.isPresent()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Policies node couldn't be found for namespace : {}", namespaceName);
+ }
+ } else {
+ Map<String, Set<AuthAction>> namespaceRoles = policies.get().auth_policies.namespace_auth;
+ Set<AuthAction> namespaceActions = namespaceRoles.get(role);
+ if (namespaceActions != null && namespaceActions.contains(AuthAction.functions)) {
+ // The role has namespace level permission
+ permissionFuture.complete(true);
+ return;
+ }
+
+ // Using wildcard
+ if (conf.isAuthorizationAllowWildcardsMatching()) {
+ if (checkWildcardPermission(role, AuthAction.functions, namespaceRoles)) {
+ // The role has namespace level permission by wildcard match
+ permissionFuture.complete(true);
+ return;
+ }
+ }
+ }
+ permissionFuture.complete(false);
+ }).exceptionally(ex -> {
+ log.warn("Client with Role - {} failed to get permissions for namespace - {}. {}", role, namespaceName,
+ ex.getMessage());
+ permissionFuture.completeExceptionally(ex);
+ return null;
+ });
+ } catch (Exception e) {
+ log.warn("Client with Role - {} failed to get permissions for namespace - {}. {}", role, namespaceName,
+ e.getMessage());
+ permissionFuture.completeExceptionally(e);
+ }
+ return permissionFuture;
+ }
+
+ @Override
public CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions,
String role, String authDataJson) {
return grantPermissionAsync(topicName.getNamespaceObject(), actions, role, authDataJson);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index d9acecc..346a3eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -166,6 +166,12 @@ public class PulsarBrokerStarter {
"c-" + brokerConfig.getClusterName()
+ "-fw-" + hostname
+ "-" + workerConfig.getWorkerPort());
+ // inherit the broker's authorization, configuration store and ZooKeeper timeout settings
+ workerConfig.setAuthorizationEnabled(brokerConfig.isAuthorizationEnabled());
+ workerConfig.setAuthorizationProvider(brokerConfig.getAuthorizationProvider());
+ workerConfig.setConfigurationStoreServers(brokerConfig.getConfigurationStoreServers());
+ workerConfig.setZooKeeperSessionTimeoutMillis(brokerConfig.getZooKeeperSessionTimeoutMillis());
+ workerConfig.setZooKeeperOperationTimeoutSeconds(brokerConfig.getZooKeeperOperationTimeoutSeconds());
functionsWorkerService = new WorkerService(workerConfig);
} else {
functionsWorkerService = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
index feabe02..4acbc44 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
@@ -297,6 +297,13 @@ public class PulsarStandalone implements AutoCloseable {
"c-" + config.getClusterName()
+ "-fw-" + hostname
+ "-" + workerConfig.getWorkerPort());
+ // inherit the broker's authorization, configuration store and ZooKeeper timeout settings
+ workerConfig.setAuthorizationEnabled(config.isAuthorizationEnabled());
+ workerConfig.setAuthorizationProvider(config.getAuthorizationProvider());
+ workerConfig.setConfigurationStoreServers(config.getConfigurationStoreServers());
+ workerConfig.setZooKeeperSessionTimeoutMillis(config.getZooKeeperSessionTimeoutMillis());
+ workerConfig.setZooKeeperOperationTimeoutSeconds(config.getZooKeeperOperationTimeoutSeconds());
+
fnWorkerService = new WorkerService(workerConfig);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ed4d928..400d207 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -57,6 +57,8 @@ import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
@@ -438,7 +440,7 @@ public class PulsarService implements AutoCloseable {
acquireSLANamespace();
// start function worker service if necessary
- this.startWorkerService();
+ this.startWorkerService(brokerService.getAuthenticationService(), brokerService.getAuthorizationService());
LOG.info("messaging service is ready, bootstrap service on port={}, broker url={}, cluster={}, configs={}",
config.getWebServicePort().get(), brokerServiceUrl, config.getClusterName(),
@@ -945,7 +947,9 @@ public class PulsarService implements AutoCloseable {
return schemaRegistryService;
}
- private void startWorkerService() throws InterruptedException, IOException, KeeperException {
+ private void startWorkerService(AuthenticationService authenticationService,
+ AuthorizationService authorizationService)
+ throws InterruptedException, IOException, KeeperException {
if (functionWorkerService.isPresent()) {
LOG.info("Starting function worker service");
String namespace = functionWorkerService.get()
@@ -1041,7 +1045,7 @@ public class PulsarService implements AutoCloseable {
throw ioe;
}
LOG.info("Function worker service setup completed");
- functionWorkerService.get().start(dlogURI);
+ functionWorkerService.get().start(dlogURI, authenticationService, authorizationService);
LOG.info("Function worker service started");
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index c4fd33f..e70e5cb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -103,7 +103,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @FormDataParam("functionConfig") String functionConfigJson) {
functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, functionConfigJson, clientAppId());
+ functionPkgUrl, null, functionConfigJson, clientAppId(), clientAuthData());
}
@@ -120,7 +120,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
public void deregisterFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
- functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+ functions.deregisterFunction(tenant, namespace, functionName, clientAppId(), clientAuthData());
}
@GET
@@ -138,7 +138,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
public FunctionConfig getFunctionInfo(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
- return functions.getFunctionInfo(tenant, namespace, functionName);
+ return functions.getFunctionInfo(tenant, namespace, functionName, clientAppId(), clientAuthData());
}
@GET
@@ -158,7 +158,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
- return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@@ -177,7 +177,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
- return functions.getFunctionStatus(tenant, namespace, functionName, uri.getRequestUri());
+ return functions.getFunctionStatus(tenant, namespace, functionName, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@@ -195,7 +195,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
- return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri());
+ return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@@ -215,7 +215,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
- return functions.getFunctionsInstanceStats(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ return functions.getFunctionsInstanceStats(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@@ -231,7 +231,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
@Path("/{tenant}/{namespace}")
public List<String> listFunctions(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
- return functions.listFunctions(tenant, namespace);
+ return functions.listFunctions(tenant, namespace, clientAppId(), clientAuthData());
}
@POST
@@ -253,7 +253,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @FormDataParam("data") String triggerValue,
final @FormDataParam("dataStream") InputStream triggerStream,
final @FormDataParam("topic") String topic) {
- return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
+ return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic, clientAppId(), clientAuthData());
}
@GET
@@ -272,7 +272,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("key") String key) {
- return functions.getFunctionState(tenant, namespace, functionName, key);
+ return functions.getFunctionState(tenant, namespace, functionName, key, clientAppId(), clientAuthData());
}
@POST
@@ -288,7 +288,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -303,7 +303,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
public void restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
- functions.restartFunctionInstances(tenant, namespace, functionName);
+ functions.restartFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
}
@POST
@@ -319,7 +319,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -334,7 +334,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
public void stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
- functions.stopFunctionInstances(tenant, namespace, functionName);
+ functions.stopFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
}
@POST
@@ -350,7 +350,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- functions.startFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ functions.startFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -365,7 +365,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
public void startFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
- functions.startFunctionInstances(tenant, namespace, functionName);
+ functions.startFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
index ffd4dd7..e034950 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
@@ -100,7 +100,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
final @FormDataParam("sinkConfig") String sinkConfigJson) {
sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, sinkConfigJson, clientAppId());
+ functionPkgUrl, null, sinkConfigJson, clientAppId(), clientAuthData());
}
@@ -118,7 +118,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
public void deregisterSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) {
- sink.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+ sink.deregisterFunction(tenant, namespace, sinkName, clientAppId(), clientAuthData());
}
@GET
@@ -157,7 +157,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) throws IOException {
return sink.getSinkInstanceStatus(
- tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+ tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@@ -175,7 +175,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
public SinkStatus getSinkStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) throws IOException {
- return sink.getSinkStatus(tenant, namespace, sinkName, uri.getRequestUri());
+ return sink.getSinkStatus(tenant, namespace, sinkName, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@@ -191,7 +191,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
@Path("/{tenant}/{namespace}")
public List<String> listSinks(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
- return sink.listFunctions(tenant, namespace);
+ return sink.listFunctions(tenant, namespace, clientAppId(), clientAuthData());
}
@POST
@@ -207,7 +207,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) {
- sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+ sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -222,7 +222,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
public void restartSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) {
- sink.restartFunctionInstances(tenant, namespace, sinkName);
+ sink.restartFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
}
@POST
@@ -238,7 +238,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) {
- sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+ sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -253,7 +253,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
public void stopSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) {
- sink.stopFunctionInstances(tenant, namespace, sinkName);
+ sink.stopFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
}
@POST
@@ -269,7 +269,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) {
- sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+ sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -284,7 +284,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
public void startSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) {
- sink.startFunctionInstances(tenant, namespace, sinkName);
+ sink.startFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
index 6078ede..ef70b67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
@@ -100,7 +100,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
final @FormDataParam("sourceConfig") String sourceConfigJson) {
source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, sourceConfigJson, clientAppId());
+ functionPkgUrl, null, sourceConfigJson, clientAppId(), clientAuthData());
}
@@ -117,7 +117,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
public void deregisterSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) {
- source.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+ source.deregisterFunction(tenant, namespace, sourceName, clientAppId(), clientAuthData());
}
@GET
@@ -156,7 +156,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) throws IOException {
return source.getSourceInstanceStatus(
- tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+ tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@@ -174,7 +174,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
public SourceStatus getSourceStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) throws IOException {
- return source.getSourceStatus(tenant, namespace, sourceName, uri.getRequestUri());
+ return source.getSourceStatus(tenant, namespace, sourceName, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@@ -190,7 +190,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
@Path("/{tenant}/{namespace}")
public List<String> listSources(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
- return source.listFunctions(tenant, namespace);
+ return source.listFunctions(tenant, namespace, clientAppId(), clientAuthData());
}
@POST
@@ -205,7 +205,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) {
- source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+ source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -219,7 +219,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
public void restartSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) {
- source.restartFunctionInstances(tenant, namespace, sourceName);
+ source.restartFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData());
}
@POST
@@ -234,7 +234,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) {
- source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+ source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -248,7 +248,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
public void stopSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) {
- source.stopFunctionInstances(tenant, namespace, sourceName);
+ source.stopFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData());
}
@POST
@@ -263,7 +263,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) {
- source.startFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+ source.startFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -277,7 +277,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
public void startSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) {
- source.startFunctionInstances(tenant, namespace, sourceName);
+ source.startFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData());
}
@GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
index 72838e8..d8fbc91 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/AuthorizationProducerConsumerTest.java
@@ -440,6 +440,11 @@ public class AuthorizationProducerConsumerTest extends ProducerConsumerBase {
}
@Override
+ public CompletableFuture<Boolean> allowFunctionOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
+ return null;
+ }
+
+ @Override
public CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions,
String role, String authenticationData) {
return CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
new file mode 100644
index 0000000..b139457
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -0,0 +1,516 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import io.jsonwebtoken.SignatureAlgorithm;
+import org.apache.bookkeeper.test.PortManager;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.utils.AuthTokenUtils;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
+import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
+import org.apache.pulsar.client.admin.BrokerStats;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.auth.AuthenticationToken;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.Utils;
+import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.SubscriptionStats;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.io.PulsarFunctionE2ETest;
+import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import javax.crypto.SecretKey;
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
+import static org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.fail;
+
+public class PulsarFunctionE2ESecurityTest {
+
+ LocalBookkeeperEnsemble bkEnsemble;
+
+ ServiceConfiguration config;
+ WorkerConfig workerConfig;
+ URL brokerWebServiceUrl;
+ PulsarService pulsar;
+ PulsarAdmin superUserAdmin;
+ PulsarClient pulsarClient;
+ BrokerStats brokerStatsClient;
+ WorkerService functionsWorkerService;
+ final String TENANT = "external-repl-prop";
+ final String NAMESPACE = "test-ns";
+ String pulsarFunctionsNamespace = TENANT + "/use/pulsar-function-admin";
+ String primaryHost;
+ String workerId;
+
+ private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
+ private final int brokerWebServicePort = PortManager.nextFreePort();
+ private final int brokerServicePort = PortManager.nextFreePort();
+ private final int workerServicePort = PortManager.nextFreePort();
+ private SecretKey secretKey;
+
+ private static final String SUBJECT = "my-test-subject";
+ private static final String ADMIN_SUBJECT = "superUser";
+
+ private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ESecurityTest.class);
+ private String adminToken;
+ private String brokerServiceUrl;
+
+ /**
+  * Data provider registered as {@code validRoleName}.
+  *
+  * @return two single-argument rows: {@code Boolean.TRUE} followed by {@code Boolean.FALSE}
+  */
+ @DataProvider(name = "validRoleName")
+ public Object[][] validRoleName() {
+     final Object[] trueRow = { Boolean.TRUE };
+     final Object[] falseRow = { Boolean.FALSE };
+     return new Object[][] { trueRow, falseRow };
+ }
+
+ /**
+  * Starts the environment each test method runs against: a local BookKeeper ensemble,
+  * a broker with token authentication and authorization enabled, and a functions worker
+  * service handed to that broker. It then creates the super-user admin client, a Pulsar
+  * client, and the tenant/namespace used by the tests.
+  *
+  * @param method the test method about to run (only used for logging)
+  * @throws Exception if any component fails to start or an admin call fails
+  */
+ @BeforeMethod
+ void setup(Method method) throws Exception {
+
+ log.info("--- Setting up method {} ---", method.getName());
+
+ // Start local bookkeeper ensemble
+ bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, () -> PortManager.nextFreePort());
+ bkEnsemble.start();
+
+ // Despite its name, this field holds the broker's HTTP (web service) URL
+ brokerServiceUrl = "http://127.0.0.1:" + brokerWebServicePort;
+
+ config = spy(new ServiceConfiguration());
+ config.setClusterName("use");
+ Set<String> superUsers = Sets.newHashSet(ADMIN_SUBJECT);
+ config.setSuperUserRoles(superUsers);
+ config.setWebServicePort(brokerWebServicePort);
+ config.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT);
+ config.setBrokerServicePort(brokerServicePort);
+ config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
+ config.setAdvertisedAddress("localhost");
+
+ // Enable token authentication plus authorization, using a freshly generated HS256 key
+ Set<String> providers = new HashSet<>();
+ providers.add(AuthenticationProviderToken.class.getName());
+ config.setAuthenticationEnabled(true);
+ config.setAuthenticationProviders(providers);
+ config.setAuthorizationEnabled(true);
+ config.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+ secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+ Properties properties = new Properties();
+ properties.setProperty("tokenSecretKey",
+ AuthTokenUtils.encodeKeyBase64(secretKey));
+ config.setProperties(properties);
+
+ // Token for the super-user role; must be created before the worker config is built,
+ // because createPulsarFunctionWorker() embeds it in the worker's client auth parameters
+ adminToken = AuthTokenUtils.createToken(secretKey, ADMIN_SUBJECT, Optional.empty());
+
+ // The broker's internal client also authenticates with the super-user token
+ config.setBrokerClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ config.setBrokerClientAuthenticationParameters(
+ "token:" + adminToken);
+ functionsWorkerService = createPulsarFunctionWorker(config);
+ brokerWebServiceUrl = new URL(brokerServiceUrl);
+ Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
+ pulsar = new PulsarService(config, functionWorkerService);
+ pulsar.start();
+
+ AuthenticationToken authToken = new AuthenticationToken();
+ authToken.configure("token:" + adminToken);
+
+ // Admin client acting as the super user; tests use it to grant permissions
+ superUserAdmin = spy(
+ PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).authentication(authToken).build());
+
+ brokerStatsClient = superUserAdmin.brokerStats();
+ primaryHost = String.format("http://%s:%d", "localhost", brokerWebServicePort);
+
+ // update cluster metadata
+ ClusterData clusterData = new ClusterData(brokerWebServiceUrl.toString());
+ superUserAdmin.clusters().updateCluster(config.getClusterName(), clusterData);
+
+ // Pulsar client built from the worker's service URL and, when set, its auth settings
+ ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(this.workerConfig.getPulsarServiceUrl());
+ if (isNotBlank(workerConfig.getClientAuthenticationPlugin())
+ && isNotBlank(workerConfig.getClientAuthenticationParameters())) {
+ clientBuilder.authentication(workerConfig.getClientAuthenticationPlugin(),
+ workerConfig.getClientAuthenticationParameters());
+ }
+ pulsarClient = clientBuilder.build();
+
+ // Tenant administered by the super user and allowed on cluster "use"
+ TenantInfo propAdmin = new TenantInfo();
+ propAdmin.getAdminRoles().add(ADMIN_SUBJECT);
+ propAdmin.setAllowedClusters(Sets.newHashSet(Lists.newArrayList("use")));
+ superUserAdmin.tenants().updateTenant(TENANT, propAdmin);
+
+ // Namespace the tests deploy functions into
+ final String replNamespace = TENANT + "/" + NAMESPACE;
+ superUserAdmin.namespaces().createNamespace(replNamespace);
+ Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
+ superUserAdmin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
+
+ System.setProperty(JAVA_INSTANCE_JAR_PROPERTY, "");
+
+ // NOTE(review): fixed 100 ms pause, presumably to let the services settle; confirm it is needed
+ Thread.sleep(100);
+ }
+
+ /**
+  * Stops everything started in {@code setup}, in the same order as before: Pulsar client,
+  * super-user admin client, functions worker service, broker, BookKeeper ensemble.
+  * <p>
+  * Each step runs in a {@code finally} of the previous one, so a failure while closing one
+  * component no longer leaves the later ones (and their ports) running for the next test.
+  *
+  * @throws Exception the failure raised by a teardown step, after the remaining steps have run
+  */
+ @AfterMethod
+ void shutdown() throws Exception {
+     log.info("--- Shutting down ---");
+     try {
+         pulsarClient.close();
+     } finally {
+         try {
+             superUserAdmin.close();
+         } finally {
+             try {
+                 functionsWorkerService.stop();
+             } finally {
+                 try {
+                     pulsar.close();
+                 } finally {
+                     bkEnsemble.stop();
+                 }
+             }
+         }
+     }
+ }
+
+ /**
+  * Builds the worker configuration for the test, stores it in {@code workerConfig}
+  * (and the derived id in {@code workerId}), and wraps it in a new {@link WorkerService}.
+  * This method does not start the service.
+  * <p>
+  * Reads {@code adminToken}, so the caller must have created the token beforehand.
+  *
+  * @param config broker configuration supplying the ports, cluster name, advertised
+  *               address and authentication/authorization settings
+  * @return a new worker service built from the assembled configuration
+  */
+ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {
+
+ workerConfig = new WorkerConfig();
+ workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
+ workerConfig.setSchedulerClassName(
+ org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
+ workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+ // worker talks to local broker
+ workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get());
+ workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort().get());
+ workerConfig.setFailureCheckFreqMs(100);
+ workerConfig.setNumFunctionPackageReplicas(1);
+ workerConfig.setClusterCoordinationTopicName("coordinate");
+ workerConfig.setFunctionAssignmentTopicName("assignment");
+ workerConfig.setFunctionMetadataTopicName("metadata");
+ workerConfig.setInstanceLivenessCheckFreqMs(100);
+ workerConfig.setWorkerPort(workerServicePort);
+ workerConfig.setPulsarFunctionsCluster(config.getClusterName());
+ // Worker id is derived from cluster name, hostname and the worker port set just above
+ String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+ this.workerId = "c-" + config.getClusterName() + "-fw-" + hostname + "-" + workerConfig.getWorkerPort();
+ workerConfig.setWorkerHostname(hostname);
+ workerConfig.setWorkerId(workerId);
+
+ // The worker's own client authenticates to the broker with the super-user token
+ workerConfig.setClientAuthenticationPlugin(AuthenticationToken.class.getName());
+ workerConfig.setClientAuthenticationParameters(
+ String.format("token:%s", adminToken));
+
+ // Copy the broker's authentication/authorization settings onto the worker
+ workerConfig.setAuthenticationEnabled(config.isAuthenticationEnabled());
+ workerConfig.setAuthenticationProviders(config.getAuthenticationProviders());
+ workerConfig.setAuthorizationEnabled(config.isAuthorizationEnabled());
+ workerConfig.setAuthorizationProvider(config.getAuthorizationProvider());
+
+ return new WorkerService(workerConfig);
+ }
+
+ protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {
+
+ FunctionConfig functionConfig = new FunctionConfig();
+ functionConfig.setTenant(tenant);
+ functionConfig.setNamespace(namespace);
+ functionConfig.setName(functionName);
+ functionConfig.setParallelism(1);
+ functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+ functionConfig.setSubName(subscriptionName);
+ functionConfig.setInputs(Collections.singleton(sourceTopic));
+ functionConfig.setAutoAck(true);
+ functionConfig.setClassName("org.apache.pulsar.functions.api.examples.ExclamationFunction");
+ functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+ functionConfig.setOutput(sinkTopic);
+ functionConfig.setCleanupSubscription(true);
+ return functionConfig;
+ }
+
+ @Test
+ public void testAuthorization() throws Exception {
+ String token1 = AuthTokenUtils.createToken(secretKey, SUBJECT, Optional.empty());
+ String token2 = AuthTokenUtils.createToken(secretKey, "wrong-subject", Optional.empty());
+
+ final String replNamespace = TENANT + "/" + NAMESPACE;
+ final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
+ final String sinkTopic = "persistent://" + replNamespace + "/output";
+ final String propertyKey = "key";
+ final String propertyValue = "value";
+ final String functionName = "PulsarFunction-test";
+ final String subscriptionName = "test-sub";
+
+
+ // create user admin client
+ AuthenticationToken authToken1 = new AuthenticationToken();
+ authToken1.configure("token:" + token1);
+
+ AuthenticationToken authToken2 = new AuthenticationToken();
+ authToken2.configure("token:" + token2);
+
+ try(PulsarAdmin admin1 = spy(
+ PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).authentication(authToken1).build());
+ PulsarAdmin admin2 = spy(
+ PulsarAdmin.builder().serviceHttpUrl(brokerServiceUrl).authentication(authToken2).build())
+ ) {
+
+ String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
+
+ FunctionConfig functionConfig = createFunctionConfig(TENANT, NAMESPACE, functionName,
+ sourceTopic, sinkTopic, subscriptionName);
+
+ // creating function should fail since admin1 doesn't have permissions granted yet
+ try {
+ admin1.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+ fail("client admin shouldn't have permissions to create function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+
+ // grant permissions to admin1
+ Set<AuthAction> actions = new HashSet<>();
+ actions.add(AuthAction.functions);
+ actions.add(AuthAction.produce);
+ actions.add(AuthAction.consume);
+ superUserAdmin.namespaces().grantPermissionOnNamespace(replNamespace, SUBJECT, actions);
+
+ // user should be able to create function now
+ admin1.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+ // admin2 should still fail
+ try {
+ admin2.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
+ fail("client admin shouldn't have permissions to create function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+
+ retryStrategically((test) -> {
+ try {
+ return admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName).getNumRunning() == 1
+ && admin1.topics().getStats(sourceTopic).subscriptions.size() == 1;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+ // validate pulsar sink consumer has started on the topic
+ assertEquals(admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName).getNumRunning(), 1);
+ assertEquals(admin1.topics().getStats(sourceTopic).subscriptions.size(), 1);
+
+ // create a producer that creates a topic at broker
+ try(Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(sinkTopic).subscriptionName("sub").subscribe()) {
+
+ int totalMsgs = 5;
+ for (int i = 0; i < totalMsgs; i++) {
+ String data = "my-message-" + i;
+ producer.newMessage().property(propertyKey, propertyValue).value(data).send();
+ }
+ retryStrategically((test) -> {
+ try {
+ SubscriptionStats subStats = admin1.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
+ return subStats.unackedMessages == 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ Message<String> msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedPropertyValue = msg.getProperty(propertyKey);
+ assertEquals(propertyValue, receivedPropertyValue);
+
+
+ // validate pulsar-sink consumer has consumed all messages and delivered to Pulsar sink but unacked
+ // messages
+ // due to publish failure
+ assertNotEquals(admin1.topics().getStats(sourceTopic).subscriptions.values().iterator().next().unackedMessages,
+ totalMsgs);
+ }
+
+ // test update functions
+ functionConfig.setParallelism(2);
+ // admin2 should still fail
+ try {
+ admin2.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
+ fail("client admin shouldn't have permissions to update function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+
+ admin1.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
+
+ retryStrategically((test) -> {
+ try {
+ return admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName).getNumRunning() == 2;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ assertEquals(admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName).getNumRunning(), 2);
+
+ // test getFunctionInfo
+ try {
+ admin2.functions().getFunction(TENANT, NAMESPACE, functionName);
+ fail("client admin shouldn't have permissions to get function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().getFunction(TENANT, NAMESPACE, functionName);
+
+ // test getFunctionInstanceStatus
+ try {
+ admin2.functions().getFunctionStatus(TENANT, NAMESPACE, functionName, 0);
+ fail("client admin shouldn't have permissions to get function status");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName, 0);
+
+ // test getFunctionStatus
+ try {
+ admin2.functions().getFunctionStatus(TENANT, NAMESPACE, functionName);
+ fail("client admin shouldn't have permissions to get function status");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().getFunctionStatus(TENANT, NAMESPACE, functionName);
+
+ // test getFunctionStats
+ try {
+ admin2.functions().getFunctionStats(TENANT, NAMESPACE, functionName);
+ fail("client admin shouldn't have permissions to get function stats");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().getFunctionStats(TENANT, NAMESPACE, functionName);
+
+ // test getFunctionInstanceStats
+ try {
+ admin2.functions().getFunctionStats(TENANT, NAMESPACE, functionName, 0);
+ fail("client admin shouldn't have permissions to get function stats");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().getFunctionStats(TENANT, NAMESPACE, functionName, 0);
+
+ // test listFunctions
+ try {
+ admin2.functions().getFunctions(TENANT, NAMESPACE);
+ fail("client admin shouldn't have permissions to list functions");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().getFunctions(TENANT, NAMESPACE);
+
+ // test triggerFunction
+ try {
+ admin2.functions().triggerFunction(TENANT, NAMESPACE, functionName, sourceTopic, "foo", null);
+ fail("client admin shouldn't have permissions to trigger function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().triggerFunction(TENANT, NAMESPACE, functionName, sourceTopic, "foo", null);
+
+ // test restartFunctionInstance
+ try {
+ admin2.functions().restartFunction(TENANT, NAMESPACE, functionName, 0);
+ fail("client admin shouldn't have permissions to restart function instance");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().restartFunction(TENANT, NAMESPACE, functionName, 0);
+
+ // test restartFunctionInstances
+ try {
+ admin2.functions().restartFunction(TENANT, NAMESPACE, functionName);
+ fail("client admin shouldn't have permissions to restart function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().restartFunction(TENANT, NAMESPACE, functionName);
+
+ // test stopFunction instance
+ try {
+ admin2.functions().stopFunction(TENANT, NAMESPACE, functionName, 0);
+ fail("client admin shouldn't have permissions to stop function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().stopFunction(TENANT, NAMESPACE, functionName, 0);
+
+ // test stopFunction all instance
+ try {
+ admin2.functions().stopFunction(TENANT, NAMESPACE, functionName);
+ fail("client admin shouldn't have permissions to restart function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().stopFunction(TENANT, NAMESPACE, functionName);
+
+ // test startFunction instance
+ try {
+ admin2.functions().startFunction(TENANT, NAMESPACE, functionName);
+ fail("client admin shouldn't have permissions to restart function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().restartFunction(TENANT, NAMESPACE, functionName);
+
+ // test startFunction all instances
+ try {
+ admin2.functions().restartFunction(TENANT, NAMESPACE, functionName);
+ fail("client admin shouldn't have permissions to restart function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+ admin1.functions().restartFunction(TENANT, NAMESPACE, functionName);
+
+ // delete functions
+ // admin2 should still fail
+ try {
+ admin2.functions().deleteFunction(TENANT, NAMESPACE, functionName);
+ fail("client admin shouldn't have permissions to delete function");
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+
+ }
+
+ admin1.functions().deleteFunction(TENANT, NAMESPACE, functionName);
+
+ retryStrategically((test) -> {
+ try {
+ return admin1.topics().getStats(sourceTopic).subscriptions.size() == 0;
+ } catch (PulsarAdminException e) {
+ return false;
+ }
+ }, 5, 150);
+
+ // make sure subscriptions are cleanup
+ assertEquals(admin1.topics().getStats(sourceTopic).subscriptions.size(), 0);
+ }
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 91a5910..febdadd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -25,12 +25,14 @@ import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -289,7 +291,7 @@ public class PulsarWorkerAssignmentTest {
final URI dlUri = functionsWorkerService.getDlogUri();
functionsWorkerService.stop();
functionsWorkerService = new WorkerService(workerConfig);
- functionsWorkerService.start(dlUri);
+ functionsWorkerService.start(dlUri, new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig)), null);
final FunctionRuntimeManager runtimeManager2 = functionsWorkerService.getFunctionRuntimeManager();
retryStrategically((test) -> {
try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 253d2b6..422e9ed 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.client.admin.BrokerStats;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -157,6 +158,9 @@ public class PulsarFunctionE2ETest {
config.setAuthenticationEnabled(true);
config.setAuthenticationProviders(providers);
+ config.setAuthorizationEnabled(true);
+ config.setAuthorizationProvider(PulsarAuthorizationProvider.class.getName());
+
config.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
config.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
config.setTlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java
index 1fd2301..faef870 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AuthAction.java
@@ -27,4 +27,7 @@ public enum AuthAction {
/** Permission to consume messages */
consume,
+
+ /** Permission to perform functions operations */
+ functions,
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 65dbc40..db84462 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -18,18 +18,30 @@
*/
package org.apache.pulsar.functions.worker;
+import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.conf.InternalConfigurationData;
+import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.functions.worker.rest.WorkerServer;
+import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
@Slf4j
public class Worker {
@@ -38,6 +50,13 @@ public class Worker {
private final WorkerService workerService;
private WorkerServer server;
+ private ZooKeeperClientFactory zkClientFactory = null;
+ private final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(8).name("zk-cache-ordered").build();
+ private final ScheduledExecutorService cacheExecutor = Executors.newScheduledThreadPool(10,
+ new DefaultThreadFactory("zk-cache-callback"));
+ private GlobalZooKeeperCache globalZkCache;
+ private ConfigurationCacheService configurationCacheService;
+
public Worker(WorkerConfig workerConfig) {
this.workerConfig = workerConfig;
this.workerService = new WorkerService(workerConfig);
@@ -46,7 +65,7 @@ public class Worker {
protected void start() throws Exception {
URI dlogUri = initialize(this.workerConfig);
- workerService.start(dlogUri);
+ workerService.start(dlogUri, getAuthenticationService(), getAuthorizationService());
this.server = new WorkerServer(workerService);
this.server.start();
log.info("Start worker server on port {}...", this.workerConfig.getWorkerPort());
@@ -132,15 +151,60 @@ public class Worker {
}
}
+ private AuthorizationService getAuthorizationService() throws PulsarServerException {
+
+ if (this.workerConfig.isAuthorizationEnabled()) {
+
+ log.info("starting configuration cache service");
+
+ this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
+ (int) workerConfig.getZooKeeperSessionTimeoutMillis(),
+ workerConfig.getZooKeeperOperationTimeoutSeconds(),
+ workerConfig.getConfigurationStoreServers(),
+ orderedExecutor, cacheExecutor);
+ try {
+ this.globalZkCache.start();
+ } catch (IOException e) {
+ throw new PulsarServerException(e);
+ }
+
+ this.configurationCacheService = new ConfigurationCacheService(this.globalZkCache, this.workerConfig.getPulsarFunctionsCluster());
+ return new AuthorizationService(PulsarConfigurationLoader.convertFrom(workerConfig), this.configurationCacheService);
+ }
+ return null;
+ }
+
+ /**
+  * Creates the authentication service from the worker configuration, converted with
+  * {@link PulsarConfigurationLoader#convertFrom}. A new instance is built on every call.
+  *
+  * @return a new authentication service
+  * @throws PulsarServerException if the authentication service cannot be constructed
+  */
+ private AuthenticationService getAuthenticationService() throws PulsarServerException {
+ return new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig));
+ }
+
+ /**
+  * Returns the ZooKeeper client factory, creating it on first use with this worker's
+  * ordered executor and returning the same instance afterwards.
+  *
+  * @return the cached (or newly created) client factory
+  */
+ public ZooKeeperClientFactory getZooKeeperClientFactory() {
+     if (zkClientFactory != null) {
+         return zkClientFactory;
+     }
+     // First call: build the default factory and remember it
+     zkClientFactory = new ZookeeperBkClientFactoryImpl(orderedExecutor);
+     return zkClientFactory;
+ }
+
protected void stop() {
try {
if (null != this.server) {
this.server.stop();
}
workerService.stop();
- }catch(Exception e) {
+ } catch(Exception e) {
log.warn("Failed to gracefully stop worker service ", e);
}
+
+ if (this.globalZkCache != null) {
+ try {
+ this.globalZkCache.close();
+ } catch (IOException e) {
+ log.warn("Failed to close global zk cache ", e);
+ }
+ }
+
+
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index df7f14d..6c42a24 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -33,6 +33,7 @@ import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;
import org.apache.pulsar.common.configuration.PulsarConfiguration;
@@ -105,6 +106,22 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
)
private int numHttpServerThreads = 8;
@FieldContext(
+ category = CATEGORY_WORKER,
+ required = false,
+ doc = "Configuration store connection string (as a comma-separated list)"
+ )
+ private String configurationStoreServers;
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "ZooKeeper session timeout in milliseconds"
+ )
+ private long zooKeeperSessionTimeoutMillis = 30000;
+ @FieldContext(
+ category = CATEGORY_WORKER,
+ doc = "ZooKeeper operation timeout in seconds"
+ )
+ private int zooKeeperOperationTimeoutSeconds = 30;
+ @FieldContext(
category = CATEGORY_CONNECTORS,
doc = "The path to the location to locate builtin connectors"
)
@@ -274,6 +291,11 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
)
private boolean authorizationEnabled = false;
@FieldContext(
+ category = CATEGORY_WORKER_SECURITY,
+ doc = "Authorization provider fully qualified class-name"
+ )
+ private String authorizationProvider = PulsarAuthorizationProvider.class.getName();
+ @FieldContext(
category = CATEGORY_WORKER_SECURITY,
doc = "Role names that are treated as `super-user`, meaning they will be able to access any admin-api"
)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index d06053b..b035fb8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -24,6 +24,7 @@ import com.google.common.annotations.VisibleForTesting;
import io.netty.util.concurrent.DefaultThreadFactory;
+import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -36,16 +37,24 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationService;
+import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
+import org.apache.pulsar.zookeeper.LocalZooKeeperCache;
+import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
+import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
/**
* A service component contains everything to run a worker except rest server.
@@ -69,6 +78,7 @@ public class WorkerService {
private boolean isInitialized = false;
private final ScheduledExecutorService statsUpdater;
private AuthenticationService authenticationService;
+ private AuthorizationService authorizationService;
private ConnectorsManager connectorsManager;
private PulsarAdmin brokerAdmin;
private PulsarAdmin functionAdmin;
@@ -85,7 +95,10 @@ public class WorkerService {
this.metricsGenerator = new MetricsGenerator(this.statsUpdater, workerConfig);
}
- public void start(URI dlogUri) throws InterruptedException {
+
+ public void start(URI dlogUri,
+ AuthenticationService authenticationService,
+ AuthorizationService authorizationService) throws InterruptedException {
log.info("Starting worker {}...", workerConfig.getWorkerId());
this.brokerAdmin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl(),
@@ -175,7 +188,9 @@ public class WorkerService {
// initialize function runtime manager
this.functionRuntimeManager.initialize();
- authenticationService = new AuthenticationService(PulsarConfigurationLoader.convertFrom(workerConfig));
+ this.authenticationService = authenticationService;
+
+ this.authorizationService = authorizationService;
// Starting cluster services
log.info("Start cluster services...");
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 23f479a..0ed9fd1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.worker.rest.api;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.apache.pulsar.functions.utils.Reflections.createInstance;
@@ -79,6 +80,7 @@ import static org.apache.pulsar.functions.utils.Utils.ComponentType.SOURCE;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -91,6 +93,7 @@ import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -313,6 +316,17 @@ public abstract class ComponentImpl {
}
try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
+ componentName, clientRole, componentType);
+ throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
+ try {
// Check tenant exists
final TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
@@ -334,17 +348,6 @@ public abstract class ComponentImpl {
throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
- try {
- if (!isAuthorizedRole(tenant, clientRole)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
- componentName, clientRole, componentType);
- throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
- }
- } catch (PulsarAdminException e) {
- log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
- }
-
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
if (functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
@@ -483,7 +486,8 @@ public abstract class ComponentImpl {
final String functionPkgUrl,
final String functionDetailsJson,
final String componentConfigJson,
- final String clientRole) {
+ final String clientRole,
+ AuthenticationDataHttps clientAuthenticationDataHttps) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
@@ -500,7 +504,7 @@ public abstract class ComponentImpl {
}
try {
- if (!isAuthorizedRole(tenant, clientRole)) {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
componentName, clientRole, componentType);
throw new RestException(Status.UNAUTHORIZED, componentType + "client is not authorize to perform operation");
@@ -624,14 +628,15 @@ public abstract class ComponentImpl {
public void deregisterFunction(final String tenant,
final String namespace,
final String componentName,
- final String clientRole) {
+ final String clientRole,
+ AuthenticationDataHttps clientAuthenticationDataHttps) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
try {
- if (!isAuthorizedRole(tenant, clientRole)) {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister {}", tenant, namespace,
componentName, clientRole, componentType);
throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
@@ -696,12 +701,25 @@ public abstract class ComponentImpl {
public FunctionConfig getFunctionInfo(final String tenant,
final String namespace,
- final String componentName) {
+ final String componentName,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to get {}", tenant, namespace,
+ componentName, clientRole, componentType);
+ throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
// validate parameters
try {
validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
@@ -728,16 +746,20 @@ public abstract class ComponentImpl {
final String namespace,
final String componentName,
final String instanceId,
- final URI uri) {
- changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, false, uri);
+ final URI uri,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
+ changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, false, uri, clientRole, clientAuthenticationDataHttps);
}
// Starts a single instance of the component by delegating to
// changeFunctionInstanceStatus with start = true. This change adds the caller's
// role and authentication data to the signature and forwards both to the delegate.
public void startFunctionInstance(final String tenant,
final String namespace,
final String componentName,
final String instanceId,
- final URI uri) {
- changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, true, uri);
+ final URI uri,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
+ changeFunctionInstanceStatus(tenant, namespace, componentName, instanceId, true, uri, clientRole, clientAuthenticationDataHttps);
}
public void changeFunctionInstanceStatus(final String tenant,
@@ -745,12 +767,25 @@ public abstract class ComponentImpl {
final String componentName,
final String instanceId,
final boolean start,
- final URI uri) {
+ final URI uri,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to start/stop {}", tenant, namespace,
+ componentName, clientRole, componentType);
+ throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
// validate parameters
try {
validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
@@ -791,11 +826,24 @@ public abstract class ComponentImpl {
final String namespace,
final String componentName,
final String instanceId,
- final URI uri) {
+ final URI uri,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to restart {}", tenant, namespace,
+ componentName, clientRole, componentType);
+ throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
// validate parameters
try {
validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
@@ -830,25 +878,42 @@ public abstract class ComponentImpl {
// Stop request for all instances of the component: delegates to
// changeFunctionStatusAllInstances with start = false. This change adds the caller's
// role and authentication data to the signature and forwards both to the delegate.
public void stopFunctionInstances(final String tenant,
final String namespace,
- final String componentName) {
- changeFunctionStatusAllInstances(tenant, namespace, componentName, false);
+ final String componentName,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
+ changeFunctionStatusAllInstances(tenant, namespace, componentName, false, clientRole, clientAuthenticationDataHttps);
}
// Start request for all instances of the component: delegates to
// changeFunctionStatusAllInstances with start = true. This change adds the caller's
// role and authentication data to the signature and forwards both to the delegate.
public void startFunctionInstances(final String tenant,
final String namespace,
- final String componentName) {
- changeFunctionStatusAllInstances(tenant, namespace, componentName, true);
+ final String componentName,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
+ changeFunctionStatusAllInstances(tenant, namespace, componentName, true, clientRole, clientAuthenticationDataHttps);
}
public void changeFunctionStatusAllInstances(final String tenant,
final String namespace,
final String componentName,
- final boolean start) {
+ final boolean start,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to start/stop {}", tenant, namespace,
+ componentName, clientRole, componentType);
+ throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
// validate parameters
try {
validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
@@ -886,11 +951,24 @@ public abstract class ComponentImpl {
public void restartFunctionInstances(final String tenant,
final String namespace,
- final String componentName) {
+ final String componentName,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to restart {}", tenant, namespace,
+ componentName, clientRole, componentType);
+ throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
// validate parameters
try {
validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
@@ -925,9 +1003,22 @@ public abstract class ComponentImpl {
public FunctionStats getFunctionStats(final String tenant,
final String namespace,
final String componentName,
- final URI uri) {
+ final URI uri,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
if (!isWorkerServiceAvailable()) {
- throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
+ throwUnavailableException();
+ }
+
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to get stats for {}", tenant, namespace,
+ componentName, clientRole, componentType);
+ throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
// validate parameters
@@ -968,9 +1059,22 @@ public abstract class ComponentImpl {
final String namespace,
final String componentName,
final String instanceId,
- final URI uri) {
+ final URI uri,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
if (!isWorkerServiceAvailable()) {
- throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
+ throwUnavailableException();
+ }
+
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to get stats for {}", tenant, namespace,
+ componentName, clientRole, componentType);
+ throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
}
// validate parameters
@@ -1014,12 +1118,25 @@ public abstract class ComponentImpl {
return functionInstanceStatsData;
}
- public List<String> listFunctions(final String tenant, final String namespace) {
+ public List<String> listFunctions(final String tenant,
+ final String namespace,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{} Client [{}] is not admin and authorized to list {}", tenant, namespace, clientRole, componentType);
+ throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{} Failed to authorize [{}]", tenant, namespace, e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
// validate parameters
try {
validateListFunctionRequestParams(tenant, namespace);
@@ -1074,12 +1191,25 @@ public abstract class ComponentImpl {
final String functionName,
final String input,
final InputStream uploadedInputStream,
- final String topic) {
+ final String topic,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to trigger {}", tenant, namespace,
+ functionName, clientRole, componentType);
+ throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
// validate parameters
try {
validateTriggerRequestParams(tenant, namespace, functionName, topic, input, uploadedInputStream);
@@ -1174,12 +1304,25 @@ public abstract class ComponentImpl {
public FunctionState getFunctionState(final String tenant,
final String namespace,
final String functionName,
- final String key) {
+ final String key,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to get state for {}", tenant, namespace,
+ functionName, clientRole, componentType);
+ throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
if (null == worker().getStateStoreAdminClient()) {
throwStateStoreUnvailableResponse();
}
@@ -1689,21 +1832,33 @@ public abstract class ComponentImpl {
Utils.getUniquePackageName(Codec.encode(fileName)));
}
// Decides whether clientRole may operate on components in the given tenant/namespace.
// Returns true when authorization is disabled in the worker config, when the role is a
// super-user, or when the role is non-null and the tenant's admin-role set is null, empty
// or contains the role.
// This change adds the namespace and authenticationData parameters and a fallback: a
// non-null role with non-null authentication data is checked through allowFunctionOps on
// NamespaceName.get(tenant, namespace); in every other case the result is false.
// The tenant info is read through the broker admin client; the method is declared to
// throw PulsarAdminException.
// NOTE(review): a null clientRole is always rejected once authorization is enabled, and a
// null authenticationData disables the namespace-permission fallback — confirm callers that
// pass null for these expect that.
- public boolean isAuthorizedRole(String tenant, String clientRole) throws PulsarAdminException {
+ public boolean isAuthorizedRole(String tenant, String namespace, String clientRole,
+ AuthenticationDataSource authenticationData) throws PulsarAdminException {
if (worker().getWorkerConfig().isAuthorizationEnabled()) {
// skip authorization if client role is super-user
if (isSuperUser(clientRole)) {
return true;
}
TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
- return clientRole != null && (tenantInfo.getAdminRoles() == null || tenantInfo.getAdminRoles().isEmpty()
- || tenantInfo.getAdminRoles().contains(clientRole));
+ if (clientRole != null && (tenantInfo.getAdminRoles() == null || tenantInfo.getAdminRoles().isEmpty()
+ || tenantInfo.getAdminRoles().contains(clientRole))) {
+ return true;
+ }
+
+ // check if role has permissions granted
+ if (clientRole != null && authenticationData != null) {
+ return allowFunctionOps(NamespaceName.get(tenant, namespace), clientRole, authenticationData);
+ } else {
+ return false;
+ }
}
// authorization is disabled: every caller is allowed
return true;
}
// Returns true only when clientRole is non-null and is contained in the worker config's
// super-user roles. The added lines also require getSuperUserRoles() to be non-null before
// calling contains on it, so a null role set yields false instead of a NullPointerException.
public boolean isSuperUser(String clientRole) {
- return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
+ return clientRole != null
+ && worker().getWorkerConfig().getSuperUserRoles() != null
+ && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
}
public ComponentType calculateSubjectType(FunctionMetaData functionMetaData) {
@@ -1725,11 +1880,24 @@ public abstract class ComponentImpl {
return SINK;
}
- protected void componentStatusRequestValidate (final String tenant, final String namespace, final String componentName) {
+ protected void componentStatusRequestValidate (final String tenant, final String namespace, final String componentName,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
if (!isWorkerServiceAvailable()) {
throw new RestException(Status.SERVICE_UNAVAILABLE, "Function worker service is not done initializing. Please try again in a little while.");
}
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized get status for {}", tenant, namespace,
+ componentName, clientRole, componentType);
+ throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
+ throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
// validate parameters
try {
validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
@@ -1754,8 +1922,10 @@ public abstract class ComponentImpl {
protected void componentInstanceStatusRequestValidate (final String tenant,
final String namespace,
final String componentName,
- final int instanceId) {
- componentStatusRequestValidate(tenant, namespace, componentName);
+ final int instanceId,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
+ componentStatusRequestValidate(tenant, namespace, componentName, clientRole, clientAuthenticationDataHttps);
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
@@ -1766,4 +1936,19 @@ public abstract class ComponentImpl {
String.format("%s %s doesn't have instance with id %s", componentType, componentName, instanceId));
}
}
+
+ /**
+  * Asks the worker's authorization service whether {@code role} may perform function
+  * operations on {@code namespaceName}, blocking for at most the configured
+  * {@code zooKeeperOperationTimeoutSeconds}.
+  *
+  * @param namespaceName namespace the operation targets
+  * @param role client role being checked
+  * @param authenticationData authentication data forwarded to the authorization service
+  * @return the answer produced by the authorization service
+  * @throws RestException with {@code INTERNAL_SERVER_ERROR} when the check times out,
+  *         the waiting thread is interrupted, or the check itself fails
+  */
+ public boolean allowFunctionOps(NamespaceName namespaceName, String role,
+                                 AuthenticationDataSource authenticationData) {
+     final int timeoutSeconds = worker().getWorkerConfig().getZooKeeperOperationTimeoutSeconds();
+     try {
+         return worker().getAuthorizationService().allowFunctionOpsAsync(
+                 namespaceName, role, authenticationData).get(timeoutSeconds, SECONDS);
+     } catch (java.util.concurrent.TimeoutException e) {
+         // A timed get() reports expiry with TimeoutException, not InterruptedException;
+         // its message is usually null, so build an explicit one for the REST response.
+         log.warn("Time-out {} sec while checking function authorization on {} ", timeoutSeconds, namespaceName);
+         throw new RestException(Status.INTERNAL_SERVER_ERROR,
+                 "Timed out after " + timeoutSeconds + " sec while checking function authorization on "
+                         + namespaceName);
+     } catch (InterruptedException e) {
+         // Restore the interrupt flag so code further up the stack can still observe it.
+         Thread.currentThread().interrupt();
+         log.warn("Interrupted while checking function authorization on {} ", namespaceName);
+         throw new RestException(Status.INTERNAL_SERVER_ERROR,
+                 "Interrupted while checking function authorization on " + namespaceName);
+     } catch (Exception e) {
+         log.warn("Admin-client with Role - {} failed to get function permissions for namespace - {}. {}", role, namespaceName,
+                 e.getMessage(), e);
+         throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
+     }
+ }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 1c5a090..04628d6 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker.rest.api;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.FunctionStatus;
@@ -211,10 +212,12 @@ public class FunctionsImpl extends ComponentImpl {
final String namespace,
final String componentName,
final String instanceId,
- final URI uri) {
+ final URI uri,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
// validate parameters
- componentInstanceStatusRequestValidate(tenant, namespace, componentName, Integer.parseInt(instanceId));
+ componentInstanceStatusRequestValidate(tenant, namespace, componentName, Integer.parseInt(instanceId), clientRole, clientAuthenticationDataHttps);
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData;
try {
@@ -241,10 +244,12 @@ public class FunctionsImpl extends ComponentImpl {
public FunctionStatus getFunctionStatus(final String tenant,
final String namespace,
final String componentName,
- final URI uri) {
+ final URI uri,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
// validate parameters
- componentStatusRequestValidate(tenant, namespace, componentName);
+ componentStatusRequestValidate(tenant, namespace, componentName, clientRole, clientAuthenticationDataHttps);
FunctionStatus functionStatus;
try {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index 45fbbf2..49d1156 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -55,7 +55,7 @@ public class FunctionsImplV2 {
throws IOException {
// run just for parameter checks
- delegate.getFunctionInfo(tenant, namespace, functionName);
+ delegate.getFunctionInfo(tenant, namespace, functionName, null, null);
FunctionMetaDataManager functionMetaDataManager = delegate.worker().getFunctionMetaDataManager();
@@ -69,7 +69,7 @@ public class FunctionsImplV2 {
final String instanceId, URI uri) throws IOException {
org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
- functionInstanceStatus = delegate.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri);
+ functionInstanceStatus = delegate.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri, null, null);
String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(toProto(functionInstanceStatus, instanceId));
return Response.status(Response.Status.OK).entity(jsonResponse).build();
@@ -77,7 +77,7 @@ public class FunctionsImplV2 {
public Response getFunctionStatusV2(String tenant, String namespace, String functionName, URI requestUri) throws
IOException {
- FunctionStatus functionStatus = delegate.getFunctionStatus(tenant, namespace, functionName, requestUri);
+ FunctionStatus functionStatus = delegate.getFunctionStatus(tenant, namespace, functionName, requestUri, null, null);
InstanceCommunication.FunctionStatusList.Builder functionStatusList = InstanceCommunication.FunctionStatusList.newBuilder();
functionStatus.instances.forEach(functionInstanceStatus -> functionStatusList.addFunctionStatusList(
toProto(functionInstanceStatus.getStatus(),
@@ -98,29 +98,29 @@ public class FunctionsImplV2 {
FormDataContentDisposition fileDetail, String functionPkgUrl, String
functionDetailsJson, String functionConfigJson, String clientAppId) {
delegate.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId);
+ functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId, null);
return Response.ok().build();
}
// V2 wrapper: forwards to delegate.deregisterFunction using clientAppId as the client role
// and returns an empty OK response. This change passes null for the new
// authentication-data argument.
// NOTE(review): with null authentication data, ComponentImpl.isAuthorizedRole in this same
// change skips its allowFunctionOps fallback, so a role that only holds namespace-level
// function permission would presumably be rejected here — confirm intended.
public Response deregisterFunction(String tenant, String namespace, String functionName, String clientAppId) {
- delegate.deregisterFunction(tenant, namespace, functionName, clientAppId);
+ delegate.deregisterFunction(tenant, namespace, functionName, clientAppId, null);
return Response.ok().build();
}
// V2 wrapper: fetches the list from delegate.listFunctions and returns it as a JSON array
// in an OK response. This change passes null for both the client role and the
// authentication data.
// NOTE(review): ComponentImpl.isAuthorizedRole in this same change returns false for a null
// role once authorization is enabled, so this call would presumably fail with UNAUTHORIZED
// in that configuration — confirm intended.
public Response listFunctions(String tenant, String namespace) {
- Collection<String> functionStateList = delegate.listFunctions( tenant, namespace);
+ Collection<String> functionStateList = delegate.listFunctions( tenant, namespace, null, null);
return Response.status(Response.Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
}
// V2 wrapper: forwards to delegate.triggerFunction and returns its String result as the
// entity of an OK response. This change passes null for both the client role and the
// authentication data.
// NOTE(review): ComponentImpl.isAuthorizedRole in this same change returns false for a null
// role once authorization is enabled, so this call would presumably fail with UNAUTHORIZED
// in that configuration — confirm intended.
public Response triggerFunction(String tenant, String namespace, String functionName, String triggerValue,
InputStream triggerStream, String topic) {
- String result = delegate.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
+ String result = delegate.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic, null, null);
return Response.status(Response.Status.OK).entity(result).build();
}
public Response getFunctionState(String tenant, String namespace, String functionName, String key) {
FunctionState functionState = delegate.getFunctionState(
- tenant, namespace, functionName, key);
+ tenant, namespace, functionName, key, null, null);
String value;
if (functionState.getNumberValue() != null) {
@@ -135,23 +135,23 @@ public class FunctionsImplV2 {
// V2 wrapper: forwards to delegate.restartFunctionInstance and returns an empty OK response.
// This change passes null for both the client role and the authentication data.
// NOTE(review): ComponentImpl.isAuthorizedRole in this same change returns false for a null
// role once authorization is enabled, so this call would presumably fail with UNAUTHORIZED
// in that configuration — confirm intended.
public Response restartFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI
uri) {
- delegate.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri);
+ delegate.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri, null, null);
return Response.ok().build();
}
// V2 wrapper: forwards to delegate.restartFunctionInstances and returns an empty OK response.
// This change passes null for both the client role and the authentication data.
// NOTE(review): ComponentImpl.isAuthorizedRole in this same change returns false for a null
// role once authorization is enabled, so this call would presumably fail with UNAUTHORIZED
// in that configuration — confirm intended.
public Response restartFunctionInstances(String tenant, String namespace, String functionName) {
- delegate.restartFunctionInstances(tenant, namespace, functionName);
+ delegate.restartFunctionInstances(tenant, namespace, functionName, null, null);
return Response.ok().build();
}
// V2 wrapper: forwards to delegate.stopFunctionInstance and returns an empty OK response.
// This change passes null for both the client role and the authentication data.
// NOTE(review): ComponentImpl.isAuthorizedRole in this same change returns false for a null
// role once authorization is enabled, so this call would presumably fail with UNAUTHORIZED
// in that configuration — confirm intended.
public Response stopFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI
uri) {
- delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri);
+ delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri, null ,null);
return Response.ok().build();
}
// V2 wrapper: forwards to delegate.stopFunctionInstances and returns an empty OK response.
// This change passes null for both the client role and the authentication data.
// NOTE(review): ComponentImpl.isAuthorizedRole in this same change returns false for a null
// role once authorization is enabled, so this call would presumably fail with UNAUTHORIZED
// in that configuration — confirm intended.
public Response stopFunctionInstances(String tenant, String namespace, String functionName) {
- delegate.stopFunctionInstances(tenant, namespace, functionName);
+ delegate.stopFunctionInstances(tenant, namespace, functionName, null, null);
return Response.ok().build();
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 39828a0..acd924b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker.rest.api;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
@@ -210,10 +211,12 @@ public class SinkImpl extends ComponentImpl {
final String namespace,
final String sinkName,
final String instanceId,
- final URI uri) {
+ final URI uri,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
// validate parameters
- componentInstanceStatusRequestValidate(tenant, namespace, sinkName, Integer.parseInt(instanceId));
+ componentInstanceStatusRequestValidate(tenant, namespace, sinkName, Integer.parseInt(instanceId), clientRole, clientAuthenticationDataHttps);
SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData;
@@ -232,10 +235,12 @@ public class SinkImpl extends ComponentImpl {
public SinkStatus getSinkStatus(final String tenant,
final String namespace,
final String componentName,
- final URI uri) {
+ final URI uri,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
// validate parameters
- componentStatusRequestValidate(tenant, namespace, componentName);
+ componentStatusRequestValidate(tenant, namespace, componentName, clientRole, clientAuthenticationDataHttps);
SinkStatus sinkStatus;
try {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index ec7ba30..e5ed28e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker.rest.api;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
@@ -211,9 +212,10 @@ public class SourceImpl extends ComponentImpl {
public SourceStatus getSourceStatus(final String tenant,
final String namespace,
final String componentName,
- final URI uri) {
+ final URI uri, final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
// validate parameters
- componentStatusRequestValidate(tenant, namespace, componentName);
+ componentStatusRequestValidate(tenant, namespace, componentName, clientRole, clientAuthenticationDataHttps);
SourceStatus sourceStatus;
try {
@@ -232,9 +234,11 @@ public class SourceImpl extends ComponentImpl {
final String namespace,
final String sourceName,
final String instanceId,
- final URI uri) {
+ final URI uri,
+ final String clientRole,
+ final AuthenticationDataSource clientAuthenticationDataHttps) {
// validate parameters
- componentInstanceStatusRequestValidate(tenant, namespace, sourceName, Integer.parseInt(instanceId));
+ componentInstanceStatusRequestValidate(tenant, namespace, sourceName, Integer.parseInt(instanceId), clientRole, clientAuthenticationDataHttps);
SourceStatus.SourceInstanceStatus.SourceInstanceStatusData sourceInstanceStatusData;
try {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
index 0415be2..f31ecb3 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
@@ -85,7 +85,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
final @FormDataParam("functionConfig") String functionConfigJson) {
functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, functionConfigJson, clientAppId());
+ functionPkgUrl, null, functionConfigJson, clientAppId(), clientAuthData());
}
@@ -94,7 +94,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
public void deregisterFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
- functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+ functions.deregisterFunction(tenant, namespace, functionName, clientAppId(), clientAuthData());
}
@GET
@@ -102,7 +102,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
public FunctionConfig getFunctionInfo(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
- return functions.getFunctionInfo(tenant, namespace, functionName);
+ return functions.getFunctionInfo(tenant, namespace, functionName, clientAppId(), clientAuthData());
}
@GET
@@ -123,7 +123,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionInstanceStatus(
- tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@@ -143,7 +143,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionStatus(
- tenant, namespace, functionName, uri.getRequestUri());
+ tenant, namespace, functionName, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@@ -161,7 +161,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
- return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri());
+ return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@@ -182,7 +182,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
return functions.getFunctionsInstanceStats(
- tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ tenant, namespace, functionName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -194,7 +194,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
final @FormDataParam("data") String input,
final @FormDataParam("dataStream") InputStream uploadedInputStream,
final @FormDataParam("topic") String topic) {
- return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream, topic);
+ return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream, topic, clientAppId(), clientAuthData());
}
@POST
@@ -210,7 +210,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+ functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -225,7 +225,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
public void restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
- functions.restartFunctionInstances(tenant, namespace, functionName);
+ functions.restartFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
}
@POST
@@ -241,7 +241,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+ functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -256,7 +256,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
public void stopFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
- functions.stopFunctionInstances(tenant, namespace, functionName);
+ functions.stopFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
}
@POST
@@ -272,7 +272,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
- functions.startFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+ functions.startFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -287,7 +287,7 @@ public class FunctionApiV3Resource extends FunctionApiResource {
public void startFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) {
- functions.startFunctionInstances(tenant, namespace, functionName);
+ functions.startFunctionInstances(tenant, namespace, functionName, clientAppId(), clientAuthData());
}
@POST
@@ -316,6 +316,6 @@ public class FunctionApiV3Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("key") String key) throws IOException {
- return functions.getFunctionState(tenant, namespace, functionName, key);
+ return functions.getFunctionState(tenant, namespace, functionName, key, clientAppId(), clientAuthData());
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
index e5d257f..462914d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
@@ -75,7 +75,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
final @FormDataParam("sinkConfig") String sinkConfigJson) {
sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, sinkConfigJson, clientAppId());
+ functionPkgUrl, null, sinkConfigJson, clientAppId(), clientAuthData());
}
@DELETE
@@ -83,7 +83,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
public void deregisterSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) {
- sink.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+ sink.deregisterFunction(tenant, namespace, sinkName, clientAppId(), clientAuthData());
}
@GET
@@ -112,7 +112,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) throws IOException {
- return sink.getSinkInstanceStatus(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+ return sink.getSinkInstanceStatus(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@@ -130,14 +130,14 @@ public class SinkApiV3Resource extends FunctionApiResource {
public SinkStatus getSinkStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) throws IOException {
- return sink.getSinkStatus(tenant, namespace, sinkName, uri.getRequestUri());
+ return sink.getSinkStatus(tenant, namespace, sinkName, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@Path("/{tenant}/{namespace}")
public List<String> listSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
- return sink.listFunctions(tenant, namespace);
+ return sink.listFunctions(tenant, namespace, clientAppId(), clientAuthData());
}
@POST
@@ -151,7 +151,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) {
- sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+ sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -163,7 +163,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
public void restartSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) {
- sink.restartFunctionInstances(tenant, namespace, sinkName);
+ sink.restartFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
}
@POST
@@ -177,7 +177,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) {
- sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+ sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -190,7 +190,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
public void stopSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) {
- sink.stopFunctionInstances(tenant, namespace, sinkName);
+ sink.stopFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
}
@POST
@@ -204,7 +204,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName,
final @PathParam("instanceId") String instanceId) {
- sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+ sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -217,7 +217,7 @@ public class SinkApiV3Resource extends FunctionApiResource {
public void startSink(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sinkName") String sinkName) {
- sink.startFunctionInstances(tenant, namespace, sinkName);
+ sink.startFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
}
@GET
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
index e07bfe0..4ed3505 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
@@ -76,7 +76,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
final @FormDataParam("sourceConfig") String sourceConfigJson) {
source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
- functionPkgUrl, null, sourceConfigJson, clientAppId());
+ functionPkgUrl, null, sourceConfigJson, clientAppId(), clientAuthData());
}
@@ -85,7 +85,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
public void deregisterSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) {
- source.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+ source.deregisterFunction(tenant, namespace, sourceName, clientAppId(), clientAuthData());
}
@GET
@@ -115,7 +115,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) throws IOException {
return source.getSourceInstanceStatus(
- tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+ tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@@ -133,14 +133,14 @@ public class SourceApiV3Resource extends FunctionApiResource {
public SourceStatus getSourceStatus(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) throws IOException {
- return source.getSourceStatus(tenant, namespace, sourceName, uri.getRequestUri());
+ return source.getSourceStatus(tenant, namespace, sourceName, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@GET
@Path("/{tenant}/{namespace}")
public List<String> listSources(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace) {
- return source.listFunctions(tenant, namespace);
+ return source.listFunctions(tenant, namespace, clientAppId(), clientAuthData());
}
@POST
@@ -154,7 +154,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) {
- source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+ source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -167,7 +167,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
public void restartSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) {
- source.restartFunctionInstances(tenant, namespace, sourceName);
+ source.restartFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData());
}
@POST
@@ -181,7 +181,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) {
- source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+ source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -194,7 +194,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
public void stopSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) {
- source.stopFunctionInstances(tenant, namespace, sourceName);
+ source.stopFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData());
}
@POST
@@ -208,7 +208,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName,
final @PathParam("instanceId") String instanceId) {
- source.startFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+ source.startFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri(), clientAppId(), clientAuthData());
}
@POST
@@ -221,7 +221,7 @@ public class SourceApiV3Resource extends FunctionApiResource {
public void startSource(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("sourceName") String sourceName) {
- source.startFunctionInstances(tenant, namespace, sourceName);
+ source.startFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData());
}
@GET
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
index 660775d..f774180 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplTest.java
@@ -175,7 +175,7 @@ public class FunctionsImplTest {
@Test
public void testStatusEmpty() {
- Assert.assertTrue(this.resource.getFunctionInstanceStatus(tenant, namespace, function, "0", null) !=null);
+ Assert.assertTrue(this.resource.getFunctionInstanceStatus(tenant, namespace, function, "0", null, null, null) !=null);
}
@Test
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index e02b253..871ff5f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -903,7 +903,7 @@ public class FunctionApiV3ResourceTest {
null,
null,
new Gson().toJson(functionConfig),
- null);
+ null, null);
}
@@ -928,7 +928,7 @@ public class FunctionApiV3ResourceTest {
null,
null,
new Gson().toJson(functionConfig),
- null);
+ null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist")
@@ -1015,7 +1015,7 @@ public class FunctionApiV3ResourceTest {
filePackageUrl,
null,
new Gson().toJson(functionConfig),
- null);
+ null, null);
}
@@ -1123,7 +1123,7 @@ public class FunctionApiV3ResourceTest {
tenant,
namespace,
function,
- null);
+ null, null);
}
private void deregisterDefaultFunction() {
@@ -1131,7 +1131,7 @@ public class FunctionApiV3ResourceTest {
tenant,
namespace,
function,
- null);
+ null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist")
@@ -1250,7 +1250,7 @@ public class FunctionApiV3ResourceTest {
resource.getFunctionInfo(
tenant,
namespace,
- function
+ function,null,null
);
}
@@ -1259,7 +1259,9 @@ public class FunctionApiV3ResourceTest {
return resource.getFunctionInfo(
tenant,
namespace,
- function
+ function,
+ null,
+ null
);
}
@@ -1341,7 +1343,7 @@ public class FunctionApiV3ResourceTest {
) {
resource.listFunctions(
tenant,
- namespace
+ namespace,null,null
);
}
@@ -1349,7 +1351,7 @@ public class FunctionApiV3ResourceTest {
private List<String> listDefaultFunctions() {
return resource.listFunctions(
tenant,
- namespace
+ namespace,null,null
);
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 1b475ed..192fd1a 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -797,7 +797,7 @@ public class SinkApiV3ResourceTest {
null,
null,
new Gson().toJson(sinkConfig),
- null);
+ null, null);
}
@@ -837,7 +837,7 @@ public class SinkApiV3ResourceTest {
null,
null,
new Gson().toJson(sinkConfig),
- null);
+ null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink test-sink doesn't exist")
@@ -938,7 +938,7 @@ public class SinkApiV3ResourceTest {
filePackageUrl,
null,
new Gson().toJson(sinkConfig),
- null);
+ null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "sink failed to register")
@@ -1044,7 +1044,7 @@ public class SinkApiV3ResourceTest {
tenant,
namespace,
sink,
- null);
+ null, null);
}
@@ -1053,7 +1053,7 @@ public class SinkApiV3ResourceTest {
tenant,
namespace,
sink,
- null);
+ null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink test-sink doesn't exist")
@@ -1167,7 +1167,7 @@ public class SinkApiV3ResourceTest {
resource.getFunctionInfo(
tenant,
namespace,
- sink
+ sink, null, null
);
}
@@ -1265,7 +1265,7 @@ public class SinkApiV3ResourceTest {
) {
resource.listFunctions(
tenant,
- namespace
+ namespace, null, null
);
}
@@ -1273,7 +1273,7 @@ public class SinkApiV3ResourceTest {
private List<String> listDefaultSinks() {
return resource.listFunctions(
tenant,
- namespace
+ namespace, null, null
);
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 43f3303..25a0845 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -825,7 +825,7 @@ public class SourceApiV3ResourceTest {
null,
null,
new Gson().toJson(sourceConfig),
- null);
+ null, null);
}
@@ -862,7 +862,7 @@ public class SourceApiV3ResourceTest {
null,
null,
new Gson().toJson(sourceConfig),
- null);
+ null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source test-source doesn't exist")
@@ -960,7 +960,7 @@ public class SourceApiV3ResourceTest {
filePackageUrl,
null,
new Gson().toJson(sourceConfig),
- null);
+ null, null);
}
@@ -1067,7 +1067,7 @@ public class SourceApiV3ResourceTest {
tenant,
namespace,
function,
- null);
+ null, null);
}
@@ -1076,7 +1076,7 @@ public class SourceApiV3ResourceTest {
tenant,
namespace,
source,
- null);
+ null, null);
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp= "Source test-source doesn't exist")
@@ -1191,7 +1191,7 @@ public class SourceApiV3ResourceTest {
resource.getFunctionInfo(
tenant,
namespace,
- source
+ source, null, null
);
}
@@ -1281,14 +1281,14 @@ public class SourceApiV3ResourceTest {
) {
resource.listFunctions(
tenant,
- namespace
+ namespace, null, null
);
}
private List<String> listDefaultSources() {
return resource.listFunctions(
tenant,
- namespace);
+ namespace, null, null);
}
@Test