You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/28 02:02:29 UTC
[pulsar] branch master updated: Make TopicLookupBase#internalLookupTopicAsync pure async (#14188)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 49fcc80600b Make TopicLookupBase#internalLookupTopicAsync pure async (#14188)
49fcc80600b is described below
commit 49fcc80600b35152a588c93c5dc5d9d0297b3a22
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Apr 28 10:02:23 2022 +0800
Make TopicLookupBase#internalLookupTopicAsync pure async (#14188)
---
.../pulsar/broker/lookup/TopicLookupBase.java | 168 +++++++++------------
.../pulsar/broker/lookup/v1/TopicLookup.java | 17 ++-
.../pulsar/broker/lookup/v2/TopicLookup.java | 17 ++-
.../broker/lookup/http/HttpTopicLookupv2Test.java | 86 +++++++----
.../broker/lookup/http/v2/TopicLookupTest.java | 9 +-
5 files changed, 164 insertions(+), 133 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
index c8ca671f317..a09c48771cb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.lookup;
-import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import static org.apache.pulsar.common.protocol.Commands.newLookupResponse;
import io.netty.buffer.ByteBuf;
@@ -48,6 +47,7 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,97 +56,81 @@ public class TopicLookupBase extends PulsarWebResource {
private static final String LOOKUP_PATH_V1 = "/lookup/v2/destination/";
private static final String LOOKUP_PATH_V2 = "/lookup/v2/topic/";
- protected void internalLookupTopicAsync(TopicName topicName, boolean authoritative,
- AsyncResponse asyncResponse, String listenerName) {
+ protected CompletableFuture<LookupData> internalLookupTopicAsync(TopicName topicName, boolean authoritative,
+ String listenerName) {
if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
log.warn("No broker was found available for topic {}", topicName);
- asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
- return;
+ return FutureUtil.failedFuture(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
}
-
- try {
- validateClusterOwnership(topicName.getCluster());
- validateAdminAndClientPermission(topicName);
- validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
- } catch (WebApplicationException we) {
- // Validation checks failed
- log.error("Validation check failed: {}", we.getMessage());
- completeLookupResponseExceptionally(asyncResponse, we);
- return;
- } catch (Throwable t) {
- // Validation checks failed with unknown error
- log.error("Validation check failed: {}", t.getMessage(), t);
- completeLookupResponseExceptionally(asyncResponse, new RestException(t));
- return;
- }
-
- // Currently, it's hard to check the non-persistent-non-partitioned topic, because it only exists in the broker,
- // it doesn't have metadata. If the topic is non-persistent and non-partitioned, we'll return the true flag.
- CompletableFuture<Boolean> existFuture = pulsar().getBrokerService().isAllowAutoTopicCreation(topicName)
- || (!topicName.isPersistent() && !topicName.isPartitioned())
- ? CompletableFuture.completedFuture(true) : pulsar().getNamespaceService().checkTopicExists(topicName);
- existFuture.thenAccept(exist -> {
- if (!exist) {
- completeLookupResponseExceptionally(asyncResponse, new RestException(Response.Status.NOT_FOUND,
- "Topic not found."));
- return;
- }
- CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
- .getBrokerServiceUrlAsync(topicName,
- LookupOptions.builder().advertisedListenerName(listenerName)
- .authoritative(authoritative).loadTopicsInBundle(false).build());
-
- lookupFuture.thenAccept(optionalResult -> {
- if (optionalResult == null || !optionalResult.isPresent()) {
- log.warn("No broker was found available for topic {}", topicName);
- completeLookupResponseExceptionally(asyncResponse,
- new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
- return;
- }
-
- LookupResult result = optionalResult.get();
- // We have found either a broker that owns the topic, or a broker to
- // which we should redirect the client to
- if (result.isRedirect()) {
- boolean newAuthoritative = result.isAuthoritativeRedirect();
- URI redirect;
- try {
- String redirectUrl = isRequestHttps() ? result.getLookupData().getHttpUrlTls()
- : result.getLookupData().getHttpUrl();
- checkNotNull(redirectUrl, "Redirected cluster's service url is not configured");
- String lookupPath = topicName.isV2() ? LOOKUP_PATH_V2 : LOOKUP_PATH_V1;
- String path = String.format("%s%s%s?authoritative=%s",
- redirectUrl, lookupPath, topicName.getLookupName(), newAuthoritative);
- path = listenerName == null ? path : path + "&listenerName=" + listenerName;
- redirect = new URI(path);
- } catch (URISyntaxException | NullPointerException e) {
- log.error("Error in preparing redirect url for {}: {}", topicName, e.getMessage(), e);
- completeLookupResponseExceptionally(asyncResponse, e);
- return;
+ return validateClusterOwnershipAsync(topicName.getCluster())
+ .thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject()))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.LOOKUP, null))
+ .thenCompose(__ -> {
+ // Currently, it's hard to check the non-persistent-non-partitioned topic, because it only exists
+ // in the broker, it doesn't have metadata. If the topic is non-persistent and non-partitioned,
+ // we'll return the true flag.
+ CompletableFuture<Boolean> existFuture = pulsar().getBrokerService()
+ .isAllowAutoTopicCreation(topicName)
+ || (!topicName.isPersistent() && !topicName.isPartitioned())
+ ? CompletableFuture.completedFuture(true)
+ : pulsar().getNamespaceService().checkTopicExists(topicName);
+ return existFuture;
+ })
+ .thenCompose(exist -> {
+ if (!exist) {
+ throw new RestException(Response.Status.NOT_FOUND, "Topic not found.");
}
- if (log.isDebugEnabled()) {
- log.debug("Redirect lookup for topic {} to {}", topicName, redirect);
- }
- completeLookupResponseExceptionally(asyncResponse,
- new WebApplicationException(Response.temporaryRedirect(redirect).build()));
+ CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
+ .getBrokerServiceUrlAsync(topicName,
+ LookupOptions.builder()
+ .advertisedListenerName(listenerName)
+ .authoritative(authoritative)
+ .loadTopicsInBundle(false)
+ .build());
- } else {
- // Found broker owning the topic
- if (log.isDebugEnabled()) {
- log.debug("Lookup succeeded for topic {} -- broker: {}", topicName, result.getLookupData());
- }
- completeLookupResponseSuccessfully(asyncResponse, result.getLookupData());
- }
- }).exceptionally(exception -> {
- log.warn("Failed to lookup broker for topic {}: {}", topicName, exception.getMessage(), exception);
- completeLookupResponseExceptionally(asyncResponse, exception);
- return null;
- });
- }).exceptionally(e -> {
- log.warn("Failed to check exist for topic {}: {} when lookup", topicName, e.getMessage(), e);
- completeLookupResponseExceptionally(asyncResponse, e);
- return null;
- });
+ return lookupFuture.thenApply(optionalResult -> {
+ if (optionalResult == null || !optionalResult.isPresent()) {
+ log.warn("No broker was found available for topic {}", topicName);
+ throw new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE);
+ }
+
+ LookupResult result = optionalResult.get();
+ // We have found either a broker that owns the topic, or a broker to
+ // which we should redirect the client to
+ if (result.isRedirect()) {
+ boolean newAuthoritative = result.isAuthoritativeRedirect();
+ URI redirect;
+ try {
+ String redirectUrl = isRequestHttps() ? result.getLookupData().getHttpUrlTls()
+ : result.getLookupData().getHttpUrl();
+ if (redirectUrl == null) {
+ log.error("Redirected cluster's service url is not configured");
+ throw new RestException(Response.Status.PRECONDITION_FAILED,
+ "Redirected cluster's service url is not configured.");
+ }
+ String lookupPath = topicName.isV2() ? LOOKUP_PATH_V2 : LOOKUP_PATH_V1;
+ String path = String.format("%s%s%s?authoritative=%s",
+ redirectUrl, lookupPath, topicName.getLookupName(), newAuthoritative);
+ path = listenerName == null ? path : path + "&listenerName=" + listenerName;
+ redirect = new URI(path);
+ } catch (URISyntaxException e) {
+ log.error("Error in preparing redirect url for {}: {}", topicName, e.getMessage(), e);
+ throw new RestException(Response.Status.PRECONDITION_FAILED, e.getMessage());
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Redirect lookup for topic {} to {}", topicName, redirect);
+ }
+ throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
+ } else {
+ // Found broker owning the topic
+ if (log.isDebugEnabled()) {
+ log.debug("Lookup succeeded for topic {} -- broker: {}", topicName,
+ result.getLookupData());
+ }
+ return result.getLookupData();
+ }
+ });
+ });
}
private void validateAdminAndClientPermission(TopicName topic) throws RestException, Exception {
@@ -347,14 +331,10 @@ public class TopicLookupBase extends PulsarWebResource {
return lookupfuture;
}
- private void completeLookupResponseExceptionally(AsyncResponse asyncResponse, Throwable t) {
- pulsar().getBrokerService().getLookupRequestSemaphore().release();
- asyncResponse.resume(t);
- }
-
- private void completeLookupResponseSuccessfully(AsyncResponse asyncResponse, LookupData lookupData) {
+ protected void completeLookupResponseExceptionally(AsyncResponse asyncResponse, Throwable t) {
pulsar().getBrokerService().getLookupRequestSemaphore().release();
- asyncResponse.resume(lookupData);
+ Throwable cause = FutureUtil.unwrapCompletionException(t);
+ asyncResponse.resume(cause);
}
protected TopicName getTopicName(String topicDomain, String tenant, String cluster, String namespace,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v1/TopicLookup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v1/TopicLookup.java
index a2a2659a0fe..b1015d9e063 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v1/TopicLookup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v1/TopicLookup.java
@@ -31,6 +31,7 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.lookup.TopicLookupBase;
import org.apache.pulsar.broker.web.NoSwaggerDocumentation;
@@ -48,6 +49,7 @@ import org.apache.pulsar.common.naming.TopicName;
*/
@Path("/v2/destination/")
@NoSwaggerDocumentation
+@Slf4j
public class TopicLookup extends TopicLookupBase {
static final String LISTENERNAME_HEADER = "X-Pulsar-ListenerName";
@@ -57,18 +59,27 @@ public class TopicLookup extends TopicLookupBase {
@Produces(MediaType.APPLICATION_JSON)
@ApiResponses(value = { @ApiResponse(code = 307,
message = "Current broker doesn't serve the namespace of this topic") })
- public void lookupTopicAsync(@PathParam("topic-domain") String topicDomain, @PathParam("property") String property,
+ public void lookupTopicAsync(
+ @Suspended AsyncResponse asyncResponse,
+ @PathParam("topic-domain") String topicDomain, @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
- @Suspended AsyncResponse asyncResponse,
@QueryParam("listenerName") String listenerName,
@HeaderParam(LISTENERNAME_HEADER) String listenerNameHeader) {
TopicName topicName = getTopicName(topicDomain, property, cluster, namespace, encodedTopic);
if (StringUtils.isEmpty(listenerName) && StringUtils.isNotEmpty(listenerNameHeader)) {
listenerName = listenerNameHeader;
}
- internalLookupTopicAsync(topicName, authoritative, asyncResponse, listenerName);
+ internalLookupTopicAsync(topicName, authoritative, listenerName)
+ .thenAccept(lookupData -> asyncResponse.resume(lookupData))
+ .exceptionally(ex -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to check exist for topic {} when lookup", topicName, ex);
+ }
+ completeLookupResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v2/TopicLookup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v2/TopicLookup.java
index 0ac74cf782b..b343a92a42c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v2/TopicLookup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/v2/TopicLookup.java
@@ -31,11 +31,13 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
+import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.lookup.TopicLookupBase;
import org.apache.pulsar.common.naming.TopicName;
@Path("/v2/topic")
+@Slf4j
public class TopicLookup extends TopicLookupBase {
static final String LISTENERNAME_HEADER = "X-Pulsar-ListenerName";
@@ -45,17 +47,26 @@ public class TopicLookup extends TopicLookupBase {
@Produces(MediaType.APPLICATION_JSON)
@ApiResponses(value = { @ApiResponse(code = 307,
message = "Current broker doesn't serve the namespace of this topic") })
- public void lookupTopicAsync(@PathParam("topic-domain") String topicDomain, @PathParam("tenant") String tenant,
+ public void lookupTopicAsync(
+ @Suspended AsyncResponse asyncResponse,
+ @PathParam("topic-domain") String topicDomain, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
- @Suspended AsyncResponse asyncResponse,
@QueryParam("listenerName") String listenerName,
@HeaderParam(LISTENERNAME_HEADER) String listenerNameHeader) {
TopicName topicName = getTopicName(topicDomain, tenant, namespace, encodedTopic);
if (StringUtils.isEmpty(listenerName) && StringUtils.isNotEmpty(listenerNameHeader)) {
listenerName = listenerNameHeader;
}
- internalLookupTopicAsync(topicName, authoritative, asyncResponse, listenerName);
+ internalLookupTopicAsync(topicName, authoritative, listenerName)
+ .thenAccept(lookupData -> asyncResponse.resume(lookupData))
+ .exceptionally(ex -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to check exist for topic {} when lookup", topicName, ex);
+ }
+ completeLookupResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
index 5ae60762748..bb20559c061 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/HttpTopicLookupv2Test.java
@@ -45,10 +45,12 @@ import org.apache.pulsar.broker.lookup.RedirectData;
import org.apache.pulsar.broker.lookup.v1.TopicLookup;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.resources.ClusterResources;
+import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
@@ -68,6 +70,7 @@ public class HttpTopicLookupv2Test {
private AuthorizationService auth;
private ServiceConfiguration config;
private Set<String> clusters;
+ private NamespaceResources namespaceResources;
@SuppressWarnings("unchecked")
@BeforeMethod
@@ -91,8 +94,10 @@ public class HttpTopicLookupv2Test {
when(clusters.getClusterAsync("usc")).thenReturn(CompletableFuture.completedFuture(Optional.of(uscData)));
when(clusters.getClusterAsync("usw")).thenReturn(CompletableFuture.completedFuture(Optional.of(uswData)));
PulsarResources resources = mock(PulsarResources.class);
+ namespaceResources = mock(NamespaceResources.class);
when(resources.getClusterResources()).thenReturn(clusters);
when(pulsar.getPulsarResources()).thenReturn(resources);
+ when(resources.getNamespaceResources()).thenReturn(namespaceResources);
doReturn(ns).when(pulsar).getNamespaceService();
BrokerService brokerService = mock(BrokerService.class);
@@ -117,8 +122,7 @@ public class HttpTopicLookupv2Test {
doReturn(true).when(config).isAuthorizationEnabled();
AsyncResponse asyncResponse = mock(AsyncResponse.class);
- destLookup.lookupTopicAsync(TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false,
- asyncResponse, null, null);
+ destLookup.lookupTopicAsync(asyncResponse, TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false, null, null);
ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
verify(asyncResponse).resume(arg.capture());
@@ -148,8 +152,7 @@ public class HttpTopicLookupv2Test {
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
AsyncResponse asyncResponse1 = mock(AsyncResponse.class);
- destLookup.lookupTopicAsync(TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic_not_exist", false,
- asyncResponse1, null, null);
+ destLookup.lookupTopicAsync(asyncResponse1, TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic_not_exist", false, null, null);
ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
verify(asyncResponse1).resume(arg.capture());
@@ -163,6 +166,10 @@ public class HttpTopicLookupv2Test {
protected void validateClusterOwnership(String s) {
// do nothing
}
+ @Override
+ protected CompletableFuture<Void> validateClusterOwnershipAsync(String s) {
+ return CompletableFuture.completedFuture(null);
+ }
}
@Test
@@ -184,8 +191,7 @@ public class HttpTopicLookupv2Test {
doReturn(true).when(config).isAuthorizationEnabled();
AsyncResponse asyncResponse1 = mock(AsyncResponse.class);
- destLookup.lookupTopicAsync(TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false,
- asyncResponse1, null, null);
+ destLookup.lookupTopicAsync(asyncResponse1, TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false,null, null);
ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
verify(asyncResponse1).resume(arg.capture());
@@ -201,13 +207,8 @@ public class HttpTopicLookupv2Test {
final String cluster = "global";
final String ns1 = "ns1";
final String ns2 = "ns2";
- Policies policies1 = new Policies();
-// doReturn(Optional.of(policies1)).when(policiesCache)
-// .get(AdminResource.path(POLICIES, property, cluster, ns1));
- Policies policies2 = new Policies();
- policies2.replication_clusters = Sets.newHashSet("invalid-localCluster");
-// doReturn(Optional.of(policies2)).when(policiesCache)
-// .get(AdminResource.path(POLICIES, property, cluster, ns2));
+ NamespaceName namespaceName1 = NamespaceName.get(property + "/" + cluster + "/" + ns1);
+ NamespaceName namespaceName2 = NamespaceName.get(property + "/" + cluster + "/" + ns2);
TopicLookup destLookup = spy(TopicLookup.class);
doReturn(false).when(destLookup).isRequestHttps();
@@ -218,23 +219,52 @@ public class HttpTopicLookupv2Test {
UriInfo uriInfo = mock(UriInfo.class);
uriField.set(destLookup, uriInfo);
doReturn(false).when(config).isAuthorizationEnabled();
-
AsyncResponse asyncResponse = mock(AsyncResponse.class);
- destLookup.lookupTopicAsync(TopicDomain.persistent.value(), property, cluster, ns1, "empty-cluster",
- false, asyncResponse, null, null);
-
- ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
+ ArgumentCaptor<RestException> arg = ArgumentCaptor.forClass(RestException.class);
+// // Test policy not found
+ CompletableFuture<Optional<Policies>> nullPolicies = new CompletableFuture<>();
+ nullPolicies.complete(Optional.empty());
+ doReturn(nullPolicies).when(namespaceResources).getPoliciesAsync(namespaceName1);
+ destLookup.lookupTopicAsync(asyncResponse, TopicDomain.persistent.value(), property, cluster, ns1, "empty-cluster", false, null, null);
verify(asyncResponse).resume(arg.capture());
- assertEquals(arg.getValue().getClass(), RestException.class);
-
- AsyncResponse asyncResponse2 = mock(AsyncResponse.class);
- destLookup.lookupTopicAsync(TopicDomain.persistent.value(), property, cluster, ns2,
- "invalid-localCluster", false, asyncResponse2, null, null);
- ArgumentCaptor<Throwable> arg2 = ArgumentCaptor.forClass(Throwable.class);
- verify(asyncResponse2).resume(arg2.capture());
-
- // Should have raised exception for invalid cluster
- assertEquals(arg2.getValue().getClass(), RestException.class);
+ assertEquals(arg.getValue().getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
+ // Test empty cluster
+ CompletableFuture<Optional<Policies>> emptyPolicies = new CompletableFuture<>();
+ emptyPolicies.complete(Optional.of(new Policies()));
+ doReturn(emptyPolicies).when(namespaceResources).getPoliciesAsync(namespaceName1);
+ asyncResponse = mock(AsyncResponse.class);
+ arg = ArgumentCaptor.forClass(RestException.class);
+ destLookup.lookupTopicAsync(asyncResponse, TopicDomain.persistent.value(), property, cluster, ns1, "empty-cluster", false, null, null);
+ verify(asyncResponse).resume(arg.capture());
+ assertEquals(arg.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
+ // Test get peer replication cluster
+ asyncResponse = mock(AsyncResponse.class);
+ arg = ArgumentCaptor.forClass(RestException.class);
+ CompletableFuture<Optional<Policies>> policies2Future = new CompletableFuture<>();
+ Policies policies2 = new Policies();
+ policies2.replication_clusters = Sets.newHashSet("invalid-localCluster");
+ policies2Future.complete(Optional.of(policies2));
+ doReturn(policies2Future).when(namespaceResources).getPoliciesAsync(namespaceName2);
+ destLookup.lookupTopicAsync(asyncResponse, TopicDomain.persistent.value(), property, cluster, ns2,
+ "invalid-localCluster", false, null, null);
+ verify(asyncResponse).resume(arg.capture());
+ assertEquals(arg.getValue().getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
+ // Test bypass replication cluster
+ asyncResponse = mock(AsyncResponse.class);
+ arg = ArgumentCaptor.forClass(RestException.class);
+ CompletableFuture<Optional<Policies>> policies3Future = new CompletableFuture<>();
+ Policies policies3 = new Policies();
+ policies3.replication_clusters = Sets.newHashSet("invalid-localCluster", "use");
+ policies3Future.complete(Optional.of(policies3));
+ doReturn(policies3Future).when(namespaceResources).getPoliciesAsync(namespaceName2);
+ NamespaceService namespaceService = pulsar.getNamespaceService();
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ future.complete(false);
+ doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));
+ destLookup.lookupTopicAsync(asyncResponse, TopicDomain.persistent.value(), property, cluster, ns2,
+ "invalid-localCluster", false, null, null);
+ verify(asyncResponse).resume(arg.capture());
+ assertEquals(arg.getValue().getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/v2/TopicLookupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/v2/TopicLookupTest.java
index d47c603fba4..01240078033 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/v2/TopicLookupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/http/v2/TopicLookupTest.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.broker.lookup.http.v2;
-import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.lookup.v2.TopicLookup;
import org.apache.pulsar.broker.web.PulsarWebResourceTest;
@@ -26,7 +25,7 @@ import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.TopicName;
import org.glassfish.jersey.server.ResourceConfig;
import org.testng.annotations.Test;
-
+import java.util.concurrent.CompletableFuture;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
@@ -70,10 +69,10 @@ public class TopicLookupTest extends PulsarWebResourceTest {
private String actualListenerName;
@Override
- protected void internalLookupTopicAsync(TopicName topicName, boolean authoritative, AsyncResponse asyncResponse,
- String listenerName) {
+ protected CompletableFuture<LookupData> internalLookupTopicAsync(TopicName topicName, boolean authoritative,
+ String listenerName) {
this.actualListenerName = listenerName;
- asyncResponse.resume(new LookupData());
+ return CompletableFuture.completedFuture(new LookupData());
}
}
}