You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/23 19:35:28 UTC

[GitHub] [pulsar] merlimat commented on a diff in pull request #16062: [feat][broker] PIP-145: Notifications for faster topic discovery

merlimat commented on code in PR #16062:
URL: https://github.com/apache/pulsar/pull/16062#discussion_r905309249


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java:
##########
@@ -110,4 +123,34 @@ public CompletableFuture<Void> clearTenantPersistence(String tenant) {
                     }
                 });
     }
+
+    void handleNotification(Notification notification) {
+        if (notification.getPath().startsWith(MANAGED_LEDGER_PATH)
+                && EnumSet.of(NotificationType.Created, NotificationType.Deleted).contains(notification.getType())) {
+            for (Map.Entry<BiConsumer<String, NotificationType>, Pattern> entry :
+                    new HashMap<>(topicListeners).entrySet()) {
+                Matcher matcher = entry.getValue().matcher(notification.getPath());
+                if (matcher.matches()) {
+                    TopicName topicName = TopicName.get(
+                            matcher.group(2), NamespaceName.get(matcher.group(1)), matcher.group(3));
+                    entry.getKey().accept(topicName.toString(), notification.getType());
+                }
+            }
+        }
+    }
+
+    Pattern namespaceNameToTopicNamePattern(NamespaceName namespaceName) {

Review Comment:
   We could have this as part of `NamespaceName` where we can store the compiled pattern.



##########
pulsar-common/src/main/proto/PulsarApi.proto:
##########
@@ -987,6 +1017,11 @@ message BaseCommand {
         TC_CLIENT_CONNECT_REQUEST = 62;
         TC_CLIENT_CONNECT_RESPONSE = 63;
 
+        WATCH_TOPIC_LIST = 64;
+        WATCH_TOPIC_LIST_SUCCESS = 65;
+        WATCH_TOPIC_UPDATE = 66;
+        UNWATCH_TOPIC_LIST = 67;

Review Comment:
   nit: maybe we could rename to `WATCH_TOPIC_CLOSE` to have the same prefix



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2513,6 +2519,53 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
                 }));
     }
 
+    protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicList) {
+        final long requestId = commandWatchTopicList.getRequestId();
+        final long watcherId = commandWatchTopicList.getWatcherId();
+        final NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace());
+
+        final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
+        if (lookupSemaphore.tryAcquire()) {
+            if (invalidOriginalPrincipal(originalPrincipal)) {
+                final String msg = "Valid Proxy Client role should be provided for watchTopicListRequest ";
+                log.warn("[{}] {} with role {} and proxyClientAuthRole {} on namespace {}", remoteAddress, msg,
+                        authRole, originalPrincipal, namespaceName);
+                commandSender.sendErrorResponse(watcherId, ServerError.AuthorizationError, msg);
+                lookupSemaphore.release();
+                return;
+            }
+            isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
+                if (isAuthorized) {
+                    topicListService.handleWatchTopicList(commandWatchTopicList, lookupSemaphore);

Review Comment:
   This method will potentially get called from a different thread (if the authz is not already cached), therefore we cannot pass on the `commandWatchTopicList` object because it will get reused when the io-thread is processing the next call. 
   
   We need to extract the fields that we are interested in into local variables.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java:
##########
@@ -2513,6 +2519,53 @@ protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
                 }));
     }
 
+    protected void handleCommandWatchTopicList(CommandWatchTopicList commandWatchTopicList) {
+        final long requestId = commandWatchTopicList.getRequestId();
+        final long watcherId = commandWatchTopicList.getWatcherId();
+        final NamespaceName namespaceName = NamespaceName.get(commandWatchTopicList.getNamespace());
+
+        final Semaphore lookupSemaphore = service.getLookupRequestSemaphore();
+        if (lookupSemaphore.tryAcquire()) {
+            if (invalidOriginalPrincipal(originalPrincipal)) {
+                final String msg = "Valid Proxy Client role should be provided for watchTopicListRequest ";
+                log.warn("[{}] {} with role {} and proxyClientAuthRole {} on namespace {}", remoteAddress, msg,
+                        authRole, originalPrincipal, namespaceName);
+                commandSender.sendErrorResponse(watcherId, ServerError.AuthorizationError, msg);
+                lookupSemaphore.release();
+                return;
+            }
+            isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
+                if (isAuthorized) {
+                    topicListService.handleWatchTopicList(commandWatchTopicList, lookupSemaphore);
+                } else {
+                    final String msg = "Proxy Client is not authorized to watchTopicList";
+                    log.warn("[{}] {} with role {} on namespace {}", remoteAddress, msg, getPrincipal(), namespaceName);
+                    commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
+                    lookupSemaphore.release();
+                }
+                return null;
+            }).exceptionally(ex -> {
+                logNamespaceNameAuthException(remoteAddress, "watchTopicList", getPrincipal(),
+                        Optional.of(namespaceName), ex);
+                final String msg = "Exception occurred while trying to handle command WatchTopicList";
+                commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
+                lookupSemaphore.release();
+                return null;
+            });
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Failed WatchTopicList due to too many lookup-requests {}", remoteAddress,
+                        namespaceName);
+            }
+            commandSender.sendErrorResponse(requestId, ServerError.TooManyRequests,
+                    "Failed due to too many pending lookup requests");
+        }
+    }
+
+    protected void handleCommandUnwatchTopicList(CommandUnwatchTopicList commandUnwatchTopicList) {
+        topicListService.handleUnwatchTopicList(commandUnwatchTopicList);

Review Comment:
   Should we check authorization here or it will be not necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org