You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/27 11:50:00 UTC
[pulsar] branch master updated: [improve][broker][PIP-149]make terminate method async (#16227)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ebcc47ee7ce [improve][broker][PIP-149]make terminate method async (#16227)
ebcc47ee7ce is described below
commit ebcc47ee7ceb43f680640ad72e51a06d9856458d
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Mon Jun 27 19:49:53 2022 +0800
[improve][broker][PIP-149]make terminate method async (#16227)
---
.../broker/admin/impl/PersistentTopicsBase.java | 37 +++++++++++++---------
.../pulsar/broker/admin/v1/PersistentTopics.java | 19 ++++++++---
.../pulsar/broker/admin/v2/PersistentTopics.java | 14 ++++++--
.../pulsar/broker/admin/PersistentTopicsTest.java | 8 +++--
4 files changed, 53 insertions(+), 25 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c95928d9029..7f8b5f4ef82 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3623,23 +3623,30 @@ public class PersistentTopicsBase extends AdminResource {
}
- protected MessageId internalTerminate(boolean authoritative) {
+ protected CompletableFuture<MessageId> internalTerminateAsync(boolean authoritative) {
+ CompletableFuture<Void> ret;
if (topicName.isGlobal()) {
- validateGlobalNamespaceOwnership(namespaceName);
- }
- PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
- if (partitionMetadata.partitions > 0) {
- throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed");
- }
- validateTopicOwnership(topicName, authoritative);
- validateTopicOperation(topicName, TopicOperation.TERMINATE);
- Topic topic = getTopicReference(topicName);
- try {
- return ((PersistentTopic) topic).terminate().get();
- } catch (Exception exception) {
- log.error("[{}] Failed to terminated topic {}", clientAppId(), topicName, exception);
- throw new RestException(exception);
+ ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+ } else {
+ ret = CompletableFuture.completedFuture(null);
}
+ return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.TERMINATE))
+ .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
+ .thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions > 0) {
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
+ "Termination of a partitioned topic is not allowed");
+ }
+ })
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(topic -> {
+ if (!(topic instanceof PersistentTopic)) {
+ throw new RestException(Status.METHOD_NOT_ALLOWED,
+ "Termination of a non-persistent topic is not allowed");
+ }
+ return ((PersistentTopic) topic).terminate();
+ });
}
protected void internalTerminatePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index a3689731436..bdc43f4b133 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -44,7 +44,6 @@ import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.policies.data.AuthAction;
@@ -832,11 +831,23 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"),
@ApiResponse(code = 406, message = "Need to provide a persistent topic name"),
@ApiResponse(code = 404, message = "Topic does not exist") })
- public MessageId terminate(@PathParam("property") String property, @PathParam("cluster") String cluster,
- @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
+ public void terminate(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("property") String property,
+ @PathParam("cluster") String cluster,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validatePersistentTopicName(property, cluster, namespace, encodedTopic);
- return internalTerminate(authoritative);
+ internalTerminateAsync(authoritative)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to terminated topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index bfcbd3169d3..b0a76e4ab7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -48,7 +48,6 @@ import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
@@ -2894,7 +2893,8 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiResponse(code = 412, message = "Topic name is not valid"),
@ApiResponse(code = 500, message = "Internal server error"),
@ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
- public MessageId terminate(
+ public void terminate(
+ @Suspended final AsyncResponse asyncResponse,
@ApiParam(value = "Specify the tenant", required = true)
@PathParam("tenant") String tenant,
@ApiParam(value = "Specify the namespace", required = true)
@@ -2904,7 +2904,15 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validatePersistentTopicName(tenant, namespace, encodedTopic);
- return internalTerminate(authoritative);
+ internalTerminateAsync(authoritative)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to terminated topic {}", clientAppId(), topicName, ex);
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
@POST
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 2fd4389146c..f898b9d7246 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -401,13 +401,15 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
// 3) Assert terminate persistent topic
- MessageId messageId = persistentTopics.terminate(testTenant, testNamespace, testLocalTopicName, true);
- Assert.assertEquals(messageId, new MessageIdImpl(3, -1, -1));
+ response = mock(AsyncResponse.class);
+ persistentTopics.terminate(response, testTenant, testNamespace, testLocalTopicName, true);
+ MessageId messageId = new MessageIdImpl(3, -1, -1);
+ verify(response, timeout(5000).times(1)).resume(messageId);
// 4) Assert terminate non-persistent topic
String nonPersistentTopicName = "non-persistent-topic";
try {
- nonPersistentTopic.terminate(testTenant, testNamespace, nonPersistentTopicName, true);
+ nonPersistentTopic.terminate(response, testTenant, testNamespace, nonPersistentTopicName, true);
Assert.fail("Should fail validation on non-persistent topic");
} catch (RestException e) {
Assert.assertEquals(Response.Status.NOT_ACCEPTABLE.getStatusCode(), e.getResponse().getStatus());