You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/06/27 11:50:00 UTC

[pulsar] branch master updated: [improve][broker][PIP-149]make terminate method async (#16227)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ebcc47ee7ce [improve][broker][PIP-149]make terminate method async (#16227)
ebcc47ee7ce is described below

commit ebcc47ee7ceb43f680640ad72e51a06d9856458d
Author: Qiang Huang <HQ...@users.noreply.github.com>
AuthorDate: Mon Jun 27 19:49:53 2022 +0800

    [improve][broker][PIP-149]make terminate method async (#16227)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 37 +++++++++++++---------
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 19 ++++++++---
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 14 ++++++--
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  8 +++--
 4 files changed, 53 insertions(+), 25 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c95928d9029..7f8b5f4ef82 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3623,23 +3623,30 @@ public class PersistentTopicsBase extends AdminResource {
 
     }
 
-    protected MessageId internalTerminate(boolean authoritative) {
+    protected CompletableFuture<MessageId> internalTerminateAsync(boolean authoritative) {
+        CompletableFuture<Void> ret;
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
-        if (partitionMetadata.partitions > 0) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed");
-        }
-        validateTopicOwnership(topicName, authoritative);
-        validateTopicOperation(topicName, TopicOperation.TERMINATE);
-        Topic topic = getTopicReference(topicName);
-        try {
-            return ((PersistentTopic) topic).terminate().get();
-        } catch (Exception exception) {
-            log.error("[{}] Failed to terminated topic {}", clientAppId(), topicName, exception);
-            throw new RestException(exception);
+            ret = validateGlobalNamespaceOwnershipAsync(namespaceName);
+        } else {
+            ret = CompletableFuture.completedFuture(null);
         }
+        return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.TERMINATE))
+                .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
+                .thenAccept(partitionMetadata -> {
+                    if (partitionMetadata.partitions > 0) {
+                        throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                "Termination of a partitioned topic is not allowed");
+                    }
+                })
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose(topic -> {
+                    if (!(topic instanceof PersistentTopic)) {
+                        throw new RestException(Status.METHOD_NOT_ALLOWED,
+                                "Termination of a non-persistent topic is not allowed");
+                    }
+                    return ((PersistentTopic) topic).terminate();
+                });
     }
 
     protected void internalTerminatePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index a3689731436..bdc43f4b133 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -44,7 +44,6 @@ import javax.ws.rs.core.Response;
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ResetCursorData;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -832,11 +831,23 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 405, message = "Operation not allowed on non-persistent topic"),
             @ApiResponse(code = 406, message = "Need to provide a persistent topic name"),
             @ApiResponse(code = 404, message = "Topic does not exist") })
-    public MessageId terminate(@PathParam("property") String property, @PathParam("cluster") String cluster,
-            @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
+    public void terminate(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validatePersistentTopicName(property, cluster, namespace, encodedTopic);
-        return internalTerminate(authoritative);
+        internalTerminateAsync(authoritative)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to terminated topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index bfcbd3169d3..b0a76e4ab7b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -48,7 +48,6 @@ import javax.ws.rs.core.Response;
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
-import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ResetCursorData;
@@ -2894,7 +2893,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 412, message = "Topic name is not valid"),
             @ApiResponse(code = 500, message = "Internal server error"),
             @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")})
-    public MessageId terminate(
+    public void terminate(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -2904,7 +2904,15 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validatePersistentTopicName(tenant, namespace, encodedTopic);
-        return internalTerminate(authoritative);
+        internalTerminateAsync(authoritative)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to terminated topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @POST
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 2fd4389146c..f898b9d7246 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -401,13 +401,15 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
         // 3) Assert terminate persistent topic
-        MessageId messageId = persistentTopics.terminate(testTenant, testNamespace, testLocalTopicName, true);
-        Assert.assertEquals(messageId, new MessageIdImpl(3, -1, -1));
+        response = mock(AsyncResponse.class);
+        persistentTopics.terminate(response, testTenant, testNamespace, testLocalTopicName, true);
+        MessageId messageId = new MessageIdImpl(3, -1, -1);
+        verify(response, timeout(5000).times(1)).resume(messageId);
 
         // 4) Assert terminate non-persistent topic
         String nonPersistentTopicName = "non-persistent-topic";
         try {
-            nonPersistentTopic.terminate(testTenant, testNamespace, nonPersistentTopicName, true);
+            nonPersistentTopic.terminate(response, testTenant, testNamespace, nonPersistentTopicName, true);
             Assert.fail("Should fail validation on non-persistent topic");
         } catch (RestException e) {
             Assert.assertEquals(Response.Status.NOT_ACCEPTABLE.getStatusCode(), e.getResponse().getStatus());