You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/05/12 10:52:27 UTC

[GitHub] [pulsar] leizhiyuan commented on a diff in pull request #15557: [improve][broker] Avoid using synchronous operations in the async method to block metadata threads.

leizhiyuan commented on code in PR #15557:
URL: https://github.com/apache/pulsar/pull/15557#discussion_r871237553


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java:
##########
@@ -192,143 +192,102 @@ public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarSe
      * d. If current-broker receives request to own the namespace-bundle then
      *    it owns a bundle and returns success(connect)
      *    response to client.
-     *
-     * @param pulsarService
-     * @param topicName
-     * @param authoritative
-     * @param clientAppId
-     * @param requestId
-     * @param advertisedListenerName
-     * @return
      */
     public static CompletableFuture<ByteBuf> lookupTopicAsync(PulsarService pulsarService, TopicName topicName,
                                                               boolean authoritative, String clientAppId,
                                                               AuthenticationDataSource authenticationData,
                                                               long requestId, final String advertisedListenerName) {
-
-        final CompletableFuture<ByteBuf> validationFuture = new CompletableFuture<>();
-        final CompletableFuture<ByteBuf> lookupfuture = new CompletableFuture<>();
         final String cluster = topicName.getCluster();
-
         // (1) validate cluster
-        getClusterDataIfDifferentCluster(pulsarService, cluster, clientAppId).thenAccept(differentClusterData -> {
-
-            if (differentClusterData != null) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Redirecting the lookup call to {}/{} cluster={}", clientAppId,
-                            differentClusterData.getBrokerServiceUrl(), differentClusterData.getBrokerServiceUrlTls(),
-                            cluster);
+        return getClusterDataIfDifferentCluster(pulsarService, cluster, clientAppId)
+            .thenCompose(differentClusterData -> {
+                if (differentClusterData != null) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Redirecting the lookup call to {}/{} cluster={}", clientAppId,
+                                differentClusterData.getBrokerServiceUrl(),
+                                differentClusterData.getBrokerServiceUrlTls(), cluster);
+                    }
+                    return CompletableFuture.completedFuture(Optional.of(
+                            newLookupResponse(differentClusterData.getBrokerServiceUrl(),
+                                    differentClusterData.getBrokerServiceUrlTls(), true,
+                                    LookupType.Redirect, requestId, false)));
                 }
-                validationFuture.complete(newLookupResponse(differentClusterData.getBrokerServiceUrl(),
-                        differentClusterData.getBrokerServiceUrlTls(), true, LookupType.Redirect, requestId, false));
-            } else {
                 // (2) authorize client
-                try {
-                    checkAuthorization(pulsarService, topicName, clientAppId, authenticationData);
-                } catch (RestException authException) {
-                    log.warn("Failed to authorized {} on cluster {}", clientAppId, topicName.toString());
-                    validationFuture.complete(newLookupErrorResponse(ServerError.AuthorizationError,
-                            authException.getMessage(), requestId));
-                    return;
-                } catch (Exception e) {
-                    log.warn("Unknown error while authorizing {} on cluster {}", clientAppId, topicName.toString());
-                    validationFuture.completeExceptionally(e);
-                    return;
+                return validateTopicOperationAsync(pulsarService, topicName,
+                        clientAppId, TopicOperation.LOOKUP, authenticationData)
+                        // (3) validate global namespace
+                        .thenCompose(__ ->
+                            checkLocalOrGetPeerReplicationCluster(pulsarService, topicName.getNamespaceObject())
+                                .thenApply(peerClusterData -> {
+                                    if (peerClusterData != null) {
+                                        // if peer-cluster-data is present it means namespace is owned by
+                                        // that peer-cluster and request should be redirect to the peer-cluster
+                                        if (StringUtils.isBlank(peerClusterData.getBrokerServiceUrl())
+                                                && StringUtils.isBlank(peerClusterData.getBrokerServiceUrlTls())) {
+                                            return Optional.of(newLookupErrorResponse(ServerError.MetadataError,
+                                                        "Redirected cluster's brokerService url is not configured",
+                                                            requestId));
+                                        }
+                                        return Optional.of(newLookupResponse(peerClusterData.getBrokerServiceUrl(),
+                                                peerClusterData.getBrokerServiceUrlTls(), true,
+                                                        LookupType.Redirect, requestId, false));
+                                    }
+                                    // (4) all validation passed: initiate lookup
+                                    return Optional.empty();
+                                }).exceptionally(ex ->
+                                        Optional.of(newLookupErrorResponse(ServerError.MetadataError,

Review Comment:
   need a return statement?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org