You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/13 06:34:16 UTC

[pulsar] branch master updated: [improve][broker][PIP-149]Make TruncateTopic pure async (#16464)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cbcaf40e146 [improve][broker][PIP-149]Make TruncateTopic pure async (#16464)
cbcaf40e146 is described below

commit cbcaf40e146aa9dcf35959fa90d80296b8bf1a99
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Wed Jul 13 14:34:11 2022 +0800

    [improve][broker][PIP-149]Make TruncateTopic pure async (#16464)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 64 +++++-----------------
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 16 +++++-
 2 files changed, 28 insertions(+), 52 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d39b13412d2..529294ef437 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4923,73 +4923,37 @@ public class PersistentTopicsBase extends AdminResource {
         resumeAsyncResponseExceptionally(asyncResponse, cause);
     }
 
-    protected void internalTruncateNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
-        Topic topic;
-        try {
-            validateAdminAccessForTenant(topicName.getTenant());
-            validateTopicOwnership(topicName, authoritative);
-            topic = getTopicReference(topicName);
-        } catch (Exception e) {
-            log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        CompletableFuture<Void> future = topic.truncate();
-        future.thenAccept(a -> {
-            asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
-                    Response.Status.NO_CONTENT.getReasonPhrase()));
-        }).exceptionally(e -> {
-            asyncResponse.resume(e);
-            return null;
-        });
+    protected CompletableFuture<Void> internalTruncateNonPartitionedTopicAsync(boolean authoritative) {
+        return validateAdminAccessForTenantAsync(topicName.getTenant())
+            .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+            .thenCompose(__ -> getTopicReferenceAsync(topicName))
+            .thenCompose(Topic::truncate);
     }
 
-    protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean authoritative) {
+    protected CompletableFuture<Void> internalTruncateTopicAsync(boolean authoritative) {
 
         // If the topic name is a partition name, no need to get partition topic metadata again
         if (topicName.isPartitioned()) {
-            internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
+            return internalTruncateNonPartitionedTopicAsync(authoritative);
         } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false).whenComplete((meta, t) -> {
+            return getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenCompose(meta -> {
                 if (meta.partitions > 0) {
-                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                    final List<CompletableFuture<Void>> futures = new ArrayList<>(meta.partitions);
                     for (int i = 0; i < meta.partitions; i++) {
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
-                            futures.add(pulsar().getAdminClient().topics()
+                            futures.add(
+                                pulsar().getAdminClient().topics()
                                 .truncateAsync(topicNamePartition.toString()));
                         } catch (Exception e) {
                             log.error("[{}] Failed to truncate topic {}", clientAppId(), topicNamePartition, e);
-                            asyncResponse.resume(new RestException(e));
-                            return;
+                            return FutureUtil.failedFuture(new RestException(e));
                         }
                     }
-                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                        if (exception != null) {
-                            Throwable th = exception.getCause();
-                            if (th instanceof NotFoundException) {
-                                asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
-                            } else if (th instanceof WebApplicationException) {
-                                asyncResponse.resume(th);
-                            } else {
-                                log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, exception);
-                                asyncResponse.resume(new RestException(exception));
-                            }
-                        } else {
-                            asyncResponse.resume(Response.noContent().build());
-                        }
-                        return null;
-                    });
+                    return FutureUtil.waitForAll(futures);
                 } else {
-                    internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
+                    return internalTruncateNonPartitionedTopicAsync(authoritative);
                 }
-            }).exceptionally(ex -> {
-                // If the exception is not redirect exception we need to log it.
-                if (!isRedirectException(ex)) {
-                    log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, ex);
-                }
-                resumeAsyncResponseExceptionally(asyncResponse, ex);
-                return null;
             });
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index fbd9f008fd8..edf49d410ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -48,6 +48,7 @@ import javax.ws.rs.core.Response;
 import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ResetCursorData;
@@ -3937,8 +3938,19 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative){
         validateTopicName(tenant, namespace, encodedTopic);
-        internalTruncateTopic(asyncResponse, authoritative);
-
+        internalTruncateTopicAsync(authoritative)
+            .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+            .exceptionally(ex -> {
+                Throwable t = FutureUtil.unwrapCompletionException(ex);
+                if (!isRedirectException(t)) {
+                    log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, t);
+                }
+                if (t instanceof PulsarAdminException.NotFoundException) {
+                    t = new RestException(Response.Status.NOT_FOUND, t.getMessage());
+                }
+                resumeAsyncResponseExceptionally(asyncResponse, t);
+                return null;
+            });
     }
 
     @POST