You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/09 01:52:01 UTC
[pulsar] branch master updated: [Broker] Make PersistentTopicsBase#internalCreateNonPartitionedTopic async. (#14030)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4f53e8eedc8 [Broker] Make PersistentTopicsBase#internalCreateNonPartitionedTopic async. (#14030)
4f53e8eedc8 is described below
commit 4f53e8eedc8aca406f60ab4c1f7bc11577a6bf20
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon May 9 09:51:56 2022 +0800
[Broker] Make PersistentTopicsBase#internalCreateNonPartitionedTopic async. (#14030)
---
.../broker/admin/impl/PersistentTopicsBase.java | 108 +++++++--------
.../pulsar/broker/admin/v1/PersistentTopics.java | 12 +-
.../pulsar/broker/admin/v2/PersistentTopics.java | 11 +-
.../apache/pulsar/broker/admin/AdminApiTest.java | 6 +-
.../apache/pulsar/broker/admin/NamespacesTest.java | 1 -
.../pulsar/broker/admin/PersistentTopicsTest.java | 145 ++++++++++++---------
.../pulsar/broker/admin/TopicPoliciesTest.java | 1 -
.../BrokerServiceAutoTopicCreationTest.java | 1 +
8 files changed, 162 insertions(+), 123 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 033aa41cb52..c2a89721412 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -388,37 +388,31 @@ public class PersistentTopicsBase extends AdminResource {
});
}
- protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<String, String> properties) {
- validateNonPartitionTopicName(topicName.getLocalName());
+ protected CompletableFuture<Void> internalCreateNonPartitionedTopicAsync(boolean authoritative,
+ Map<String, String> properties) {
+ CompletableFuture<Void> ret = validateNonPartitionTopicNameAsync(topicName.getLocalName());
if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
- validateTopicOwnership(topicName, authoritative);
- validateNamespaceOperation(topicName.getNamespaceObject(), NamespaceOperation.CREATE_TOPIC);
-
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
- if (partitionMetadata.partitions > 0) {
- log.warn("[{}] Partitioned topic with the same name already exists {}", clientAppId(), topicName);
- throw new RestException(Status.CONFLICT, "This topic already exists");
- }
-
- try {
- Optional<Topic> existedTopic = pulsar().getBrokerService().getTopicIfExists(topicName.toString()).get();
- if (existedTopic.isPresent()) {
- log.error("[{}] Topic {} already exists", clientAppId(), topicName);
- throw new RestException(Status.CONFLICT, "This topic already exists");
- }
-
- Topic createdTopic = getOrCreateTopic(topicName, properties);
- log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), createdTopic);
- } catch (Exception e) {
- if (e instanceof RestException) {
- throw (RestException) e;
- } else {
- log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, e);
- throw new RestException(e);
- }
- }
+ ret = ret.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName));
+ }
+ return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+ .thenCompose(__ -> validateNamespaceOperationAsync(topicName.getNamespaceObject(),
+ NamespaceOperation.CREATE_TOPIC))
+ .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, false, false))
+ .thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions > 0) {
+ log.warn("[{}] Partitioned topic with the same name already exists {}", clientAppId(), topicName);
+ throw new RestException(Status.CONFLICT, "This topic already exists");
+ }
+ })
+ .thenCompose(__ -> pulsar().getBrokerService().getTopicIfExists(topicName.toString()))
+ .thenCompose(existedTopic -> {
+ if (existedTopic.isPresent()) {
+ log.error("[{}] Topic {} already exists", clientAppId(), topicName);
+ throw new RestException(Status.CONFLICT, "This topic already exists");
+ }
+ return pulsar().getBrokerService().getTopic(topicName.toString(), true, properties);
+ })
+ .thenAccept(__ -> log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), topicName));
}
/**
@@ -3995,11 +3989,6 @@ public class PersistentTopicsBase extends AdminResource {
});
}
- private Topic getOrCreateTopic(TopicName topicName, Map<String, String> properties) {
- return pulsar().getBrokerService().getTopic(topicName.toString(), true, properties)
- .thenApply(Optional::get).join();
- }
-
/**
* Get the Subscription object reference from the Topic reference.
*/
@@ -4236,7 +4225,8 @@ public class PersistentTopicsBase extends AdminResource {
*
* @param topicName
*/
- private void validateNonPartitionTopicName(String topicName) {
+ private CompletableFuture<Void> validateNonPartitionTopicNameAsync(String topicName) {
+ CompletableFuture<Void> ret = CompletableFuture.completedFuture(null);
if (topicName.contains(TopicName.PARTITIONED_TOPIC_SUFFIX)) {
try {
// First check if what's after suffix "-partition-" is number or not, if not number then can create.
@@ -4245,34 +4235,36 @@ public class PersistentTopicsBase extends AdminResource {
+ TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
TopicName partitionTopicName = TopicName.get(domain(),
namespaceName, topicName.substring(0, partitionIndex));
- PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(partitionTopicName, false, false);
-
- // Partition topic index is 0 to (number of partition - 1)
- if (metadata.partitions > 0 && suffix >= (long) metadata.partitions) {
- log.warn("[{}] Can't create topic {} with \"-partition-\" followed by"
- + " a number smaller then number of partition of partitioned topic {}.",
- clientAppId(), topicName, partitionTopicName.getLocalName());
- throw new RestException(Status.PRECONDITION_FAILED,
- "Can't create topic " + topicName + " with \"-partition-\" followed by"
- + " a number smaller then number of partition of partitioned topic "
- + partitionTopicName.getLocalName());
- } else if (metadata.partitions == 0) {
- log.warn("[{}] Can't create topic {} with \"-partition-\" followed by"
- + " numeric value if there isn't a partitioned topic {} created.",
- clientAppId(), topicName, partitionTopicName.getLocalName());
- throw new RestException(Status.PRECONDITION_FAILED,
- "Can't create topic " + topicName + " with \"-partition-\" followed by"
- + " numeric value if there isn't a partitioned topic "
- + partitionTopicName.getLocalName() + " created.");
- }
- // If there is a partitioned topic with the same name and numeric suffix is smaller than the
- // number of partition for that partitioned topic, validation will pass.
+ ret = getPartitionedTopicMetadataAsync(partitionTopicName, false, false)
+ .thenAccept(metadata -> {
+ // Partition topic index is 0 to (number of partition - 1)
+ if (metadata.partitions > 0 && suffix >= (long) metadata.partitions) {
+ log.warn("[{}] Can't create topic {} with \"-partition-\" followed by"
+ + " a number smaller then number of partition of partitioned topic {}.",
+ clientAppId(), topicName, partitionTopicName.getLocalName());
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Can't create topic " + topicName + " with \"-partition-\" followed by"
+ + " a number smaller then number of partition of partitioned topic "
+ + partitionTopicName.getLocalName());
+ } else if (metadata.partitions == 0) {
+ log.warn("[{}] Can't create topic {} with \"-partition-\" followed by"
+ + " numeric value if there isn't a partitioned topic {} created.",
+ clientAppId(), topicName, partitionTopicName.getLocalName());
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Can't create topic " + topicName + " with \"-partition-\" followed by"
+ + " numeric value if there isn't a partitioned topic "
+ + partitionTopicName.getLocalName() + " created.");
+ }
+ // If there is a partitioned topic with the same name and numeric suffix is smaller
+ // than the number of partition for that partitioned topic, validation will pass.
+ });
} catch (NumberFormatException e) {
// Do nothing, if value after partition suffix is not pure numeric value,
// as it can't conflict if user want to create partitioned topic with same
// topic name prefix in the future.
}
}
+ return ret;
}
protected void internalGetLastMessageId(AsyncResponse asyncResponse, boolean authoritative) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index bb92498b2e7..a7ae878013b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -215,6 +215,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void createNonPartitionedTopic(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the cluster", required = true)
@@ -228,7 +229,16 @@ public class PersistentTopics extends PersistentTopicsBase {
validateNamespaceName(tenant, cluster, namespace);
validateTopicName(tenant, cluster, namespace, encodedTopic);
validateGlobalNamespaceOwnership();
- internalCreateNonPartitionedTopic(authoritative, null);
+ validateCreateTopic(topicName);
+ internalCreateNonPartitionedTopicAsync(authoritative, null)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
/**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 953b0f5f821..3f92eadacdf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -334,6 +334,7 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
})
public void createNonPartitionedTopic(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -348,7 +349,15 @@ public class PersistentTopics extends PersistentTopicsBase {
validateGlobalNamespaceOwnership();
validateTopicName(tenant, namespace, encodedTopic);
validateCreateTopic(topicName);
- internalCreateNonPartitionedTopic(authoritative, properties);
+ internalCreateNonPartitionedTopicAsync(authoritative, properties)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 64de71ac488..0579eb20c48 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2709,10 +2709,10 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
@Test
public void testNamespaceNotExist() {
- final String nonPartitionedtopic = "persistent://prop-xyz/no-exist/non-partitioned-topic";
+ final String nonPartitionedTopic = "persistent://prop-xyz/no-exist/non-partitioned-topic";
try {
- admin.topics().createNonPartitionedTopic(nonPartitionedtopic);
- fail("should falied for namespaces not exist");
+ admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
+ fail("should failed for namespaces not exist");
} catch (Exception e) {
assertTrue(e instanceof NotFoundException);
assertTrue(e.getMessage().equals("Namespace not found"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index a334fceed76..0eb87491a52 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -1416,7 +1416,6 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
admin.namespaces().createNamespace(namespace, Sets.newHashSet(testLocalCluster));
admin.topics().createNonPartitionedTopic(topic);
-
admin.topics().delete(topic);
try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 330d945f651..5a9f4642791 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -51,6 +51,7 @@ import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -368,10 +369,10 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
String testLocalTopicName = "topic-not-found";
// 1) Create the nonPartitionTopic topic
- persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, testLocalTopicName, true, null);
+ AsyncResponse response = mock(AsyncResponse.class);
+ persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, true, null);
// 2) Create a subscription
- AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true,
new ResetCursorData(MessageId.earliest), false);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
@@ -394,7 +395,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
@Test
public void testNonPartitionedTopics() {
- final String nonPartitionTopic = "non-partitioned-topic";
+ final String nonPartitionTopic = BrokerTestUtil.newUniqueName("non-partitioned-topic");
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.createSubscription(response, testTenant, testNamespace, nonPartitionTopic, "test", true,
new ResetCursorData(MessageId.latest), false);
@@ -403,52 +404,69 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
+ ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
persistentTopics.getSubscriptions(response, testTenant, testNamespace, nonPartitionTopic + "-partition-0",
true);
- ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
Assert.assertTrue(errorCaptor.getValue().getMessage().contains("zero partitions"));
response = mock(AsyncResponse.class);
- final String nonPartitionTopic2 = "secondary-non-partitioned-topic";
- persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopic2, true, null);
- persistentTopics.getPartitionedMetadata(response, testTenant, testNamespace, nonPartitionTopic, true, false);
- ArgumentCaptor<PartitionedTopicMetadata> responseCaptor2 = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
- verify(response, timeout(5000).times(1)).resume(responseCaptor2.capture());
- Assert.assertEquals(responseCaptor2.getValue().partitions, 0);
- response = mock(AsyncResponse.class);
- responseCaptor2 = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
- persistentTopics.getPartitionedMetadata(response, testTenant, testNamespace, nonPartitionTopic, true, true);
- verify(response, timeout(5000).times(1)).resume(responseCaptor2.capture());
- Assert.assertEquals(responseCaptor2.getValue().partitions, 0);
+ final String nonPartitionTopic2 = BrokerTestUtil.newUniqueName("secondary-non-partitioned-topic");
+ persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, nonPartitionTopic2, true, null);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertTrue(admin.topics().getList(testTenant + "/" + testNamespace)
+ .contains("persistent://" + testTenant + "/" + testNamespace + "/" + nonPartitionTopic2));
+ });
+
+ AsyncResponse metaResponse = mock(AsyncResponse.class);
+ ArgumentCaptor<PartitionedTopicMetadata> metaResponseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ persistentTopics.getPartitionedMetadata(metaResponse, testTenant, testNamespace, nonPartitionTopic, true, false);
+ verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
+ Assert.assertEquals(metaResponseCaptor.getValue().partitions, 0);
+
+ metaResponse = mock(AsyncResponse.class);
+ metaResponseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ persistentTopics.getPartitionedMetadata(metaResponse, testTenant, testNamespace, nonPartitionTopic, true, true);
+ verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
+ Assert.assertEquals(metaResponseCaptor.getValue().partitions, 0);
}
@Test
public void testCreateNonPartitionedTopic() {
- final String topicName = "standard-topic-partition-a";
+ final String topic = "standard-topic-partition-a";
+ TopicName topicName = TopicName.get(TopicDomain.persistent.value(), testTenant, testNamespace, topic);
AsyncResponse response = mock(AsyncResponse.class);
- persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true, null);
- persistentTopics.getPartitionedMetadata(response,
- testTenant, testNamespace, topicName, true, false);
- ArgumentCaptor<PartitionedTopicMetadata> responseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
- verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
- Assert.assertEquals(responseCaptor.getValue().partitions, 0);
+ persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topic, true, null);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertTrue(admin.topics().getList(testTenant + "/" + testNamespace).contains(topicName.toString()));
+ });
+ AsyncResponse metaResponse = mock(AsyncResponse.class);
+ ArgumentCaptor<PartitionedTopicMetadata> metaResponseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ persistentTopics.getPartitionedMetadata(metaResponse, testTenant, testNamespace, topic, true, false);
+ verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
+ Assert.assertEquals(metaResponseCaptor.getValue().partitions, 0);
+
+ metaResponse = mock(AsyncResponse.class);
+ metaResponseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ persistentTopics.getPartitionedMetadata(metaResponse,
+ testTenant, testNamespace, topic, true, true);
+ verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
+ Assert.assertEquals(metaResponseCaptor.getValue().partitions, 0);
response = mock(AsyncResponse.class);
- responseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
- persistentTopics.getPartitionedMetadata(response,
- testTenant, testNamespace, topicName, true, true);
- verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
- Assert.assertEquals(responseCaptor.getValue().partitions, 0);
- final String topicName2 = "standard-topic-partition-b";
+ metaResponse = mock(AsyncResponse.class);
+ metaResponseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+ final String topic2 = "standard-topic-partition-b";
+ TopicName topicName2 = TopicName.get(TopicDomain.persistent.value(), testTenant, testNamespace, topic2);
Map<String, String> topicMetadata = Maps.newHashMap();
topicMetadata.put("key1", "value1");
- persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName2, true, topicMetadata);
- response = mock(AsyncResponse.class);
- responseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
- persistentTopics.getPartitionedMetadata(response,
- testTenant, testNamespace, topicName2, true, false);
- verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
- Assert.assertNull(responseCaptor.getValue().properties);
+ persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topic2, true, topicMetadata);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertTrue(admin.topics().getList(testTenant + "/" + testNamespace).contains(topicName2.toString()));
+ });
+ persistentTopics.getPartitionedMetadata(metaResponse,
+ testTenant, testNamespace, topic2, true, false);
+ verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
+ Assert.assertNull(metaResponseCaptor.getValue().properties);
}
@Test
@@ -506,15 +524,19 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
Assert.assertTrue(errCaptor.getValue().getMessage().contains("Namespace not found"));
}
- @Test(expectedExceptions = RestException.class)
+ @Test
public void testCreateNonPartitionedTopicWithInvalidName() {
final String topicName = "standard-topic-partition-10";
doAnswer(invocation -> {
- TopicName partitionedTopicname = invocation.getArgument(0, TopicName.class);
- assert(partitionedTopicname.getLocalName().equals("standard-topic"));
+ TopicName partitionedTopicName = invocation.getArgument(0, TopicName.class);
+ assert(partitionedTopicName.getLocalName().equals("standard-topic"));
return new PartitionedTopicMetadata(10);
}).when(persistentTopics).getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean());
- persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true, null);
+ final AsyncResponse response = mock(AsyncResponse.class);
+ ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
+ persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(), Response.Status.PRECONDITION_FAILED.getStatusCode());
}
@Test
@@ -580,7 +602,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
// 2) create non partitioned topic and unload
response = mock(AsyncResponse.class);
- persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true, null);
+ persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
persistentTopics.unloadTopic(response, testTenant, testNamespace, topicName, true);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
@@ -675,8 +697,16 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
topics = (List<String>) responseCaptor.getValue();
Assert.assertEquals(topics.size(), 2);
- nonPersistentTopic.createNonPartitionedTopic(testTenant, testNamespace, "test-topic-2", false, null);
- nonPersistentTopic.createNonPartitionedTopic(testTenant, testNamespace, "__change_events", false, null);
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ nonPersistentTopic.createNonPartitionedTopic(response, testTenant, testNamespace, "test-topic-2", false, null);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+ response = mock(AsyncResponse.class);
+ responseCaptor = ArgumentCaptor.forClass(Response.class);
+ nonPersistentTopic.createNonPartitionedTopic(response, testTenant, testNamespace, "__change_events", false, null);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
response = mock(AsyncResponse.class);
responseCaptor = ArgumentCaptor.forClass(Response.class);
@@ -696,11 +726,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
@Test
public void testGrantNonPartitionedTopic() {
final String topicName = "non-partitioned-topic";
- persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true, null);
+ AsyncResponse response = mock(AsyncResponse.class);
+ persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
- AsyncResponse response = mock(AsyncResponse.class);
+ response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
@@ -720,14 +751,10 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 3, true);
final String partitionName = TopicName.get(topicName).getPartition(0).getLocalName();
- try {
- persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, partitionName, false, null);
- Assert.fail();
- } catch (RestException e) {
- log.error("Failed to create {}: {}", partitionName, e.getMessage());
- Assert.assertEquals(e.getResponse().getStatus(), 409);
- Assert.assertEquals(e.getMessage(), "This topic already exists");
- }
+ ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
+ persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, partitionName, false, null);
+ verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+ Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode());
}
@Test
@@ -773,11 +800,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
@Test
public void testRevokeNonPartitionedTopic() {
final String topicName = "non-partitioned-topic";
- persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true, null);
+ AsyncResponse response = mock(AsyncResponse.class);
+ persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
String role = "role";
Set<AuthAction> expectActions = new HashSet<>();
expectActions.add(AuthAction.produce);
- AsyncResponse response = mock(AsyncResponse.class);
+ response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
@@ -852,7 +880,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
// create non partitioned topic and compaction on it
response = mock(AsyncResponse.class);
- persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopicName, true, null);
+ persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, nonPartitionTopicName, true, null);
persistentTopics.compact(response, testTenant, testNamespace, nonPartitionTopicName, true);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
@@ -1030,9 +1058,9 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
@Test
public void testOffloadWithNullMessageId() {
final String topicName = "topic-123";
- persistentTopics.createNonPartitionedTopic(
- testTenant, testNamespace, topicName, true, null);
AsyncResponse response = mock(AsyncResponse.class);
+ persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
+ response = mock(AsyncResponse.class);
persistentTopics.triggerOffload(
response, testTenant, testNamespace, topicName, true, null);
ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
@@ -1303,9 +1331,10 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
@Test
public void testDeleteTopic() throws Exception {
final String topicName = "topic-1";
+ AsyncResponse response = mock(AsyncResponse.class);
BrokerService brokerService = spy(pulsar.getBrokerService());
doReturn(brokerService).when(pulsar).getBrokerService();
- persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, false, null);
+ persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, false, null);
CompletableFuture<Void> deleteTopicFuture = new CompletableFuture<>();
deleteTopicFuture.completeExceptionally(new MetadataStoreException.NotFoundException());
doReturn(deleteTopicFuture).when(brokerService).deleteTopic(anyString(), anyBoolean());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 4d6a0a7b15e..bbc625b0763 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -1289,7 +1289,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
public void testGetSetSubscriptionDispatchRate() throws Exception {
final String topic = testTopic + UUID.randomUUID();
admin.topics().createNonPartitionedTopic(topic);
-
DispatchRate dispatchRate = DispatchRate.builder()
.dispatchThrottlingRateInMsg(1000)
.dispatchThrottlingRateInByte(1024 * 1024)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 936e3e0d263..6ffdf6d2188 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -142,6 +142,7 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
}
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+ admin.topics().delete(topicString, true);
}
/**