You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/04/21 01:59:46 UTC
[pulsar] branch master updated: Fix duplicate validateTopicOwnershipAsync (#15120)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 151f1d1d3e1 Fix duplicate validateTopicOwnershipAsync (#15120)
151f1d1d3e1 is described below
commit 151f1d1d3e14df9166547d1aed829c774ccce99d
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Thu Apr 21 09:59:37 2022 +0800
Fix duplicate validateTopicOwnershipAsync (#15120)
Co-authored-by: gavingaozhangmin <ga...@didiglobal.com>
---
.../broker/admin/impl/PersistentTopicsBase.java | 259 ++++++++++-----------
1 file changed, 129 insertions(+), 130 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index eb8d9bcc513..2a4be06f418 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1045,11 +1045,13 @@ public class PersistentTopicsBase extends AdminResource {
} else {
future = CompletableFuture.completedFuture(null);
}
- future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
- .thenAccept(unused -> {
+ future.thenCompose(__ ->
+ validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS)
+ .thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative))
+ .thenAccept(unused1 -> {
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
- internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative);
+ internalGetSubscriptionsForNonPartitionedTopic(asyncResponse);
} else {
getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenAccept(partitionMetadata -> {
@@ -1067,7 +1069,7 @@ public class PersistentTopicsBase extends AdminResource {
topicResources().persistentTopicExists(topicName.getPartition(i)));
}
FutureUtil.waitForAll(Lists.newArrayList(existsFutures.values()))
- .thenApply(__ ->
+ .thenApply(unused2 ->
existsFutures.entrySet().stream().filter(e -> e.getValue().join())
.map(item -> topicName.getPartition(item.getKey()).toString())
.collect(Collectors.toList())
@@ -1086,7 +1088,7 @@ public class PersistentTopicsBase extends AdminResource {
throw new RestException(e);
}
});
- }).thenAccept(__ -> resumeAsyncResponse(asyncResponse,
+ }).thenAccept(unused3 -> resumeAsyncResponse(asyncResponse,
subscriptions, subscriptionFutures));
} else {
for (int i = 0; i < partitionMetadata.partitions; i++) {
@@ -1104,7 +1106,7 @@ public class PersistentTopicsBase extends AdminResource {
asyncResponse.resume(e);
}
} else {
- internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative);
+ internalGetSubscriptionsForNonPartitionedTopic(asyncResponse);
}
}).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
@@ -1124,7 +1126,8 @@ public class PersistentTopicsBase extends AdminResource {
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
- });
+ })
+ );
}
private void resumeAsyncResponse(AsyncResponse asyncResponse, Set<String> subscriptions,
@@ -1153,10 +1156,8 @@ public class PersistentTopicsBase extends AdminResource {
});
}
- private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
- validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS))
- .thenCompose(__ -> getTopicReferenceAsync(topicName))
+ private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse) {
+ getTopicReferenceAsync(topicName)
.thenAccept(topic -> asyncResponse.resume(Lists.newArrayList(topic.getSubscriptions().keys())))
.exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
@@ -1731,11 +1732,7 @@ public class PersistentTopicsBase extends AdminResource {
private CompletableFuture<Void> internalSkipAllMessagesForNonPartitionedTopicAsync(AsyncResponse asyncResponse,
String subName,
boolean authoritative) {
- return validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ ->
- validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName))
- .thenCompose(__ ->
- getTopicReferenceAsync(topicName).thenCompose(t -> {
+ return getTopicReferenceAsync(topicName).thenCompose(t -> {
PersistentTopic topic = (PersistentTopic) t;
BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
if (ex != null) {
@@ -1764,8 +1761,7 @@ public class PersistentTopicsBase extends AdminResource {
}
return sub.clearBacklog().whenComplete(biConsumer);
}
- })
- .exceptionally(ex -> {
+ }).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
log.error("[{}] Failed to skip all messages for subscription {} on topic {}",
@@ -1773,7 +1769,7 @@ public class PersistentTopicsBase extends AdminResource {
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
- }));
+ });
}
protected void internalSkipMessages(AsyncResponse asyncResponse, String subName, int numMessages,
@@ -1936,7 +1932,7 @@ public class PersistentTopicsBase extends AdminResource {
for (int i = 0; i < subNames.size(); i++) {
try {
futures.add(internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
- subNames.get(i), expireTimeInSeconds, authoritative));
+ subNames.get(i), expireTimeInSeconds));
} catch (Exception e) {
log.error("[{}] Failed to expire messages for all subscription up to {} on {}",
clientAppId(), expireTimeInSeconds, topicName, e);
@@ -3447,61 +3443,68 @@ public class PersistentTopicsBase extends AdminResource {
future = CompletableFuture.completedFuture(null);
}
future.thenCompose(__ ->
- // If the topic name is a partition name, no need to get partition topic metadata again
- getPartitionedTopicMetadataAsync(topicName, authoritative, false)
- .thenCompose(partitionMetadata -> {
- if (topicName.isPartitioned()) {
- return internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, subName,
- expireTimeInSeconds, authoritative)
- .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
- } else {
- if (partitionMetadata.partitions > 0) {
- return CompletableFuture.completedFuture(null).thenAccept(unused -> {
- final List<CompletableFuture<Void>> futures = Lists.newArrayList();
-
- // expire messages for each partition topic
- for (int i = 0; i < partitionMetadata.partitions; i++) {
- TopicName topicNamePartition = topicName.getPartition(i);
- try {
- futures.add(pulsar()
- .getAdminClient()
- .topics()
- .expireMessagesAsync(topicNamePartition.toString(),
- subName, expireTimeInSeconds));
- } catch (Exception e) {
- log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(),
- expireTimeInSeconds, topicNamePartition, e);
- asyncResponse.resume(new RestException(e));
- return;
+ validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)
+ .thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative))
+ .thenCompose(unused2 ->
+ // If the topic name is a partition name, no need to get partition topic metadata again
+ getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+ .thenCompose(partitionMetadata -> {
+ if (topicName.isPartitioned()) {
+ return internalExpireMessagesByTimestampForSinglePartitionAsync
+ (partitionMetadata, subName, expireTimeInSeconds)
+ .thenAccept(unused3 ->
+ asyncResponse.resume(Response.noContent().build()));
+ } else {
+ if (partitionMetadata.partitions > 0) {
+ return CompletableFuture.completedFuture(null).thenAccept(unused -> {
+ final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+
+ // expire messages for each partition topic
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ TopicName topicNamePartition = topicName.getPartition(i);
+ try {
+ futures.add(pulsar()
+ .getAdminClient()
+ .topics()
+ .expireMessagesAsync(topicNamePartition.toString(),
+ subName, expireTimeInSeconds));
+ } catch (Exception e) {
+ log.error("[{}] Failed to expire messages up to {} on {}",
+ clientAppId(),
+ expireTimeInSeconds, topicNamePartition, e);
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ }
+
+ FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ if (exception != null) {
+ Throwable t = exception.getCause();
+ if (t instanceof NotFoundException) {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ "Subscription not found"));
+ return null;
+ } else {
+ log.error("[{}] Failed to expire messages up "
+ + "to {} on {}", clientAppId(),
+ expireTimeInSeconds, topicName, t);
+ asyncResponse.resume(new RestException(t));
+ return null;
+ }
+ }
+ asyncResponse.resume(Response.noContent().build());
+ return null;
+ });
+ });
+ } else {
+ return internalExpireMessagesByTimestampForSinglePartitionAsync
+ (partitionMetadata, subName, expireTimeInSeconds)
+ .thenAccept(unused ->
+ asyncResponse.resume(Response.noContent().build()));
}
}
+ }))
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
- if (exception != null) {
- Throwable t = exception.getCause();
- if (t instanceof NotFoundException) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND,
- "Subscription not found"));
- return null;
- } else {
- log.error("[{}] Failed to expire messages up to {} on {}",
- clientAppId(), expireTimeInSeconds,
- topicName, t);
- asyncResponse.resume(new RestException(t));
- return null;
- }
- }
- asyncResponse.resume(Response.noContent().build());
- return null;
- });
- });
- } else {
- return internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
- subName, expireTimeInSeconds, authoritative)
- .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
- }
- }
- })
).exceptionally(ex -> {
// If the exception is not redirect exception we need to log it.
if (!isRedirectException(ex)) {
@@ -3514,69 +3517,65 @@ public class PersistentTopicsBase extends AdminResource {
}
private CompletableFuture<Void> internalExpireMessagesByTimestampForSinglePartitionAsync(
- PartitionedTopicMetadata partitionMetadata, String subName, int expireTimeInSeconds,
- boolean authoritative) {
+ PartitionedTopicMetadata partitionMetadata, String subName, int expireTimeInSeconds) {
if (!topicName.isPartitioned() && partitionMetadata.partitions > 0) {
String msg = "This method should not be called for partitioned topic";
return FutureUtil.failedFuture(new IllegalStateException(msg));
} else {
final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
- validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)
- .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
- .thenCompose(__ -> getTopicReferenceAsync(topicName).thenAccept(t -> {
- if (t == null) {
- resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Topic not found"));
- return;
- }
- if (!(t instanceof PersistentTopic)) {
- resultFuture.completeExceptionally(new RestException(Status.METHOD_NOT_ALLOWED,
- "Expire messages on a non-persistent topic is not allowed"));
- return;
- }
- PersistentTopic topic = (PersistentTopic) t;
-
- boolean issued;
- if (subName.startsWith(topic.getReplicatorPrefix())) {
- String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
- PersistentReplicator repl = (PersistentReplicator) topic
- .getPersistentReplicator(remoteCluster);
- if (repl == null) {
- resultFuture.completeExceptionally(
- new RestException(Status.NOT_FOUND, "Replicator not found"));
- return;
- }
- issued = repl.expireMessages(expireTimeInSeconds);
- } else {
- PersistentSubscription sub = topic.getSubscription(subName);
- if (sub == null) {
- resultFuture.completeExceptionally(
- new RestException(Status.NOT_FOUND, "Subscription not found"));
- return;
- }
- issued = sub.expireMessages(expireTimeInSeconds);
- }
- if (issued) {
- log.info("[{}] Message expire started up to {} on {} {}", clientAppId(),
- expireTimeInSeconds, topicName, subName);
- resultFuture.complete(__);
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Expire message by timestamp not issued on topic {} for subscription {} "
- + "due to ongoing message expiration not finished or subscription almost"
- + " catch up. If it's performed on a partitioned topic operation might "
- + "succeeded on other partitions, please check stats of individual "
- + "partition.", topicName, subName);
- }
- resultFuture.completeExceptionally(new RestException(Status.CONFLICT, "Expire message "
- + "by timestamp not issued on topic " + topicName + " for subscription "
- + subName + " due to ongoing message expiration not finished or subscription "
- + "almost catch up. If it's performed on a partitioned topic operation might"
- + " succeeded on other partitions, please check stats of individual partition."
- ));
- return;
- }
- })
- ).exceptionally(e -> {
+ getTopicReferenceAsync(topicName).thenAccept(t -> {
+ if (t == null) {
+ resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Topic not found"));
+ return;
+ }
+ if (!(t instanceof PersistentTopic)) {
+ resultFuture.completeExceptionally(new RestException(Status.METHOD_NOT_ALLOWED,
+ "Expire messages on a non-persistent topic is not allowed"));
+ return;
+ }
+ PersistentTopic topic = (PersistentTopic) t;
+
+ boolean issued;
+ if (subName.startsWith(topic.getReplicatorPrefix())) {
+ String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
+ PersistentReplicator repl = (PersistentReplicator) topic
+ .getPersistentReplicator(remoteCluster);
+ if (repl == null) {
+ resultFuture.completeExceptionally(
+ new RestException(Status.NOT_FOUND, "Replicator not found"));
+ return;
+ }
+ issued = repl.expireMessages(expireTimeInSeconds);
+ } else {
+ PersistentSubscription sub = topic.getSubscription(subName);
+ if (sub == null) {
+ resultFuture.completeExceptionally(
+ new RestException(Status.NOT_FOUND, "Subscription not found"));
+ return;
+ }
+ issued = sub.expireMessages(expireTimeInSeconds);
+ }
+ if (issued) {
+ log.info("[{}] Message expire started up to {} on {} {}", clientAppId(),
+ expireTimeInSeconds, topicName, subName);
+ resultFuture.complete(null);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Expire message by timestamp not issued on topic {} for subscription {} "
+ + "due to ongoing message expiration not finished or subscription almost"
+ + " catch up. If it's performed on a partitioned topic operation might "
+ + "succeeded on other partitions, please check stats of individual "
+ + "partition.", topicName, subName);
+ }
+ resultFuture.completeExceptionally(new RestException(Status.CONFLICT, "Expire message "
+ + "by timestamp not issued on topic " + topicName + " for subscription "
+ + subName + " due to ongoing message expiration not finished or subscription "
+ + "almost catch up. If it's performed on a partitioned topic operation might"
+ + " succeeded on other partitions, please check stats of individual partition."
+ ));
+ return;
+ }
+ }).exceptionally(e -> {
resultFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
return null;
});