You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ti...@apache.org on 2022/12/16 12:01:01 UTC
[pulsar] branch master updated: [cleanup][broker] Remove duplicate code to improve delete subscription (#15347)
This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fdebf50cf28 [cleanup][broker] Remove duplicate code to improve delete subscription (#15347)
fdebf50cf28 is described below
commit fdebf50cf28888b562b210c5fe50890aba2b3345
Author: Zixuan Liu <no...@gmail.com>
AuthorDate: Fri Dec 16 20:00:54 2022 +0800
[cleanup][broker] Remove duplicate code to improve delete subscription (#15347)
Signed-off-by: Zixuan Liu <no...@gmail.com>
---
.../broker/admin/impl/PersistentTopicsBase.java | 119 +++++++--------------
.../pulsar/broker/admin/v1/PersistentTopics.java | 33 ++++--
.../pulsar/broker/admin/v2/PersistentTopics.java | 32 ++++--
3 files changed, 83 insertions(+), 101 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4a932a391b1..238dd2cc942 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1527,16 +1527,9 @@ public class PersistentTopicsBase extends AdminResource {
});
}
- protected void internalDeleteSubscription(AsyncResponse asyncResponse,
- String subName, boolean authoritative, boolean force) {
- if (force) {
- internalDeleteSubscriptionForcefully(asyncResponse, subName, authoritative);
- } else {
- internalDeleteSubscription(asyncResponse, subName, authoritative);
- }
- }
-
- protected void internalDeleteSubscription(AsyncResponse asyncResponse, String subName, boolean authoritative) {
+ protected CompletableFuture<Void> internalDeleteSubscriptionAsync(String subName,
+ boolean authoritative,
+ boolean force) {
CompletableFuture<Void> future;
if (topicName.isGlobal()) {
future = validateGlobalNamespaceOwnershipAsync(namespaceName);
@@ -1544,103 +1537,63 @@ public class PersistentTopicsBase extends AdminResource {
future = CompletableFuture.completedFuture(null);
}
- future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenAccept(__ -> {
+ return future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)).thenCompose(__ -> {
if (topicName.isPartitioned()) {
- internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
+ return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative, force);
} else {
- getPartitionedTopicMetadataAsync(topicName,
- authoritative, false).thenAcceptAsync(partitionMetadata -> {
+ return getPartitionedTopicMetadataAsync(topicName,
+ authoritative, false).thenCompose(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
final List<CompletableFuture<Void>> futures = new ArrayList<>();
-
+ PulsarAdmin adminClient;
+ try {
+ adminClient = pulsar().getAdminClient();
+ } catch (PulsarServerException e) {
+ return CompletableFuture.failedFuture(e);
+ }
for (int i = 0; i < partitionMetadata.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
- try {
- futures.add(pulsar().getAdminClient().topics()
- .deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
- } catch (Exception e) {
- log.error("[{}] Failed to delete subscription {} {}",
- clientAppId(), topicNamePartition, subName,
- e);
- asyncResponse.resume(new RestException(e));
- return;
- }
+ futures.add(adminClient.topics()
+ .deleteSubscriptionAsync(topicNamePartition.toString(), subName, false));
}
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ return FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
- getSubNotFoundErrorMessage(topicName.toString(), subName)));
- return null;
+ throw new RestException(Status.NOT_FOUND,
+ "Subscription not found");
} else if (t instanceof PreconditionFailedException) {
- asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
- "Subscription has active connected consumers"));
- return null;
+ throw new RestException(Status.PRECONDITION_FAILED,
+ "Subscription has active connected consumers");
} else {
- log.error("[{}] Failed to delete subscription {} {}",
- clientAppId(), topicName, subName, t);
- asyncResponse.resume(new RestException(t));
- return null;
+ throw new RestException(t);
}
}
-
- asyncResponse.resume(Response.noContent().build());
return null;
});
- } else {
- internalDeleteSubscriptionForNonPartitionedTopic(asyncResponse, subName, authoritative);
}
- }, pulsar().getExecutor()).exceptionally(ex -> {
- log.error("[{}] Failed to delete subscription {} from topic {}",
- clientAppId(), subName, topicName, ex);
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
+ return internalDeleteSubscriptionForNonPartitionedTopicAsync(subName, authoritative,
+ force);
});
}
- }).exceptionally(ex -> {
- // If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to delete subscription {} from topic {}",
- clientAppId(), subName, topicName, ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
});
}
- private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyncResponse,
- String subName, boolean authoritative) {
- validateTopicOwnershipAsync(topicName, authoritative)
- .thenRun(() -> validateTopicOperation(topicName, TopicOperation.UNSUBSCRIBE, subName))
- .thenCompose(__ -> getTopicReferenceAsync(topicName))
- .thenCompose(topic -> {
- Subscription sub = topic.getSubscription(subName);
- if (sub == null) {
- throw new RestException(Status.NOT_FOUND,
- getSubNotFoundErrorMessage(topicName.toString(), subName));
- }
- return sub.delete();
- }).thenRun(() -> {
- log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName);
- asyncResponse.resume(Response.noContent().build());
- }).exceptionally(ex -> {
- Throwable cause = ex.getCause();
- if (cause instanceof SubscriptionBusyException) {
- log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName,
- topicName, cause);
- asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
- "Subscription has active connected consumers"));
- } else {
- // If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, cause);
+ private CompletableFuture<Void> internalDeleteSubscriptionForNonPartitionedTopicAsync(String subName,
+ boolean authoritative,
+ boolean force) {
+ return validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose((__) -> validateTopicOperationAsync(topicName, TopicOperation.UNSUBSCRIBE))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose((topic) -> {
+ Subscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ throw new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(), subName));
}
- asyncResponse.resume(new RestException(cause));
- }
- return null;
- });
+ return force ? sub.deleteForcefully() : sub.delete();
+ });
}
private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncResponse asyncResponse,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 943b191209f..070260c7bfd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -548,15 +548,30 @@ public class PersistentTopics extends PersistentTopicsBase {
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic, @PathParam("subName") String encodedSubName,
@QueryParam("force") @DefaultValue("false") boolean force,
- @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
- try {
- validateTopicName(property, cluster, namespace, encodedTopic);
- internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative, force);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+ validateTopicName(property, cluster, namespace, encodedTopic);
+ String subName = decode(encodedSubName);
+ internalDeleteSubscriptionAsync(subName, authoritative, force)
+ .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+
+ // If the exception is not redirect exception we need to log it.
+ if (!isRedirectException(cause)) {
+ log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName,
+ topicName, cause);
+ }
+
+ if (cause instanceof BrokerServiceException.SubscriptionBusyException) {
+ resumeAsyncResponseExceptionally(asyncResponse,
+ new RestException(Response.Status.PRECONDITION_FAILED,
+ "Subscription has active connected consumers"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ }
+
+ return null;
+ });
}
@POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index aef1babe89a..0fb2ed8cec7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1387,15 +1387,29 @@ public class PersistentTopics extends PersistentTopicsBase {
@QueryParam("force") @DefaultValue("false") boolean force,
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
- try {
- validateTopicName(tenant, namespace, encodedTopic);
- validateTopicOwnership(topicName, authoritative);
- internalDeleteSubscription(asyncResponse, decode(encodedSubName), authoritative, force);
- } catch (WebApplicationException wae) {
- asyncResponse.resume(wae);
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e));
- }
+ validateTopicName(tenant, namespace, encodedTopic);
+ String subName = decode(encodedSubName);
+ internalDeleteSubscriptionAsync(subName, authoritative, force)
+ .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+
+ // If the exception is not redirect exception we need to log it.
+ if (!isRedirectException(cause)) {
+ log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName,
+ topicName, cause);
+ }
+
+ if (cause instanceof BrokerServiceException.SubscriptionBusyException) {
+ resumeAsyncResponseExceptionally(asyncResponse,
+ new RestException(Response.Status.PRECONDITION_FAILED,
+ "Subscription has active connected consumers"));
+ } else {
+ resumeAsyncResponseExceptionally(asyncResponse, cause);
+ }
+
+ return null;
+ });
}
@POST