You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2022/12/16 12:01:01 UTC

[pulsar] branch master updated: [cleanup][broker] Remove duplicate code to improve delete subscription (#15347)

This is an automated email from the ASF dual-hosted git repository.

tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fdebf50cf28 [cleanup][broker] Remove duplicate code to improve delete subscription (#15347)
fdebf50cf28 is described below

commit fdebf50cf28888b562b210c5fe50890aba2b3345
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Fri Dec 16 20:00:54 2022 +0800

    [cleanup][broker] Remove duplicate code to improve delete subscription (#15347)
    
    Signed-off-by: Zixuan Liu <no...@gmail.com>
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 119 +++++++--------------
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  33 ++++--
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  32 ++++--
 3 files changed, 83 insertions(+), 101 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4a932a391b1..238dd2cc942 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1527,16 +1527,9 @@ public class PersistentTopicsBase extends AdminResource {
         });
     }
 
-    protected void internalDeleteSubscription(AsyncResponse asyncResponse,
-                                              String subName, boolean authoritative, boolean force) {
-        if (force) {
-            internalDeleteSubscriptionForcefully(asyncResponse, subName, authoritative);
-        } else {
-            internalDeleteSubscription(asyncResponse, subName, authoritative);
-        }
-    }
-
-    protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative) {
+    protected CompletableFuture<Void> internalDeleteSubscriptionAsync(String subName,
+                                                                      boolean authoritative,
+                                                                      boolean force) {
         CompletableFuture<Void> future;
         if (topicName.isGlobal()) {
             future = validateGlobalNamespaceOwnershipAsync(namespaceName);
@@ -1544,103 +1537,63 @@ public class PersistentTopicsBase extends AdminResource {
             future = CompletableFuture.completedFuture(null);
         }
 
-        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
+        return future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenCompose(__ -> {
             if (topicName.isPartitioned()) {
-                internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
+                return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative, force);
             } else {
-                getPartitionedTopicMetadataAsync(topicName,
-                        authoritative, false).thenAcceptAsync(partitionMetadata -> {
+                return getPartitionedTopicMetadataAsync(topicName,
+                        authoritative, false).thenCompose(partitionMetadata -> {
                     if (partitionMetadata.partitions > 0) {
                         final List<CompletableFuture<Void>> futures = new ArrayList<>();
-
+                        PulsarAdmin adminClient;
+                        try {
+                            adminClient = pulsar().getAdminClient();
+                        } catch (PulsarServerException e) {
+                            return CompletableFuture.failedFuture(e);
+                        }
                         for (int i = 0; i < partitionMetadata.partitions; i++) {
                             TopicName topicNamePartition = topicName.getPartition(i);
-                            try {
-                                futures.add(pulsar().getAdminClient().topics()
-                                        .deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
-                            } catch (Exception e) {
-                                log.error("[{}] Failed to delete subscription {} {}",
-                                        clientAppId(), topicNamePartition, subName,
-                                        e);
-                                asyncResponse.resume(new RestException(e));
-                                return;
-                            }
+                            futures.add(adminClient.topics()
+                                    .deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
                         }
 
-                        FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                        return FutureUtil.waitForAll(futures).handle((result, exception) -> {
                             if (exception != null) {
                                 Throwable t = exception.getCause();
                                 if (t instanceof NotFoundException) {
-                                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                                            getSubNotFoundErrorMessage(topicName.toString(), subName)));
-                                    return null;
+                                    throw new RestException(Status.NOT_FOUND,
+                                            "Subscription not found");
                                 } else if (t instanceof PreconditionFailedException) {
-                                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                                            "Subscription has active connected consumers"));
-                                    return null;
+                                    throw new RestException(Status.PRECONDITION_FAILED,
+                                            "Subscription has active connected consumers");
                                 } else {
-                                    log.error("[{}] Failed to delete subscription {} {}",
-                                            clientAppId(), topicName, subName, t);
-                                    asyncResponse.resume(new RestException(t));
-                                    return null;
+                                    throw new RestException(t);
                                 }
                             }
-
-                            asyncResponse.resume(Response.noContent().build());
                             return null;
                         });
-                    } else {
-                        internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
                     }
-                }, pulsar().getExecutor()).exceptionally(ex -> {
-                    log.error("[{}] Failed to delete subscription {} from topic {}",
-                            clientAppId(), subName, topicName, ex);
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
+                    return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative,
+                            force);
                 });
             }
-        }).exceptionally(ex -> {
-            // If the exception is not redirect exception we need to log it.
-            if (!isRedirectException(ex)) {
-                log.error("[{}] Failed to delete subscription {} from topic {}",
-                        clientAppId(), subName, topicName, ex);
-            }
-            resumeAsyncResponseExceptionally(asyncResponse, ex);
-            return null;
         });
     }
 
