You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/05 11:26:41 UTC
[pulsar] branch master updated: [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async (#14153)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 942c5c5f4d8 [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async (#14153)
942c5c5f4d8 is described below
commit 942c5c5f4d801d03b63631c6bf0a2e1654ef1c56
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu May 5 19:26:31 2022 +0800
[Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async (#14153)
---
.../apache/pulsar/broker/admin/AdminResource.java | 50 +++-------------
.../broker/admin/impl/PersistentTopicsBase.java | 68 ++++++++++++----------
.../broker/admin/v1/NonPersistentTopics.java | 21 ++++---
.../pulsar/broker/admin/v1/PersistentTopics.java | 15 ++++-
.../broker/admin/v2/NonPersistentTopics.java | 7 ++-
.../pulsar/broker/admin/v2/PersistentTopics.java | 13 ++++-
.../pulsar/broker/admin/PersistentTopicsTest.java | 56 +++++++++++-------
.../org/apache/pulsar/broker/admin/TopicsTest.java | 2 +-
.../pulsar/broker/admin/v1/V1_AdminApi2Test.java | 7 ++-
9 files changed, 127 insertions(+), 112 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index ef167829f8d..ec502e134f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -427,21 +427,20 @@ public abstract class AdminResource extends PulsarWebResource {
this.servletContext = servletContext;
}
+ protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
+ boolean authoritative,
+ boolean checkAllowAutoCreation) {
+ return sync(() -> getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation));
+ }
+
protected CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(
TopicName topicName, boolean authoritative, boolean checkAllowAutoCreation) {
- try {
- validateClusterOwnership(topicName.getCluster());
- } catch (Exception e) {
- return FutureUtil.failedFuture(e);
- }
-
// validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
// serve/redirect request else fail partitioned-metadata-request so, client fails while creating
// producer/consumer
- return validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject())
- .thenRun(() -> {
- validateTopicOperation(topicName, TopicOperation.LOOKUP);
- })
+ return validateClusterOwnershipAsync(topicName.getCluster())
+ .thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject()))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.LOOKUP))
.thenCompose(__ -> {
if (checkAllowAutoCreation) {
return pulsar().getBrokerService()
@@ -452,37 +451,6 @@ public abstract class AdminResource extends PulsarWebResource {
});
}
- protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
- boolean authoritative, boolean checkAllowAutoCreation) {
- validateClusterOwnership(topicName.getCluster());
- // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
- // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
- // producer/consumer
- validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
-
- try {
- validateTopicOperation(topicName, TopicOperation.LOOKUP);
- } catch (Exception e) {
- // unknown error marked as internal server error
- log.warn("Unexpected error while authorizing lookup. topic={}, role={}. Error: {}", topicName,
- clientAppId(), e.getMessage(), e);
- throw new RestException(e);
- }
-
- PartitionedTopicMetadata partitionMetadata;
- if (checkAllowAutoCreation) {
- partitionMetadata = fetchPartitionedTopicMetadataCheckAllowAutoCreation(pulsar(), topicName);
- } else {
- partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), topicName);
- }
-
- if (log.isDebugEnabled()) {
- log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId(), topicName,
- partitionMetadata.partitions);
- }
- return partitionMetadata;
- }
-
protected static PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarService pulsar, TopicName topicName) {
try {
return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index cf1b3d71528..aa3fa6011dc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -546,27 +546,37 @@ public class PersistentTopicsBase extends AdminResource {
protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative,
boolean checkAllowAutoCreation) {
- PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName,
- authoritative, checkAllowAutoCreation);
- if (metadata.partitions == 0 && !checkAllowAutoCreation) {
- // The topic may be a non-partitioned topic, so check if it exists here.
- // However, when checkAllowAutoCreation is true, the client will create the topic if it doesn't exist.
- // In this case, `partitions == 0` means the automatically created topic is a non-partitioned topic so we
- // shouldn't check if the topic exists.
- try {
- if (!pulsar().getNamespaceService().checkTopicExists(topicName).get()) {
- throw new RestException(Status.NOT_FOUND,
- new PulsarClientException.NotFoundException("Topic not exist"));
- }
- } catch (InterruptedException | ExecutionException e) {
- log.error("Failed to check if topic '{}' exists", topicName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get topic metadata");
- }
- }
- if (metadata.partitions > 1) {
- validateClientVersion();
- }
- return metadata;
+ return sync(() -> internalGetPartitionedMetadataAsync(authoritative, checkAllowAutoCreation));
+ }
+
+ protected CompletableFuture<PartitionedTopicMetadata> internalGetPartitionedMetadataAsync(
+ boolean authoritative,
+ boolean checkAllowAutoCreation) {
+ return getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation)
+ .thenCompose(metadata -> {
+ CompletableFuture<Void> ret;
+ if (metadata.partitions == 0 && !checkAllowAutoCreation) {
+ // The topic may be a non-partitioned topic, so check if it exists here.
+ // However, when checkAllowAutoCreation is true, the client will create the topic if
+ // it doesn't exist. In this case, `partitions == 0` means the automatically created topic
+ // is a non-partitioned topic so we shouldn't check if the topic exists.
+ ret = internalCheckTopicExists(topicName);
+ } else if (metadata.partitions > 1) {
+ ret = internalValidateClientVersionAsync();
+ } else {
+ ret = CompletableFuture.completedFuture(null);
+ }
+ return ret.thenApply(__ -> metadata);
+ });
+ }
+
+ protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) {
+ return pulsar().getNamespaceService().checkTopicExists(topicName)
+ .thenAccept(exist -> {
+ if (!exist) {
+ throw new RestException(Status.NOT_FOUND, "Topic not exist");
+ }
+ });
}
protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
@@ -4135,15 +4145,15 @@ public class PersistentTopicsBase extends AdminResource {
// Pulsar client-java lib always passes user-agent as X-Java-$version.
// However, cpp-client older than v1.20 (PR #765) never used to pass it.
// So, request without user-agent and Pulsar-CPP-vX (X < 1.21) must be rejected
- private void validateClientVersion() {
+ protected CompletableFuture<Void> internalValidateClientVersionAsync() {
if (!pulsar().getConfiguration().isClientLibraryVersionCheckEnabled()) {
- return;
+ return CompletableFuture.completedFuture(null);
}
final String userAgent = httpRequest.getHeader("User-Agent");
if (StringUtils.isBlank(userAgent)) {
- throw new RestException(Status.METHOD_NOT_ALLOWED,
+ return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
"Client lib is not compatible to"
- + " access partitioned metadata: version in user-agent is not present");
+ + " access partitioned metadata: version in user-agent is not present"));
}
// Version < 1.20 for cpp-client is not allowed
if (userAgent.contains(DEPRECATED_CLIENT_VERSION_PREFIX)) {
@@ -4154,18 +4164,16 @@ public class PersistentTopicsBase extends AdminResource {
if (splits != null && splits.length > 1) {
if (LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMajorVersion() > Integer.parseInt(splits[0])
|| LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMinorVersion() > Integer.parseInt(splits[1])) {
- throw new RestException(Status.METHOD_NOT_ALLOWED,
+ return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
"Client lib is not compatible to access partitioned metadata: version " + userAgent
- + " is not supported");
+ + " is not supported"));
}
}
- } catch (RestException re) {
- throw re;
} catch (Exception e) {
log.warn("[{}] Failed to parse version {} ", clientAppId(), userAgent);
}
}
- return;
+ return CompletableFuture.completedFuture(null);
}
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index dc466be86d6..93e49dc70b1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -51,7 +51,6 @@ import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -77,16 +76,16 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission")})
- public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property,
- @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace,
- @PathParam("topic") @Encoded String encodedTopic,
- @QueryParam("authoritative") @DefaultValue("false")
- boolean authoritative,
- @QueryParam("checkAllowAutoCreation") @DefaultValue("false")
- boolean checkAllowAutoCreation) {
- validateTopicName(property, cluster, namespace, encodedTopic);
- return getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation);
+ public void getPartitionedMetadata(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
+ super.getPartitionedMetadata(asyncResponse, property, cluster, namespace, encodedTopic, authoritative,
+ checkAllowAutoCreation);
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 8e5107a6bd8..bb92498b2e7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -48,7 +48,6 @@ import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -283,13 +282,23 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 403, message = "Don't have admin permission") })
- public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property,
+ public void getPartitionedMetadata(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
validateTopicName(property, cluster, namespace, encodedTopic);
- return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
+ internalGetPartitionedMetadataAsync(authoritative, checkAllowAutoCreation)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get partitioned metadata topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 66867b68426..5cf78278350 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -54,7 +54,6 @@ import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -88,7 +87,8 @@ public class NonPersistentTopics extends PersistentTopics {
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate cluster configuration")
})
- public PartitionedTopicMetadata getPartitionedMetadata(
+ public void getPartitionedMetadata(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -99,7 +99,8 @@ public class NonPersistentTopics extends PersistentTopics {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Is check configuration required to automatically create topic")
@QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
- return super.getPartitionedMetadata(tenant, namespace, encodedTopic, authoritative, checkAllowAutoCreation);
+ super.getPartitionedMetadata(asyncResponse, tenant, namespace, encodedTopic, authoritative,
+ checkAllowAutoCreation);
}
@GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index a70bf7f58ea..4095530340c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -881,7 +881,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
@ApiResponse(code = 500, message = "Internal server error")
})
- public PartitionedTopicMetadata getPartitionedMetadata(
+ public void getPartitionedMetadata(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -893,7 +894,15 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Is check configuration required to automatically create topic")
@QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
validateTopicName(tenant, namespace, encodedTopic);
- return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
+ internalGetPartitionedMetadataAsync(authoritative, checkAllowAutoCreation)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get partitioned metadata topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@DELETE
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 382e90e9321..b1bc4ab5486 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -405,58 +405,74 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
Assert.assertTrue(errorCaptor.getValue().getMessage().contains("zero partitions"));
-
+ response = mock(AsyncResponse.class);
final String nonPartitionTopic2 = "secondary-non-partitioned-topic";
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopic2, true, null);
- Assert.assertEquals(persistentTopics
- .getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic, true, false).partitions,
- 0);
-
- Assert.assertEquals(persistentTopics
- .getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic, true, true).partitions,
- 0);
+ persistentTopics.getPartitionedMetadata(response, testTenant, testNamespace, nonPartitionTopic, true, false);
+ ArgumentCaptor<PartitionedTopicMetadata> responseCaptor2 = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor2.capture());
+ Assert.assertEquals(responseCaptor2.getValue().partitions, 0);
+ response = mock(AsyncResponse.class);
+ responseCaptor2 = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ persistentTopics.getPartitionedMetadata(response, testTenant, testNamespace, nonPartitionTopic, true, true);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor2.capture());
+ Assert.assertEquals(responseCaptor2.getValue().partitions, 0);
}
@Test
public void testCreateNonPartitionedTopic() {
final String topicName = "standard-topic-partition-a";
+ AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true, null);
- PartitionedTopicMetadata pMetadata = persistentTopics.getPartitionedMetadata(
+ persistentTopics.getPartitionedMetadata(response,
testTenant, testNamespace, topicName, true, false);
- Assert.assertEquals(pMetadata.partitions, 0);
+ ArgumentCaptor<PartitionedTopicMetadata> responseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().partitions, 0);
- PartitionedTopicMetadata metadata = persistentTopics.getPartitionedMetadata(
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ persistentTopics.getPartitionedMetadata(response,
testTenant, testNamespace, topicName, true, true);
- Assert.assertEquals(metadata.partitions, 0);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().partitions, 0);
final String topicName2 = "standard-topic-partition-b";
Map<String, String> topicMetadata = Maps.newHashMap();
topicMetadata.put("key1", "value1");
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName2, true, topicMetadata);
- PartitionedTopicMetadata pMetadata2 = persistentTopics.getPartitionedMetadata(
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ persistentTopics.getPartitionedMetadata(response,
testTenant, testNamespace, topicName2, true, false);
- Assert.assertNull(pMetadata2.properties);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertNull(responseCaptor.getValue().properties);
}
@Test
public void testCreatePartitionedTopic() {
AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<PartitionedTopicMetadata> responseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
final String topicName = "standard-partitioned-topic-a";
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2, true);
Awaitility.await().untilAsserted(() -> {
- PartitionedTopicMetadata pMetadata = persistentTopics.getPartitionedMetadata(
+ persistentTopics.getPartitionedMetadata(response,
testTenant, testNamespace, topicName, true, false);
- Assert.assertNull(pMetadata.properties);
+ verify(response, timeout(5000).atLeast(1)).resume(responseCaptor.capture());
+ Assert.assertNull(responseCaptor.getValue().properties);
});
+ AsyncResponse response2 = mock(AsyncResponse.class);
+ ArgumentCaptor<PartitionedTopicMetadata> responseCaptor2 = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
final String topicName2 = "standard-partitioned-topic-b";
Map<String, String> topicMetadata = Maps.newHashMap();
topicMetadata.put("key1", "value1");
PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(2, topicMetadata);
- persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName2, metadata, true);
+ persistentTopics.createPartitionedTopic(response2, testTenant, testNamespace, topicName2, metadata, true);
Awaitility.await().untilAsserted(() -> {
- PartitionedTopicMetadata pMetadata2 = persistentTopics.getPartitionedMetadata(
+ persistentTopics.getPartitionedMetadata(response2,
testTenant, testNamespace, topicName2, true, false);
- Assert.assertEquals(pMetadata2.properties.size(), 1);
- Assert.assertEquals(pMetadata2.properties, topicMetadata);
+ verify(response2, timeout(5000).atLeast(1)).resume(responseCaptor2.capture());
+ Assert.assertEquals(responseCaptor2.getValue().properties.size(), 1);
+ Assert.assertEquals(responseCaptor2.getValue().properties, topicMetadata);
});
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
index 4edfbfd17db..82d7571912b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -383,7 +383,7 @@ public class TopicsTest extends MockedPulsarServiceBaseTest {
topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, producerMessages);
ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
- Assert.assertEquals(responseCaptor.getValue().getMessage(), "Fail to publish message: Topic not exist");
+ Assert.assertTrue(responseCaptor.getValue().getMessage().contains("Topic not exist"));
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
index c74d5ee7733..f62130ec381 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
@@ -256,7 +256,12 @@ public class V1_AdminApi2Test extends MockedPulsarServiceBaseTest {
// test partitioned-topic
final String partitionedTopicName = "non-persistent://prop-xyz/use/ns1/paritioned";
- assertEquals(admin.nonPersistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 0);
+ try {
+ admin.nonPersistentTopics().getPartitionedTopicMetadata(partitionedTopicName);
+ fail("Should have failed");
+ } catch (Exception ex) {
+ assertTrue(ex instanceof PulsarAdminException.NotFoundException);
+ }
admin.nonPersistentTopics().createPartitionedTopic(partitionedTopicName, 5);
assertEquals(admin.nonPersistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 5);
}