You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/07/20 11:31:36 UTC

[GitHub] [pulsar] aloyszhang opened a new pull request #7605: Support partitioned topic lookup

aloyszhang opened a new pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605


   
   Fixes #7571 
   
   ### Motivation
   
   Support look up for partitioned topic.
   
   ### Modifications
   
   Add look up  support for partitioned topic.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-661614076


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-703518764


   @eolivelli Made some changes to add `Async` method, PTAL


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#discussion_r499247953



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
##########
@@ -41,6 +42,14 @@
      */
     CompletableFuture<String> lookupTopicAsync(String topic);
 
+    /**
+     * Lookup a partitioned topic.
+     *
+     * @param topic
+     * @return the broker URL that serves the topic
+     */

Review comment:
       Personally I would add lookupPartitionedTopicAsync
   
   If the user wants to block it can do it.
   If we provide only the blocking method it is not possible to turn it into non-blocking

##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
##########
@@ -41,6 +42,14 @@
      */
     CompletableFuture<String> lookupTopicAsync(String topic);
 
+    /**
+     * Lookup a partitioned topic.
+     *
+     * @param topic
+     * @return the broker URL that serves the topic
+     */

Review comment:
       Personally I would add lookupPartitionedTopicAsync
   
   If the user wants to block it can do it.
   If we provide only the blocking method it is not possible to turn it into non-blocking




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on a change in pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#discussion_r499309485



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
##########
@@ -41,6 +42,14 @@
      */
     CompletableFuture<String> lookupTopicAsync(String topic);
 
+    /**
+     * Lookup a partitioned topic.
+     *
+     * @param topic
+     * @return the broker URL that serves the topic
+     */

Review comment:
       Thanks for your suggenstions @eolivelli .  
   We don't need to worry about method blocking since every blocking method is configed with a timeout.  It will be  blocked for no longer than `readerTimeoutMs`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-661777020


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on a change in pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#discussion_r499309485



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
##########
@@ -41,6 +42,14 @@
      */
     CompletableFuture<String> lookupTopicAsync(String topic);
 
+    /**
+     * Lookup a partitioned topic.
+     *
+     * @param topic
+     * @return the broker URL that serves the topic
+     */

Review comment:
       Thanks for your suggenstions @eolivelli .  




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on a change in pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#discussion_r499232154



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
##########
@@ -41,6 +42,14 @@
      */
     CompletableFuture<String> lookupTopicAsync(String topic);
 
+    /**
+     * Lookup a partitioned topic.
+     *
+     * @param topic
+     * @return the broker URL that serves the topic
+     */
+    Map<String, String> lookupPartitionedTopic(String topic) throws PulsarAdminException;

Review comment:
       ditto




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on a change in pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#discussion_r499309485



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
##########
@@ -41,6 +42,14 @@
      */
     CompletableFuture<String> lookupTopicAsync(String topic);
 
+    /**
+     * Lookup a partitioned topic.
+     *
+     * @param topic
+     * @return the broker URL that serves the topic
+     */

Review comment:
       Thanks for your suggenstions @eolivelli .  
   We don't need to worries about the method blocking since every blocking method is configed with a timeout.  It will be  blocked for no longer than `readerTimeoutMs`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#discussion_r499479678



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
##########
@@ -26,6 +26,7 @@
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.List;

Review comment:
       is this line needed ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-710819648


   @jiazhai @sijie PTAL


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#discussion_r499220083



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
##########
@@ -41,6 +42,14 @@
      */
     CompletableFuture<String> lookupTopicAsync(String topic);
 
+    /**
+     * Lookup a partitioned topic.
+     *
+     * @param topic
+     * @return the broker URL that serves the topic
+     */

Review comment:
       This method should return a CompletableFuture as the method above

##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
##########
@@ -41,6 +42,14 @@
      */
     CompletableFuture<String> lookupTopicAsync(String topic);
 
+    /**
+     * Lookup a partitioned topic.
+     *
+     * @param topic
+     * @return the broker URL that serves the topic
+     */
+    Map<String, String> lookupPartitionedTopic(String topic) throws PulsarAdminException;

Review comment:
       This method should return a CompletableFuture as the method above




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang removed a comment on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang removed a comment on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-703236615


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#discussion_r458133770



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
##########
@@ -127,6 +131,113 @@ protected void internalLookupTopicAsync(TopicName topicName, boolean authoritati
         });
     }
 
