You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 02:02:29 UTC

[pulsar] branch master updated: Make TopicLookupBase#internalLookupTopicAsync pure async (#14188)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 49fcc80600b Make TopicLookupBase#internalLookupTopicAsync pure async (#14188)
49fcc80600b is described below

commit 49fcc80600b35152a588c93c5dc5d9d0297b3a22
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Apr 28 10:02:23 2022 +0800

    Make TopicLookupBase#internalLookupTopicAsync pure async (#14188)
---
 .../pulsar/broker/lookup/TopicLookupBase.java      | 168 +++++++++------------
 .../pulsar/broker/lookup/v1/TopicLookup.java       |  17 ++-
 .../pulsar/broker/lookup/v2/TopicLookup.java       |  17 ++-
 .../broker/lookup/http/HttpTopicLookupv2Test.java  |  86 +++++++----
 .../broker/lookup/http/v2/TopicLookupTest.java     |   9 +-
 5 files changed, 164 insertions(+), 133 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index c8ca671f317..a09c48771cb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.lookup;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
 import static org.apache.pulsar.common.protocol.Commands.newLookupResponse;
 import io.netty.buffer.ByteBuf;
@@ -48,6 +47,7 @@ import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.TopicOperation;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,97 +56,81 @@ public class TopicLookupBase extends PulsarWebResource {
     private static final String LOOKUP_PATH_V1 = "/lookup/v2/destination/";
     private static final String LOOKUP_PATH_V2 = "/lookup/v2/topic/";
 
-    protected void internalLookupTopicAsync(TopicName topicName, boolean authoritative,
-                                            AsyncResponse asyncResponse, String listenerName) {
+    protected CompletableFuture<LookupData> internalLookupTopicAsync(TopicName topicName, boolean authoritative,
+                                                                     String listenerName) {
         if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
             log.warn("No broker was found available for topic {}", topicName);
-            asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
-            return;
+            return FutureUtil.failedFuture(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
         }
-
-        try {
-            validateClusterOwnership(topicName.getCluster());
-            validateAdminAndClientPermission(topicName);
-            validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
-        } catch (WebApplicationException we) {
-            // Validation checks failed
-            log.error("Validation check failed: {}", we.getMessage());
-            completeLookupResponseExceptionally(asyncResponse, we);
-            return;
-        } catch (Throwable t) {
-            // Validation checks failed with unknown error
-            log.error("Validation check failed: {}", t.getMessage(), t);
-            completeLookupResponseExceptionally(asyncResponse, new RestException(t));
-            return;
-        }
-
-        // Currently, it's hard to check the non-persistent-non-partitioned topic, because it only exists in the broker,
-        // it doesn't have metadata. If the topic is non-persistent and non-partitioned, we'll return the true flag.
-        CompletableFuture<Boolean> existFuture = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName)
-                || (!topicName.isPersistent() && !topicName.isPartitioned())
-                ? CompletableFuture.completedFuture(true) : pulsar().getNamespaceService().checkTopicExists(topicName);
-        existFuture.thenAccept(exist -> {
-            if (!exist) {
-                completeLookupResponseExceptionally(asyncResponse, new RestException(Response.Status.NOT_FOUND,
-                        "Topic not found."));
-                return;
-            }
-            CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
-                    .getBrokerServiceUrlAsync(topicName,
-                            LookupOptions.builder().advertisedListenerName(listenerName)
-                                    .authoritative(authoritative).loadTopicsInBundle(false).build());
-
-            lookupFuture.thenAccept(optionalResult -> {
-                if (optionalResult == null || !optionalResult.isPresent()) {
-                    log.warn("No broker was found available for topic {}", topicName);
-                    completeLookupResponseExceptionally(asyncResponse,
-                            new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
-                    return;
-                }
-
-                LookupResult result = optionalResult.get();
-                // We have found either a broker that owns the topic, or a broker to
-                // which we should redirect the client to
-                if (result.isRedirect()) {
-                    boolean newAuthoritative = result.isAuthoritativeRedirect();
-                    URI redirect;
-                    try {
-                        String redirectUrl = isRequestHttps() ? result.getLookupData().getHttpUrlTls()
-                                : result.getLookupData().getHttpUrl();
-                        checkNotNull(redirectUrl, "Redirected cluster's service url is not configured");
-                        String lookupPath = topicName.isV2() ? LOOKUP_PATH_V2 : LOOKUP_PATH_V1;
-                        String path = String.format("%s%s%s?authoritative=%s",
-                                redirectUrl, lookupPath, topicName.getLookupName(), newAuthoritative);
-                        path = listenerName == null ? path : path + "&listenerName=" + listenerName;
-                        redirect = new URI(path);
-                    } catch (URISyntaxException | NullPointerException e) {
-                        log.error("Error in preparing redirect url for {}: {}", topicName, e.getMessage(), e);
-                        completeLookupResponseExceptionally(asyncResponse, e);
-                        return;
+        return validateClusterOwnershipAsync(topicName.getCluster())
+                .thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject()))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.LOOKUP, null))
+                .thenCompose(__ -> {
+                    // Currently, it's hard to check a non-persistent, non-partitioned topic because it exists only
+                    // in the broker and has no metadata. If the topic is non-persistent and non-partitioned,
+                    // we'll return the true flag.
+                    CompletableFuture<Boolean> existFuture = pulsar().getBrokerService()
+                            .isAllowAutoTopicCreation(topicName)
+                            || (!topicName.isPersistent() && !topicName.isPartitioned())
+                            ? CompletableFuture.completedFuture(true)
+                            : pulsar().getNamespaceService().checkTopicExists(topicName);
+                    return existFuture;
+                })
+                .thenCompose(exist -> {
+                    if (!exist) {
+                        throw new RestException(Response.Status.NOT_FOUND, "Topic not found.");
                     }
-                    if (log.isDebugEnabled()) {
-                        log.debug("Redirect lookup for topic {} to {}", topicName, redirect);
-                    }
-                    completeLookupResponseExceptionally(asyncResponse,
-                            new WebApplicationException(Response.temporaryRedirect(redirect).build()));
+                    CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
+                            .getBrokerServiceUrlAsync(topicName,
+                                    LookupOptions.builder()
+                                            .advertisedListenerName(listenerName)
+                                            .authoritative(authoritative)
+                                            .loadTopicsInBundle(false)
+                                            .build());
 
-                } else {
-                    // Found broker owning the topic
-                    if (log.isDebugEnabled()) {
-                        log.debug("Lookup succeeded for topic {} -- broker: {}", topicName, result.getLookupData());
-                    }
-                    completeLookupResponseSuccessfully(asyncResponse, result.getLookupData());
-                }
-            }).exceptionally(exception -> {
-                log.warn("Failed to lookup broker for topic {}: {}", topicName, exception.getMessage(), exception);
-                completeLookupResponseExceptionally(asyncResponse, exception);
-                return null;
-            });
-        }).exceptionally(e -> {
-            log.warn("Failed to check exist for topic {}: {} when lookup", topicName, e.getMessage(), e);
-            completeLookupResponseExceptionally(asyncResponse, e);
-            return null;
-        });
+                    return lookupFuture.thenApply(optionalResult -> {
+                        if (optionalResult == null || !optionalResult.isPresent()) {
+                            log.warn("No broker was found available for topic {}", topicName);
+                            throw new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE);
+                        }
+
+                        LookupResult result = optionalResult.get();
+                        // We have found either a broker that owns the topic, or a broker to
+                        // which we should redirect the client
+                        if (result.isRedirect()) {
+                            boolean newAuthoritative = result.isAuthoritativeRedirect();
+                            URI redirect;
+                            try {
+                                String redirectUrl = isRequestHttps() ? result.getLookupData().getHttpUrlTls()
+                                        : result.getLookupData().getHttpUrl();
+                                if (redirectUrl == null) {
+                                    log.error("Redirected cluster's service url is not configured");
+                                    throw new RestException(Response.Status.PRECONDITION_FAILED,
+                                            "Redirected cluster's service url is not configured.");
+                                }
+                                String lookupPath = topicName.isV2() ? LOOKUP_PATH_V2 : LOOKUP_PATH_V1;
+                                String path = String.format("%s%s%s?authoritative=%s",
+                                        redirectUrl, lookupPath, topicName.getLookupName(), newAuthoritative);
+                                path = listenerName == null ? path : path + "&listenerName=" + listenerName;
+                                redirect = new URI(path);
+                            } catch (URISyntaxException e) {
+                                log.error("Error in preparing redirect url for {}: {}", topicName, e.getMessage(), e);
+                                throw new RestException(Response.Status.PRECONDITION_FAILED, e.getMessage());
+                            }
+                            if (log.isDebugEnabled()) {
+                                log.debug("Redirect lookup for topic {} to {}", topicName, redirect);
+                            }
+                            throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+                        } else {
+                            // Found broker owning the topic
+                            if (log.isDebugEnabled()) {
+                                log.debug("Lookup succeeded for topic {} -- broker: {}", topicName,
+                                        result.getLookupData());
+                            }
+                            return result.getLookupData();
+                        }
+                    });
+                });
     }
 
     private void validateAdminAndClientPermission(TopicName topic) throws RestException, Exception {
@@ -347,14 +331,10 @@ public class TopicLookupBase extends PulsarWebResource {
         return lookupfuture;
     }
 
-    private void completeLookupResponseExceptionally(AsyncResponse asyncResponse, Throwable t) {
-        pulsar().getBrokerService().getLookupRequestSemaphore().release();
-        asyncResponse.resume(t);
-    }
-
-    private void completeLookupResponseSuccessfully(AsyncResponse asyncResponse, LookupData lookupData) {
+    protected void completeLookupResponseExceptionally(AsyncResponse asyncResponse, Throwable t) {
         pulsar().getBrokerService().getLookupRequestSemaphore().release();
-        asyncResponse.resume(lookupData);
+        Throwable cause = FutureUtil.unwrapCompletionException(t);
+        asyncResponse.resume(cause);
     }
 
     protected TopicName getTopicName(String topicDomain, String tenant, String cluster, String namespace,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v1/TopicLookup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v1/TopicLookup.java
index a2a2659a0fe..b1015d9e063 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v1/TopicLookup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v1/TopicLookup.java
@@ -31,6 +31,7 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.lookup.TopicLookupBase;
 import org.apache.pulsar.broker.web.NoSwaggerDocumentation;
@@ -48,6 +49,7 @@ import org.apache.pulsar.common.naming.TopicName;
  */
 @Path("/v2/destination/")
 @NoSwaggerDocumentation
+@Slf4j
 public class TopicLookup extends TopicLookupBase {
 
     static final String LISTENERNAME_HEADER = "X-Pulsar-ListenerName";
@@ -57,18 +59,27 @@ public class TopicLookup extends TopicLookupBase {
     @Produces(MediaType.APPLICATION_JSON)
     @ApiResponses(value = { @ApiResponse(code = 307,
             message = "Current broker doesn't serve the namespace of this topic") })
-    public void lookupTopicAsync(@PathParam("topic-domain") String topicDomain, @PathParam("property") String property,
+    public void lookupTopicAsync(
+            @Suspended AsyncResponse asyncResponse,
+            @PathParam("topic-domain") String topicDomain, @PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
-            @Suspended AsyncResponse asyncResponse,
             @QueryParam("listenerName") String listenerName,
             @HeaderParam(LISTENERNAME_HEADER) String listenerNameHeader) {
         TopicName topicName = getTopicName(topicDomain, property, cluster, namespace, encodedTopic);
         if (StringUtils.isEmpty(listenerName) && StringUtils.isNotEmpty(listenerNameHeader)) {
             listenerName = listenerNameHeader;
         }
-        internalLookupTopicAsync(topicName, authoritative, asyncResponse, listenerName);
+        internalLookupTopicAsync(topicName, authoritative, listenerName)
+                .thenAccept(lookupData -> asyncResponse.resume(lookupData))
+                .exceptionally(ex -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Failed to check exist for topic {} when lookup", topicName, ex);
+                    }
+                    completeLookupResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v2/TopicLookup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v2/TopicLookup.java
index 0ac74cf782b..b343a92a42c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v2/TopicLookup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v2/TopicLookup.java
@@ -31,11 +31,13 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.lookup.TopicLookupBase;
 import org.apache.pulsar.common.naming.TopicName;
 
 @Path("/v2/topic")
+@Slf4j
 public class TopicLookup extends TopicLookupBase {
 
     static final String LISTENERNAME_HEADER = "X-Pulsar-ListenerName";
@@ -45,17 +47,26 @@ public class TopicLookup extends TopicLookupBase {
     @Produces(MediaType.APPLICATION_JSON)
     @ApiResponses(value = { @ApiResponse(code = 307,
             message = "Current broker doesn't serve the namespace of this topic") })
-    public void lookupTopicAsync(@PathParam("topic-domain") String topicDomain, @PathParam("tenant") String tenant,
+    public void lookupTopicAsync(
+            @Suspended AsyncResponse asyncResponse,
+            @PathParam("topic-domain") String topicDomain, @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
-            @Suspended AsyncResponse asyncResponse,
             @QueryParam("listenerName") String listenerName,
             @HeaderParam(LISTENERNAME_HEADER) String listenerNameHeader) {
         TopicName topicName = getTopicName(topicDomain, tenant, namespace, encodedTopic);
         if (StringUtils.isEmpty(listenerName) && StringUtils.isNotEmpty(listenerNameHeader)) {
             listenerName = listenerNameHeader;
         }
-        internalLookupTopicAsync(topicName, authoritative, asyncResponse, listenerName);
+        internalLookupTopicAsync(topicName, authoritative, listenerName)
+                .thenAccept(lookupData -> asyncResponse.resume(lookupData))
+                .exceptionally(ex -> {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Failed to check exist for topic {} when lookup", topicName, ex);
+                    }
+                    completeLookupResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
index 5ae60762748..bb20559c061 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
@@ -45,10 +45,12 @@ import org.apache.pulsar.broker.lookup.RedirectData;
 import org.apache.pulsar.broker.lookup.v1.TopicLookup;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.resources.ClusterResources;
+import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
@@ -68,6 +70,7 @@ public class HttpTopicLookupv2Test {
     private AuthorizationService auth;
     private ServiceConfiguration config;
     private Set<String> clusters;
+    private NamespaceResources namespaceResources;
 
     @SuppressWarnings("unchecked")
     @BeforeMethod
@@ -91,8 +94,10 @@ public class HttpTopicLookupv2Test {
         when(clusters.getClusterAsync("usc")).thenReturn(CompletableFuture.completedFuture(Optional.of(uscData)));
         when(clusters.getClusterAsync("usw")).thenReturn(CompletableFuture.completedFuture(Optional.of(uswData)));
         PulsarResources resources = mock(PulsarResources.class);
+        namespaceResources = mock(NamespaceResources.class);
         when(resources.getClusterResources()).thenReturn(clusters);
         when(pulsar.getPulsarResources()).thenReturn(resources);
+        when(resources.getNamespaceResources()).thenReturn(namespaceResources);
 
         doReturn(ns).when(pulsar).getNamespaceService();
         BrokerService brokerService = mock(BrokerService.class);
@@ -117,8 +122,7 @@ public class HttpTopicLookupv2Test {
         doReturn(true).when(config).isAuthorizationEnabled();
 
         AsyncResponse asyncResponse = mock(AsyncResponse.class);
-        destLookup.lookupTopicAsync(TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false,
-                asyncResponse, null, null);
+        destLookup.lookupTopicAsync(asyncResponse, TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false, null, null);
 
         ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
         verify(asyncResponse).resume(arg.capture());
@@ -148,8 +152,7 @@ public class HttpTopicLookupv2Test {
         doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
 
         AsyncResponse asyncResponse1 = mock(AsyncResponse.class);
-        destLookup.lookupTopicAsync(TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic_not_exist", false,
-                asyncResponse1, null, null);
+        destLookup.lookupTopicAsync(asyncResponse1, TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic_not_exist", false, null, null);
 
         ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
         verify(asyncResponse1).resume(arg.capture());
@@ -163,6 +166,10 @@ public class HttpTopicLookupv2Test {
         protected void validateClusterOwnership(String s) {
             // do nothing
         }
+        @Override
+        protected CompletableFuture<Void> validateClusterOwnershipAsync(String s) {
+            return CompletableFuture.completedFuture(null);
+        }
     }
     
     @Test
@@ -184,8 +191,7 @@ public class HttpTopicLookupv2Test {
         doReturn(true).when(config).isAuthorizationEnabled();
 
         AsyncResponse asyncResponse1 = mock(AsyncResponse.class);
-        destLookup.lookupTopicAsync(TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false,
-                asyncResponse1, null, null);
+        destLookup.lookupTopicAsync(asyncResponse1, TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false,null, null);
 
         ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
         verify(asyncResponse1).resume(arg.capture());
@@ -201,13 +207,8 @@ public class HttpTopicLookupv2Test {
         final String cluster = "global";
         final String ns1 = "ns1";
         final String ns2 = "ns2";
-        Policies policies1 = new Policies();
-//        doReturn(Optional.of(policies1)).when(policiesCache)
-//                .get(AdminResource.path(POLICIES, property, cluster, ns1));
-        Policies policies2 = new Policies();
-        policies2.replication_clusters = Sets.newHashSet("invalid-localCluster");
-//        doReturn(Optional.of(policies2)).when(policiesCache)
-//                .get(AdminResource.path(POLICIES, property, cluster, ns2));
+        NamespaceName namespaceName1 = NamespaceName.get(property + "/" + cluster + "/" + ns1);
+        NamespaceName namespaceName2 = NamespaceName.get(property + "/" + cluster + "/" + ns2);
 
         TopicLookup destLookup = spy(TopicLookup.class);
         doReturn(false).when(destLookup).isRequestHttps();
@@ -218,23 +219,52 @@ public class HttpTopicLookupv2Test {
         UriInfo uriInfo = mock(UriInfo.class);
         uriField.set(destLookup, uriInfo);
         doReturn(false).when(config).isAuthorizationEnabled();
-
         AsyncResponse asyncResponse = mock(AsyncResponse.class);
-        destLookup.lookupTopicAsync(TopicDomain.persistent.value(), property, cluster, ns1, "empty-cluster",
-                false, asyncResponse, null, null);
-
-        ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
+        ArgumentCaptor<RestException> arg = ArgumentCaptor.forClass(RestException.class);
+        // Test policy not found
+        CompletableFuture<Optional<Policies>> nullPolicies = new CompletableFuture<>();
+        nullPolicies.complete(Optional.empty());
+        doReturn(nullPolicies).when(namespaceResources).getPoliciesAsync(namespaceName1);
+        destLookup.lookupTopicAsync(asyncResponse, TopicDomain.persistent.value(), property, cluster, ns1, "empty-cluster", false, null, null);
         verify(asyncResponse).resume(arg.capture());
-        assertEquals(arg.getValue().getClass(), RestException.class);
-
-        AsyncResponse asyncResponse2 = mock(AsyncResponse.class);
-        destLookup.lookupTopicAsync(TopicDomain.persistent.value(), property, cluster, ns2,
-                "invalid-localCluster", false, asyncResponse2, null, null);
-        ArgumentCaptor<Throwable> arg2 = ArgumentCaptor.forClass(Throwable.class);
-        verify(asyncResponse2).resume(arg2.capture());
-
-        // Should have raised exception for invalid cluster
-        assertEquals(arg2.getValue().getClass(), RestException.class);
+        assertEquals(arg.getValue().getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
+        // Test empty cluster
+        CompletableFuture<Optional<Policies>> emptyPolicies = new CompletableFuture<>();
+        emptyPolicies.complete(Optional.of(new Policies()));
+        doReturn(emptyPolicies).when(namespaceResources).getPoliciesAsync(namespaceName1);
+        asyncResponse = mock(AsyncResponse.class);
+        arg = ArgumentCaptor.forClass(RestException.class);
+        destLookup.lookupTopicAsync(asyncResponse, TopicDomain.persistent.value(), property, cluster, ns1, "empty-cluster", false, null, null);
+        verify(asyncResponse).resume(arg.capture());
+        assertEquals(arg.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
+        // Test get peer replication cluster
+        asyncResponse = mock(AsyncResponse.class);
+        arg = ArgumentCaptor.forClass(RestException.class);
+        CompletableFuture<Optional<Policies>> policies2Future = new CompletableFuture<>();
+        Policies policies2 = new Policies();
+        policies2.replication_clusters = Sets.newHashSet("invalid-localCluster");
+        policies2Future.complete(Optional.of(policies2));
+        doReturn(policies2Future).when(namespaceResources).getPoliciesAsync(namespaceName2);
+        destLookup.lookupTopicAsync(asyncResponse, TopicDomain.persistent.value(), property, cluster, ns2,
+                "invalid-localCluster", false, null, null);
+        verify(asyncResponse).resume(arg.capture());
+        assertEquals(arg.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
+        // Test bypass replication cluster
+        asyncResponse = mock(AsyncResponse.class);
+        arg = ArgumentCaptor.forClass(RestException.class);
+        CompletableFuture<Optional<Policies>> policies3Future = new CompletableFuture<>();
+        Policies policies3 = new Policies();
+        policies3.replication_clusters = Sets.newHashSet("invalid-localCluster", "use");
+        policies3Future.complete(Optional.of(policies3));
+        doReturn(policies3Future).when(namespaceResources).getPoliciesAsync(namespaceName2);
+        NamespaceService namespaceService = pulsar.getNamespaceService();
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        future.complete(false);
+        doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
+        destLookup.lookupTopicAsync(asyncResponse, TopicDomain.persistent.value(), property, cluster, ns2,
+                "invalid-localCluster", false, null, null);
+        verify(asyncResponse).resume(arg.capture());
+        assertEquals(arg.getValue().getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/v2/TopicLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/v2/TopicLookupTest.java
index d47c603fba4..01240078033 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/v2/TopicLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/v2/TopicLookupTest.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.broker.lookup.http.v2;
 
-import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import org.apache.pulsar.broker.lookup.v2.TopicLookup;
 import org.apache.pulsar.broker.web.PulsarWebResourceTest;
@@ -26,7 +25,7 @@ import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.TopicName;
 import org.glassfish.jersey.server.ResourceConfig;
 import org.testng.annotations.Test;
-
+import java.util.concurrent.CompletableFuture;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
 
@@ -70,10 +69,10 @@ public class TopicLookupTest extends PulsarWebResourceTest {
         private String actualListenerName;
 
         @Override
-        protected void internalLookupTopicAsync(TopicName topicName, boolean authoritative, AsyncResponse asyncResponse,
-                String listenerName) {
+        protected CompletableFuture<LookupData> internalLookupTopicAsync(TopicName topicName, boolean authoritative,
+                                                                         String listenerName) {
             this.actualListenerName = listenerName;
-            asyncResponse.resume(new LookupData());
+            return CompletableFuture.completedFuture(new LookupData());
         }
     }
 }