You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/04/21 01:59:46 UTC

[pulsar] branch master updated: Fix duplicate validateTopicOwnershipAsync (#15120)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 151f1d1d3e1 Fix duplicate validateTopicOwnershipAsync (#15120)
151f1d1d3e1 is described below

commit 151f1d1d3e14df9166547d1aed829c774ccce99d
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Thu Apr 21 09:59:37 2022 +0800

    Fix duplicate validateTopicOwnershipAsync (#15120)
    
    Co-authored-by: gavingaozhangmin <ga...@didiglobal.com>
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 259 ++++++++++-----------
 1 file changed, 129 insertions(+), 130 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index eb8d9bcc513..2a4be06f418 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -1045,11 +1045,13 @@ public class PersistentTopicsBase extends AdminResource {
         } else {
             future = CompletableFuture.completedFuture(null);
         }
-        future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
-                .thenAccept(unused -> {
+        future.thenCompose(__ ->
+                validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS)
+                .thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative))
+                .thenAccept(unused1 -> {
                     // If the topic name is a partition name, no need to get partition topic metadata again
                     if (topicName.isPartitioned()) {
-                        internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative);
+                        internalGetSubscriptionsForNonPartitionedTopic(asyncResponse);
                     } else {
                         getPartitionedTopicMetadataAsync(topicName, authoritative, false)
                                 .thenAccept(partitionMetadata -> {
@@ -1067,7 +1069,7 @@ public class PersistentTopicsBase extends AdminResource {
                                                     topicResources().persistentTopicExists(topicName.getPartition(i)));
                                         }
                                         FutureUtil.waitForAll(Lists.newArrayList(existsFutures.values()))
-                                                .thenApply(__ ->
+                                                .thenApply(unused2 ->
                                                 existsFutures.entrySet().stream().filter(e -> e.getValue().join())
                                                         .map(item -> topicName.getPartition(item.getKey()).toString())
                                                         .collect(Collectors.toList())
@@ -1086,7 +1088,7 @@ public class PersistentTopicsBase extends AdminResource {
                                                     throw new RestException(e);
                                                 }
                                             });
-                                        }).thenAccept(__ -> resumeAsyncResponse(asyncResponse,
+                                        }).thenAccept(unused3 -> resumeAsyncResponse(asyncResponse,
                                                         subscriptions, subscriptionFutures));
                                     } else {
                                         for (int i = 0; i < partitionMetadata.partitions; i++) {
@@ -1104,7 +1106,7 @@ public class PersistentTopicsBase extends AdminResource {
                                     asyncResponse.resume(e);
                                 }
                             } else {
-                                internalGetSubscriptionsForNonPartitionedTopic(asyncResponse, authoritative);
+                                internalGetSubscriptionsForNonPartitionedTopic(asyncResponse);
                             }
                         }).exceptionally(ex -> {
                             // If the exception is not redirect exception we need to log it.
@@ -1124,7 +1126,8 @@ public class PersistentTopicsBase extends AdminResource {
                     }
                     resumeAsyncResponseExceptionally(asyncResponse, ex);
                     return null;
-                });
+                })
+        );
     }
 
     private void resumeAsyncResponse(AsyncResponse asyncResponse, Set<String> subscriptions,
@@ -1153,10 +1156,8 @@ public class PersistentTopicsBase extends AdminResource {
         });
     }
 
