You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/07/13 06:34:16 UTC
[pulsar] branch master updated: [improve][broker][PIP-149]Make TruncateTopic pure async (#16464)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cbcaf40e146 [improve][broker][PIP-149]Make TruncateTopic pure async (#16464)
cbcaf40e146 is described below
commit cbcaf40e146aa9dcf35959fa90d80296b8bf1a99
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Wed Jul 13 14:34:11 2022 +0800
[improve][broker][PIP-149]Make TruncateTopic pure async (#16464)
---
.../broker/admin/impl/PersistentTopicsBase.java | 64 +++++-----------------
.../pulsar/broker/admin/v2/PersistentTopics.java | 16 +++++-
2 files changed, 28 insertions(+), 52 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index d39b13412d2..529294ef437 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4923,73 +4923,37 @@ public class PersistentTopicsBase extends AdminResource {
resumeAsyncResponseExceptionally(asyncResponse, cause);
}
- protected void internalTruncateNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
- Topic topic;
- try {
- validateAdminAccessForTenant(topicName.getTenant());
- validateTopicOwnership(topicName, authoritative);
- topic = getTopicReference(topicName);
- } catch (Exception e) {
- log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, e);
- resumeAsyncResponseExceptionally(asyncResponse, e);
- return;
- }
- CompletableFuture<Void> future = topic.truncate();
- future.thenAccept(a -> {
- asyncResponse.resume(new RestException(Response.Status.NO_CONTENT.getStatusCode(),
- Response.Status.NO_CONTENT.getReasonPhrase()));
- }).exceptionally(e -> {
- asyncResponse.resume(e);
- return null;
- });
+ protected CompletableFuture<Void> internalTruncateNonPartitionedTopicAsync(boolean authoritative) {
+ return validateAdminAccessForTenantAsync(topicName.getTenant())
+ .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+ .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ .thenCompose(Topic::truncate);
}
- protected void internalTruncateTopic(AsyncResponse asyncResponse, boolean authoritative) {
+ protected CompletableFuture<Void> internalTruncateTopicAsync(boolean authoritative) {
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
- internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
+ return internalTruncateNonPartitionedTopicAsync(authoritative);
} else {
- getPartitionedTopicMetadataAsync(topicName, authoritative, false).whenComplete((meta, t) -> {
+ return getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenCompose(meta -> {
if (meta.partitions > 0) {
- final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+ final List<CompletableFuture<Void>> futures = new ArrayList<>(meta.partitions);
for (int i = 0; i < meta.partitions; i++) {
TopicName topicNamePartition = topicName.getPartition(i);
try {
- futures.add(pulsar().getAdminClient().topics()
+ futures.add(
+ pulsar().getAdminClient().topics()
.truncateAsync(topicNamePartition.toString()));
} catch (Exception e) {
log.error("[{}] Failed to truncate topic {}", clientAppId(), topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
+ return FutureUtil.failedFuture(new RestException(e));
}
}
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
- if (exception != null) {
- Throwable th = exception.getCause();
- if (th instanceof NotFoundException) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, th.getMessage()));
- } else if (th instanceof WebApplicationException) {
- asyncResponse.resume(th);
- } else {
- log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, exception);
- asyncResponse.resume(new RestException(exception));
- }
- } else {
- asyncResponse.resume(Response.noContent().build());
- }
- return null;
- });
+ return FutureUtil.waitForAll(futures);
} else {
- internalTruncateNonPartitionedTopic(asyncResponse, authoritative);
+ return internalTruncateNonPartitionedTopicAsync(authoritative);
}
- }).exceptionally(ex -> {
- // If the exception is not redirect exception we need to log it.
- if (!isRedirectException(ex)) {
- log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, ex);
- }
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
});
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index fbd9f008fd8..edf49d410ad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -48,6 +48,7 @@ import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.ResetCursorData;
@@ -3937,8 +3938,19 @@ public class PersistentTopics extends PersistentTopicsBase {
@ApiParam(value = "Whether leader broker redirected this call to this broker. For internal use.")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative){
validateTopicName(tenant, namespace, encodedTopic);
- internalTruncateTopic(asyncResponse, authoritative);
-
+ internalTruncateTopicAsync(authoritative)
+ .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+ .exceptionally(ex -> {
+ Throwable t = FutureUtil.unwrapCompletionException(ex);
+ if (!isRedirectException(t)) {
+ log.error("[{}] Failed to truncate topic {}", clientAppId(), topicName, t);
+ }
+ if (t instanceof PulsarAdminException.NotFoundException) {
+ t = new RestException(Response.Status.NOT_FOUND, t.getMessage());
+ }
+ resumeAsyncResponseExceptionally(asyncResponse, t);
+ return null;
+ });
}
@POST