+    protected void internalLookupPartitionedTopicAsync(TopicName topicName, boolean authoritative, AsyncResponse asyncResponse) {
+        if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
+            log.warn("No broker was found available for topic {}", topicName);
+            asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
+            return;
+        }
+
+        try {
+            validateClusterOwnership(topicName.getCluster());
+            checkConnect(topicName);
+            validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
+        } catch (WebApplicationException we) {
+            // Validation checks failed
+            log.error("Validation check failed: {}", we.getMessage());
+            completeLookupResponseExceptionally(asyncResponse, we);
+            return;
+        } catch (Throwable t) {
+            // Validation checks failed with unknown error
+            log.error("Validation check failed: {}", t.getMessage(), t);
+            completeLookupResponseExceptionally(asyncResponse, new RestException(t));
+            return;
+        }
+
+        pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).whenComplete(
+                (metadata, t) -> {
+                    if (t != null) {
+                        log.error(" Can't find partitioned metadata for {}", topicName);
+                        completeLookupResponseExceptionally(asyncResponse, new RestException(t));
+                        return ;
+                    }
+
+                    String domain = topicName.getDomain().value();
+                    NamespaceName namespace = topicName.getNamespaceObject();
+                    String topicLocalName = topicName.getLocalName();
+                    LookupOptions lookupOptions =  LookupOptions.builder().authoritative(authoritative).loadTopicsInBundle(false).build();
+
+                    List<CompletableFuture<Optional<LookupResult>>> futureList = new ArrayList<>();
+                    if (metadata != null && metadata.partitions > 1) {
+                        for (int i = 0 ; i < metadata.partitions; i ++) {
+                            TopicName partitionedTopicName = TopicName.get(domain, namespace, topicLocalName + "-partition-" + i);
+                            futureList.add(pulsar().getNamespaceService().getBrokerServiceUrlAsync(partitionedTopicName, lookupOptions));
+                        }
+                    } else {
+                        futureList.add(pulsar().getNamespaceService().getBrokerServiceUrlAsync(topicName, lookupOptions));
+                    }
+
+                    FutureUtil.waitForAll(futureList).whenComplete(
+                            (ignore, te) -> {
+                                if (te != null) {
+                                    log.warn("Failed to lookup broker for topic {}: {}", topicName, te.getMessage(), te);
+                                    completeLookupResponseExceptionally(asyncResponse, te);
+                                    return;
+                                }
+                                List<LookupData> lookupDataList = new ArrayList<>();
+                                for (CompletableFuture<Optional<LookupResult>> partitionFuture : futureList) {
+                                    partitionFuture.thenAccept(optionalResult -> {
+                                        if (optionalResult == null || !optionalResult.isPresent()) {
+                                            log.warn("No broker was found available for topic {}", topicName);
+                                            completeLookupResponseExceptionally(asyncResponse,
+                                                    new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
+                                            return;
+                                        }
+
+                                        LookupResult result = optionalResult.get();
+                                        // We have found either a broker that owns the topic, or a broker to which we should redirect the client to
+                                        if (result.isRedirect()) {
+                                            boolean newAuthoritative = result.isAuthoritativeRedirect();
+                                            URI redirect;
+                                            try {
+                                                String redirectUrl = isRequestHttps() ? result.getLookupData().getHttpUrlTls()
+                                                        : result.getLookupData().getHttpUrl();
+                                                checkNotNull(redirectUrl, "Redirected cluster's service url is not configured");
+                                                String lookupPath = topicName.isV2() ? LOOKUP_PATH_V2 : LOOKUP_PATH_V1;
+                                                redirect = new URI(String.format("%s%s%s?authoritative=%s", redirectUrl, lookupPath,
+                                                        topicName.getLookupName(), newAuthoritative));
+                                            } catch (URISyntaxException | NullPointerException e) {
+                                                log.error("Error in preparing redirect url for {}: {}", topicName, e.getMessage(), e);
+                                                completeLookupResponseExceptionally(asyncResponse, e);
+                                                return;
+                                            }
+                                            if (log.isDebugEnabled()) {
+                                                log.debug("Redirect lookup for topic {} to {}", topicName, redirect);
+                                            }
+                                            completeLookupResponseExceptionally(asyncResponse,
+                                                    new WebApplicationException(Response.temporaryRedirect(redirect).build()));

Review comment:
       If redirect the partitioned topic lookup requests, the target broker will handle the partitioned topic lookup request right? So this may lead to redirect back again, the request can't complete.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jiazhai merged pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
jiazhai merged pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on a change in pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#discussion_r499309485



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
##########
@@ -41,6 +42,14 @@
      */
     CompletableFuture<String> lookupTopicAsync(String topic);
 
+    /**
+     * Lookup a partitioned topic.
+     *
+     * @param topic
+     * @return the broker URL that serves the topic
+     */

Review comment:
       Thanks for your suggenstions @eolivelli .  
    I have already add a `async` method 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on a change in pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#discussion_r499232022



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
##########
@@ -41,6 +42,14 @@
      */
     CompletableFuture<String> lookupTopicAsync(String topic);
 
+    /**
+     * Lookup a partitioned topic.
+     *
+     * @param topic
+     * @return the broker URL that serves the topic
+     */

Review comment:
        If a method is `async`, it should return a  CompletableFuture. 
   But `Lookup#llookupPartitionedTopic` is not a `async` method. It just invokes `Lookup#lookupTopicAsync` internal.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang removed a comment on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang removed a comment on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-661614076






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-703346317


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-703567864


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-709679586


   @codelipenghui PTAL


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on a change in pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#discussion_r499232154



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
##########
@@ -41,6 +42,14 @@
      */
     CompletableFuture<String> lookupTopicAsync(String topic);
 
+    /**
+     * Lookup a partitioned topic.
+     *
+     * @param topic
+     * @return the broker URL that serves the topic
+     */
+    Map<String, String> lookupPartitionedTopic(String topic) throws PulsarAdminException;

Review comment:
       ditto




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang removed a comment on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang removed a comment on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-661578535


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-661536069


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang removed a comment on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang removed a comment on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-703346317


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-661578535


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#discussion_r499236382



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
##########
@@ -41,6 +42,14 @@
      */
     CompletableFuture<String> lookupTopicAsync(String topic);
 
+    /**
+     * Lookup a partitioned topic.
+     *
+     * @param topic
+     * @return the broker URL that serves the topic
+     */

Review comment:
       I meant that we should provide an async method.
   Why not?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on a change in pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#discussion_r459918848



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
##########
@@ -127,6 +131,113 @@ protected void internalLookupTopicAsync(TopicName topicName, boolean authoritati
         });
     }
 
+    protected void internalLookupPartitionedTopicAsync(TopicName topicName, boolean authoritative, AsyncResponse asyncResponse) {
+        if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
+            log.warn("No broker was found available for topic {}", topicName);
+            asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
+            return;
+        }
+
+        try {
+            validateClusterOwnership(topicName.getCluster());
+            checkConnect(topicName);
+            validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
+        } catch (WebApplicationException we) {
+            // Validation checks failed
+            log.error("Validation check failed: {}", we.getMessage());
+            completeLookupResponseExceptionally(asyncResponse, we);
+            return;
+        } catch (Throwable t) {
+            // Validation checks failed with unknown error
+            log.error("Validation check failed: {}", t.getMessage(), t);
+            completeLookupResponseExceptionally(asyncResponse, new RestException(t));
+            return;
+        }
+
+        pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).whenComplete(
+                (metadata, t) -> {
+                    if (t != null) {
+                        log.error(" Can't find partitioned metadata for {}", topicName);
+                        completeLookupResponseExceptionally(asyncResponse, new RestException(t));
+                        return ;
+                    }
+
+                    String domain = topicName.getDomain().value();
+                    NamespaceName namespace = topicName.getNamespaceObject();
+                    String topicLocalName = topicName.getLocalName();
+                    LookupOptions lookupOptions =  LookupOptions.builder().authoritative(authoritative).loadTopicsInBundle(false).build();
+
+                    List<CompletableFuture<Optional<LookupResult>>> futureList = new ArrayList<>();
+                    if (metadata != null && metadata.partitions > 1) {
+                        for (int i = 0 ; i < metadata.partitions; i ++) {
+                            TopicName partitionedTopicName = TopicName.get(domain, namespace, topicLocalName + "-partition-" + i);
+                            futureList.add(pulsar().getNamespaceService().getBrokerServiceUrlAsync(partitionedTopicName, lookupOptions));
+                        }
+                    } else {
+                        futureList.add(pulsar().getNamespaceService().getBrokerServiceUrlAsync(topicName, lookupOptions));
+                    }
+
+                    FutureUtil.waitForAll(futureList).whenComplete(
+                            (ignore, te) -> {
+                                if (te != null) {
+                                    log.warn("Failed to lookup broker for topic {}: {}", topicName, te.getMessage(), te);
+                                    completeLookupResponseExceptionally(asyncResponse, te);
+                                    return;
+                                }
+                                List<LookupData> lookupDataList = new ArrayList<>();
+                                for (CompletableFuture<Optional<LookupResult>> partitionFuture : futureList) {
+                                    partitionFuture.thenAccept(optionalResult -> {
+                                        if (optionalResult == null || !optionalResult.isPresent()) {
+                                            log.warn("No broker was found available for topic {}", topicName);
+                                            completeLookupResponseExceptionally(asyncResponse,
+                                                    new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
+                                            return;
+                                        }
+
+                                        LookupResult result = optionalResult.get();
+                                        // We have found either a broker that owns the topic, or a broker to which we should redirect the client to
+                                        if (result.isRedirect()) {
+                                            boolean newAuthoritative = result.isAuthoritativeRedirect();
+                                            URI redirect;
+                                            try {
+                                                String redirectUrl = isRequestHttps() ? result.getLookupData().getHttpUrlTls()
+                                                        : result.getLookupData().getHttpUrl();
+                                                checkNotNull(redirectUrl, "Redirected cluster's service url is not configured");
+                                                String lookupPath = topicName.isV2() ? LOOKUP_PATH_V2 : LOOKUP_PATH_V1;
+                                                redirect = new URI(String.format("%s%s%s?authoritative=%s", redirectUrl, lookupPath,
+                                                        topicName.getLookupName(), newAuthoritative));
+                                            } catch (URISyntaxException | NullPointerException e) {
+                                                log.error("Error in preparing redirect url for {}: {}", topicName, e.getMessage(), e);
+                                                completeLookupResponseExceptionally(asyncResponse, e);
+                                                return;
+                                            }
+                                            if (log.isDebugEnabled()) {
+                                                log.debug("Redirect lookup for topic {} to {}", topicName, redirect);
+                                            }
+                                            completeLookupResponseExceptionally(asyncResponse,
+                                                    new WebApplicationException(Response.temporaryRedirect(redirect).build()));

Review comment:
       I'll check this later




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang removed a comment on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang removed a comment on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-661503272






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang removed a comment on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang removed a comment on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-661777020


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on a change in pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#discussion_r499483557



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
##########
@@ -26,6 +26,7 @@
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.List;

Review comment:
       removed




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-661600200


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-703236615


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-661503272


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on a change in pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#discussion_r499232022



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
##########
@@ -41,6 +42,14 @@
      */
     CompletableFuture<String> lookupTopicAsync(String topic);
 
+    /**
+     * Lookup a partitioned topic.
+     *
+     * @param topic
+     * @return the broker URL that serves the topic
+     */

Review comment:
       This is not a `async` method. It invokes Lookup#lookupTopicAsync internal.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#issuecomment-661608570


   @sijie @codelipenghui  Two CI failed all the times. Errors seems has nothing to do with this PR


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on a change in pull request #7605: Support partitioned topic lookup

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #7605:
URL: https://github.com/apache/pulsar/pull/7605#discussion_r499239138



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Lookup.java
##########
@@ -41,6 +42,14 @@
      */
     CompletableFuture<String> lookupTopicAsync(String topic);
 
+    /**
+     * Lookup a partitioned topic.
+     *
+     * @param topic
+     * @return the broker URL that serves the topic
+     */

Review comment:
       For the uniform of method name, we should add a `aync` method for `lookupPartitionedTopic`. 
   But other method without `async`  juset  invoke the relative `async` method. 
   e.g.
   `getBundleRange` just invoke ` getBundleRangeAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS)`
   But, `lookupPartitionedTopic` internal invokes `topics.getPartitionedTopicMetadataAsync(topic).get(this.readTimeoutMs, TimeUnit.MILLISECONDS)` and 
   ` lookupTopicAsync(partitionTopicName).get(readTimeoutMs, TimeUnit.MILLISECONDS)`. 
   So it's hard to  tell which is better.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org