-    private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
-                                                                  String subName, boolean authoritative) {
-        validateTopicOwnershipAsync(topicName, authoritative)
-            .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE, subName))
-            .thenCompose(__ -> getTopicReferenceAsync(topicName))
-            .thenCompose(topic -> {
-                Subscription sub = topic.getSubscription(subName);
-                if (sub == null) {
-                    throw new RestException(Status.NOT_FOUND,
-                            getSubNotFoundErrorMessage(topicName.toString(), subName));
-                }
-                return sub.delete();
-            }).thenRun(() -> {
-                log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName);
-                asyncResponse.resume(Response.noContent().build());
-            }).exceptionally(ex -> {
-                Throwable cause = ex.getCause();
-                if (cause instanceof SubscriptionBusyException) {
-                    log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName,
-                            topicName, cause);
-                    asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
-                            "Subscription has active connected consumers"));
-                } else {
-                    // If the exception is not redirect exception we need to log it.
-                    if (!isRedirectException(ex)) {
-                        log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, cause);
+    private CompletableFuture<Void> internalDeleteSubscriptionForNonPartitionedTopicAsync(String subName,
+                                                                                          boolean authoritative,
+                                                                                          boolean force) {
+        return validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose((__) -> validateTopicOperationAsync(topicName, TopicOperation.UNSUBSCRIBE))
+                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+                .thenCompose((topic) -> {
+                    Subscription sub = topic.getSubscription(subName);
+                    if (sub == null) {
+                        throw new RestException(Status.NOT_FOUND,
+                                getSubNotFoundErrorMessage(topicName.toString(), subName));
                     }
-                    asyncResponse.resume(new RestException(cause));
-                }
-                return null;
-            });
+                    return force ? sub.deleteForcefully() : sub.delete();
+                });
     }
 
     private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncResponse asyncResponse,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 943b191209f..070260c7bfd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -548,15 +548,30 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
             @QueryParam("force") @DefaultValue("false") boolean force,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
-        try {
-            validateTopicName(property, cluster, namespace, encodedTopic);
-            internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative, force);
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+                                   @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+        validateTopicName(property, cluster, namespace, encodedTopic);
+        String subName = decode(encodedSubName);
+        internalDeleteSubscriptionAsync(subName, authoritative, force)
+                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
+
+                    // If the exception is not redirect exception we need to log it.
+                    if (!isRedirectException(cause)) {
+                        log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName,
+                                topicName, cause);
+                    }
+
+                    if (cause instanceof BrokerServiceException.SubscriptionBusyException) {
+                        resumeAsyncResponseExceptionally(asyncResponse,
+                                new RestException(Response.Status.PRECONDITION_FAILED,
+                                        "Subscription has active connected consumers"));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, cause);
+                    }
+
+                    return null;
+                });
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index aef1babe89a..0fb2ed8cec7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1387,15 +1387,29 @@ public class PersistentTopics extends PersistentTopicsBase {
             @QueryParam("force") @DefaultValue("false") boolean force,
             @ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
-        try {
-            validateTopicName(tenant, namespace, encodedTopic);
-            validateTopicOwnership(topicName, authoritative);
-            internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative, force);
-        } catch (WebApplicationException wae) {
-            asyncResponse.resume(wae);
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e));
-        }
+        validateTopicName(tenant, namespace, encodedTopic);
+        String subName = decode(encodedSubName);
+        internalDeleteSubscriptionAsync(subName, authoritative, force)
+                .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    Throwable cause = FutureUtil.unwrapCompletionException(ex);
+
+                    // If the exception is not redirect exception we need to log it.
+                    if (!isRedirectException(cause)) {
+                        log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName,
+                                topicName, cause);
+                    }
+
+                    if (cause instanceof BrokerServiceException.SubscriptionBusyException) {
+                        resumeAsyncResponseExceptionally(asyncResponse,
+                                new RestException(Response.Status.PRECONDITION_FAILED,
+                                        "Subscription has active connected consumers"));
+                    } else {
+                        resumeAsyncResponseExceptionally(asyncResponse, cause);
+                    }
+
+                    return null;
+                });
     }
 
     @POST