Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/12 00:55:59 UTC

[GitHub] [pulsar] Technoboy- commented on a diff in pull request #16464: [improve][broker][PIP-149]Make TruncateTopic pure async

Technoboy- commented on code in PR #16464:
URL: https://github.com/apache/pulsar/pull/16464#discussion_r918456579


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -4920,73 +4920,45 @@ protected void handleTopicPolicyException(String methodName, Throwable thr, Asyn
         resumeAsyncResponseExceptionally(asyncResponse, cause);
     }
 
-    protected void internalTruncateNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
-        Topic topic;
-        try {
-            validateAdminAccessForTenant(topicName.getTenant());
-            validateTopicOwnership(topicName, authoritative);
-            topic = getTopicReference(topicName);
-        } catch (Exception e) {
-            log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
-            resumeAsyncResponseExceptionally(asyncResponse, e);
-            return;
-        }
-        CompletableFuture<Void> future = topic.truncate();
-        future.thenAccept(a -> {
-            asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
-                    Response.Status.NO_CONTENT.getReasonPhrase()));
-        }).exceptionally(e -> {
-            asyncResponse.resume(e);
-            return null;
-        });
+    protected CompletableFuture<Void> internalTruncateNonPartitionedTopicAsync(boolean authoritative) {
+        return validateAdminAccessForTenantAsync(topicName.getTenant())
+            .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+            .thenCompose(__ -> getTopicReferenceAsync(topicName))
+            .thenCompose(Topic::truncate);
     }
 
-    protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean authoritative) {
+    protected CompletableFuture<Void> internalTruncateTopicAsync(boolean authoritative) {
 
         // If the topic name is a partition name, no need to get partition topic metadata again
         if (topicName.isPartitioned()) {
-            internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
+            return internalTruncateNonPartitionedTopicAsync(authoritative);
         } else {
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false).whenComplete((meta, t) -> {
+            return getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenCompose(meta -> {
                 if (meta.partitions > 0) {
-                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+                    final List<CompletableFuture<Void>> futures = new ArrayList<>(meta.partitions);
                     for (int i = 0; i < meta.partitions; i++) {
                         TopicName topicNamePartition = topicName.getPartition(i);
                         try {
-                            futures.add(pulsar().getAdminClient().topics()
+                            futures.add(
+                                pulsar().getAdminClient().topics()
                                 .truncateAsync(topicNamePartition.toString()));
                         } catch (Exception e) {
                             log.error("[{}] Failed to truncate topic {}", clientAppId(), topicNamePartition, e);
-                            asyncResponse.resume(new RestException(e));
-                            return;
+                            return FutureUtil.failedFuture(new RestException(e));
                         }
                     }
-                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                    return FutureUtil.waitForAll(futures).handle((result, exception) -> {
                         if (exception != null) {
                             Throwable th = exception.getCause();
                             if (th instanceof NotFoundException) {
-                                asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
-                            } else if (th instanceof WebApplicationException) {
-                                asyncResponse.resume(th);
-                            } else {
-                                log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, exception);
-                                asyncResponse.resume(new RestException(exception));
+                                throw new RestException(Status.NOT_FOUND, th.getMessage());

Review Comment:
   Move this line to the REST layer. Otherwise, if the exception is not an instance of NotFoundException, the handler will return null.
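
To illustrate the concern, here is a minimal, self-contained sketch. It does not use Pulsar's classes: `NotFoundException`, `RestException`, `mappedInsideHandle`, and `statusFor` are hypothetical stand-ins, and the blocking `join()` is used only to keep the demo short. It assumes the `handle` callback has no `else` branch that rethrows, which is the case this comment describes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class HandleSwallowDemo {

    // Hypothetical stand-in for the admin client's "not found" failure.
    static class NotFoundException extends RuntimeException {
        NotFoundException(String msg) { super(msg); }
    }

    // Hypothetical stand-in for an HTTP-level error carrying a status code.
    static class RestException extends RuntimeException {
        final int status;
        RestException(int status, String msg) { super(msg); this.status = status; }
    }

    // Strip the CompletionException wrapper that dependent stages add.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    // Pattern under review: only NotFoundException is rethrown. Any other
    // failure falls through to "return null", so the returned future
    // completes normally and the error is lost.
    static CompletableFuture<Void> mappedInsideHandle(CompletableFuture<Void> source) {
        return source.handle((result, exception) -> {
            if (exception != null) {
                Throwable th = unwrap(exception);
                if (th instanceof NotFoundException) {
                    throw new RestException(404, th.getMessage());
                }
            }
            return null;
        });
    }

    // Suggested alternative: let the future fail as-is and translate the
    // failure into a status code in one place (the "REST layer").
    static int statusFor(CompletableFuture<Void> future) {
        try {
            future.join();
            return 204;
        } catch (CompletionException e) {
            Throwable cause = unwrap(e);
            if (cause instanceof NotFoundException) {
                return 404;
            }
            if (cause instanceof RestException) {
                return ((RestException) cause).status;
            }
            return 500;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker error"));

        // Mapping inside handle(): the failure is reported as a success.
        System.out.println(statusFor(mappedInsideHandle(failed)));
        // Mapping at the REST layer: the failure is reported as an error.
        System.out.println(statusFor(failed));
    }
}
```

In this sketch, the first call reports success for a failed truncate because `handle` converted the unmatched exception into a normal `null` result, while the second call surfaces the failure.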