-    private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse, boolean authoritative) {
-        validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS))
-                .thenCompose(__ -> getTopicReferenceAsync(topicName))
+    private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncResponse) {
+        getTopicReferenceAsync(topicName)
                 .thenAccept(topic -> asyncResponse.resume(Lists.newArrayList(topic.getSubscriptions().keys())))
                 .exceptionally(ex -> {
                     // If the exception is not redirect exception we need to log it.
@@ -1731,11 +1732,7 @@ public class PersistentTopicsBase extends AdminResource {
     private CompletableFuture<Void> internalSkipAllMessagesForNonPartitionedTopicAsync(AsyncResponse asyncResponse,
                                                                                        String subName,
                                                                                        boolean authoritative) {
-        return validateTopicOwnershipAsync(topicName, authoritative)
-            .thenCompose(__ ->
-                validateTopicOperationAsync(topicName, TopicOperation.SKIP, subName))
-            .thenCompose(__ ->
-                getTopicReferenceAsync(topicName).thenCompose(t -> {
+        return getTopicReferenceAsync(topicName).thenCompose(t -> {
                     PersistentTopic topic = (PersistentTopic) t;
                     BiConsumer<Void, Throwable> biConsumer = (v, ex) -> {
                         if (ex != null) {
@@ -1764,8 +1761,7 @@ public class PersistentTopicsBase extends AdminResource {
                         }
                         return sub.clearBacklog().whenComplete(biConsumer);
                     }
-                })
-                .exceptionally(ex -> {
+                }).exceptionally(ex -> {
                     // If the exception is not redirect exception we need to log it.
                     if (!isRedirectException(ex)) {
                         log.error("[{}] Failed to skip all messages for subscription {} on topic {}",
@@ -1773,7 +1769,7 @@ public class PersistentTopicsBase extends AdminResource {
                     }
                     resumeAsyncResponseExceptionally(asyncResponse, ex);
                     return null;
-                }));
+                });
     }
 
     protected void internalSkipMessages(AsyncResponse asyncResponse, String subName, int numMessages,
@@ -1936,7 +1932,7 @@ public class PersistentTopicsBase extends AdminResource {
                     for (int i = 0; i < subNames.size(); i++) {
                         try {
                             futures.add(internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
-                                    subNames.get(i), expireTimeInSeconds, authoritative));
+                                    subNames.get(i), expireTimeInSeconds));
                         } catch (Exception e) {
                             log.error("[{}] Failed to expire messages for all subscription up to {} on {}",
                                     clientAppId(), expireTimeInSeconds, topicName, e);
@@ -3447,61 +3443,68 @@ public class PersistentTopicsBase extends AdminResource {
             future = CompletableFuture.completedFuture(null);
         }
         future.thenCompose(__ ->
-            // If the topic name is a partition name, no need to get partition topic metadata again
-            getPartitionedTopicMetadataAsync(topicName, authoritative, false)
-                    .thenCompose(partitionMetadata -> {
-                        if (topicName.isPartitioned()) {
-                            return internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata, subName,
-                                    expireTimeInSeconds, authoritative)
-                                    .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
-                        } else {
-                            if (partitionMetadata.partitions > 0) {
-                                return CompletableFuture.completedFuture(null).thenAccept(unused -> {
-                                    final List<CompletableFuture<Void>> futures = Lists.newArrayList();
-
-                                    // expire messages for each partition topic
-                                    for (int i = 0; i < partitionMetadata.partitions; i++) {
-                                        TopicName topicNamePartition = topicName.getPartition(i);
-                                        try {
-                                            futures.add(pulsar()
-                                                    .getAdminClient()
-                                                    .topics()
-                                                    .expireMessagesAsync(topicNamePartition.toString(),
-                                                            subName, expireTimeInSeconds));
-                                        } catch (Exception e) {
-                                            log.error("[{}] Failed to expire messages up to {} on {}", clientAppId(),
-                                                    expireTimeInSeconds, topicNamePartition, e);
-                                            asyncResponse.resume(new RestException(e));
-                                            return;
+            validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)
+                .thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative))
+                .thenCompose(unused2 ->
+                        // If the topic name is a partition name, no need to get partition topic metadata again
+                        getPartitionedTopicMetadataAsync(topicName, authoritative, false)
+                                .thenCompose(partitionMetadata -> {
+                                    if (topicName.isPartitioned()) {
+                                        return internalExpireMessagesByTimestampForSinglePartitionAsync
+                                                (partitionMetadata, subName, expireTimeInSeconds)
+                                                .thenAccept(unused3 ->
+                                                        asyncResponse.resume(Response.noContent().build()));
+                                    } else {
+                                        if (partitionMetadata.partitions > 0) {
+                                            return CompletableFuture.completedFuture(null).thenAccept(unused -> {
+                                                final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+
+                                                // expire messages for each partition topic
+                                                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                                                    TopicName topicNamePartition = topicName.getPartition(i);
+                                                    try {
+                                                        futures.add(pulsar()
+                                                                .getAdminClient()
+                                                                .topics()
+                                                                .expireMessagesAsync(topicNamePartition.toString(),
+                                                                        subName, expireTimeInSeconds));
+                                                    } catch (Exception e) {
+                                                        log.error("[{}] Failed to expire messages up to {} on {}",
+                                                                clientAppId(),
+                                                                expireTimeInSeconds, topicNamePartition, e);
+                                                        asyncResponse.resume(new RestException(e));
+                                                        return;
+                                                    }
+                                                }
+
+                                                FutureUtil.waitForAll(futures).handle((result, exception) -> {
+                                                    if (exception != null) {
+                                                        Throwable t = exception.getCause();
+                                                        if (t instanceof NotFoundException) {
+                                                            asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                                                    "Subscription not found"));
+                                                            return null;
+                                                        } else {
+                                                            log.error("[{}] Failed to expire messages up "
+                                                                            + "to {} on {}", clientAppId(),
+                                                                    expireTimeInSeconds, topicName, t);
+                                                            asyncResponse.resume(new RestException(t));
+                                                            return null;
+                                                        }
+                                                    }
+                                                    asyncResponse.resume(Response.noContent().build());
+                                                    return null;
+                                                });
+                                            });
+                                        } else {
+                                            return internalExpireMessagesByTimestampForSinglePartitionAsync
+                                                    (partitionMetadata, subName, expireTimeInSeconds)
+                                                    .thenAccept(unused ->
+                                                            asyncResponse.resume(Response.noContent().build()));
                                         }
                                     }
+                                }))
 
-                                    FutureUtil.waitForAll(futures).handle((result, exception) -> {
-                                        if (exception != null) {
-                                            Throwable t = exception.getCause();
-                                            if (t instanceof NotFoundException) {
-                                                asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                                                        "Subscription not found"));
-                                                return null;
-                                            } else {
-                                                log.error("[{}] Failed to expire messages up to {} on {}",
-                                                        clientAppId(), expireTimeInSeconds,
-                                                        topicName, t);
-                                                asyncResponse.resume(new RestException(t));
-                                                return null;
-                                            }
-                                        }
-                                        asyncResponse.resume(Response.noContent().build());
-                                        return null;
-                                    });
-                                });
-                            } else {
-                                return internalExpireMessagesByTimestampForSinglePartitionAsync(partitionMetadata,
-                                        subName, expireTimeInSeconds, authoritative)
-                                        .thenAccept(unused -> asyncResponse.resume(Response.noContent().build()));
-                            }
-                        }
-                    })
         ).exceptionally(ex -> {
             // If the exception is not redirect exception we need to log it.
             if (!isRedirectException(ex)) {
@@ -3514,69 +3517,65 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     private CompletableFuture<Void> internalExpireMessagesByTimestampForSinglePartitionAsync(
-            PartitionedTopicMetadata partitionMetadata, String subName, int expireTimeInSeconds,
-            boolean authoritative) {
+            PartitionedTopicMetadata partitionMetadata, String subName, int expireTimeInSeconds) {
         if (!topicName.isPartitioned() && partitionMetadata.partitions > 0) {
             String msg = "This method should not be called for partitioned topic";
             return FutureUtil.failedFuture(new IllegalStateException(msg));
         } else {
             final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
-            validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)
-                    .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
-                    .thenCompose(__ -> getTopicReferenceAsync(topicName).thenAccept(t -> {
-                         if (t == null) {
-                             resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Topic not found"));
-                             return;
-                         }
-                        if (!(t instanceof PersistentTopic)) {
-                            resultFuture.completeExceptionally(new RestException(Status.METHOD_NOT_ALLOWED,
-                                    "Expire messages on a non-persistent topic is not allowed"));
-                            return;
-                        }
-                        PersistentTopic topic = (PersistentTopic) t;
-
-                        boolean issued;
-                        if (subName.startsWith(topic.getReplicatorPrefix())) {
-                            String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
-                            PersistentReplicator repl = (PersistentReplicator) topic
-                                    .getPersistentReplicator(remoteCluster);
-                            if (repl == null) {
-                                resultFuture.completeExceptionally(
-                                        new RestException(Status.NOT_FOUND, "Replicator not found"));
-                                return;
-                            }
-                            issued = repl.expireMessages(expireTimeInSeconds);
-                        } else {
-                            PersistentSubscription sub = topic.getSubscription(subName);
-                            if (sub == null) {
-                                resultFuture.completeExceptionally(
-                                        new RestException(Status.NOT_FOUND, "Subscription not found"));
-                                return;
-                            }
-                            issued = sub.expireMessages(expireTimeInSeconds);
-                        }
-                        if (issued) {
-                            log.info("[{}] Message expire started up to {} on {} {}", clientAppId(),
-                                    expireTimeInSeconds, topicName, subName);
-                            resultFuture.complete(__);
-                        } else {
-                            if (log.isDebugEnabled()) {
-                                log.debug("Expire message by timestamp not issued on topic {} for subscription {} "
-                                        + "due to ongoing message expiration not finished or subscription almost"
-                                        + " catch up. If it's performed on a partitioned topic operation might "
-                                        + "succeeded on other partitions, please check stats of individual "
-                                        + "partition.", topicName, subName);
-                            }
-                            resultFuture.completeExceptionally(new RestException(Status.CONFLICT, "Expire message "
-                                    + "by timestamp not issued on topic " + topicName + " for subscription "
-                                    + subName + " due to ongoing message expiration not finished or subscription "
-                                    + "almost catch  up. If it's performed on a partitioned topic operation might"
-                                    + " succeeded on other partitions, please check stats of individual partition."
-                            ));
-                            return;
-                        }
-                            })
-                    ).exceptionally(e -> {
+            getTopicReferenceAsync(topicName).thenAccept(t -> {
+                 if (t == null) {
+                     resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Topic not found"));
+                     return;
+                 }
+                if (!(t instanceof PersistentTopic)) {
+                    resultFuture.completeExceptionally(new RestException(Status.METHOD_NOT_ALLOWED,
+                            "Expire messages on a non-persistent topic is not allowed"));
+                    return;
+                }
+                PersistentTopic topic = (PersistentTopic) t;
+
+                boolean issued;
+                if (subName.startsWith(topic.getReplicatorPrefix())) {
+                    String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
+                    PersistentReplicator repl = (PersistentReplicator) topic
+                            .getPersistentReplicator(remoteCluster);
+                    if (repl == null) {
+                        resultFuture.completeExceptionally(
+                                new RestException(Status.NOT_FOUND, "Replicator not found"));
+                        return;
+                    }
+                    issued = repl.expireMessages(expireTimeInSeconds);
+                } else {
+                    PersistentSubscription sub = topic.getSubscription(subName);
+                    if (sub == null) {
+                        resultFuture.completeExceptionally(
+                                new RestException(Status.NOT_FOUND, "Subscription not found"));
+                        return;
+                    }
+                    issued = sub.expireMessages(expireTimeInSeconds);
+                }
+                if (issued) {
+                    log.info("[{}] Message expire started up to {} on {} {}", clientAppId(),
+                            expireTimeInSeconds, topicName, subName);
+                    resultFuture.complete(null);
+                } else {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Expire message by timestamp not issued on topic {} for subscription {} "
+                                + "due to ongoing message expiration not finished or subscription almost"
+                                + " catch up. If it's performed on a partitioned topic operation might "
+                                + "succeeded on other partitions, please check stats of individual "
+                                + "partition.", topicName, subName);
+                    }
+                    resultFuture.completeExceptionally(new RestException(Status.CONFLICT, "Expire message "
+                            + "by timestamp not issued on topic " + topicName + " for subscription "
+                            + subName + " due to ongoing message expiration not finished or subscription "
+                            + "almost catch  up. If it's performed on a partitioned topic operation might"
+                            + " succeeded on other partitions, please check stats of individual partition."
+                    ));
+                    return;
+                }
+            }).exceptionally(e -> {
                 resultFuture.completeExceptionally(FutureUtil.unwrapCompletionException(e));
                 return null;
             });