You are viewing a plain-text version of this content. The canonical link to the original was not preserved in this copy.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/22 11:46:19 UTC
[pulsar] branch master updated: Put `validateTopicOwnershipAsync` before `validateTopicOperationAsync` (#15265)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 41f40f06c4c Put `validateTopicOwnershipAsync` before `validateTopicOperationAsync` (#15265)
41f40f06c4c is described below
commit 41f40f06c4c4d74939bca07a9b83bda020147346
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Apr 22 19:46:11 2022 +0800
Put `validateTopicOwnershipAsync` before `validateTopicOperationAsync` (#15265)
---
.../broker/admin/impl/PersistentTopicsBase.java | 33 +++++++++++-----------
1 file changed, 17 insertions(+), 16 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 2a4be06f418..addf02f8005 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -578,8 +578,9 @@ public class PersistentTopicsBase extends AdminResource {
protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative,
boolean force, boolean deleteSchema) {
- validateNamespaceOperationAsync(topicName.getNamespaceObject(), NamespaceOperation.DELETE_TOPIC)
- .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> validateNamespaceOperationAsync(topicName.getNamespaceObject(),
+ NamespaceOperation.DELETE_TOPIC))
.thenCompose(__ -> pulsar().getBrokerService()
.fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(partitionedMeta -> {
@@ -969,8 +970,8 @@ public class PersistentTopicsBase extends AdminResource {
}
private void internalUnloadNonPartitionedTopicAsync(AsyncResponse asyncResponse, boolean authoritative) {
- validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
- .thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative)
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenCompose(topic -> topic.close(false))
.thenRun(() -> {
@@ -988,8 +989,8 @@ public class PersistentTopicsBase extends AdminResource {
}
private void internalUnloadTransactionCoordinatorAsync(AsyncResponse asyncResponse, boolean authoritative) {
- validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
- .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative)
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.UNLOAD)
.thenCompose(v -> pulsar()
.getTransactionMetadataStoreService()
.removeTransactionMetadataStore(
@@ -1046,8 +1047,8 @@ public class PersistentTopicsBase extends AdminResource {
future = CompletableFuture.completedFuture(null);
}
future.thenCompose(__ ->
- validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS)
- .thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative))
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.GET_SUBSCRIPTIONS))
.thenAccept(unused1 -> {
// If the topic name is a partition name, no need to get partition topic metadata again
if (topicName.isPartitioned()) {
@@ -1780,8 +1781,8 @@ public class PersistentTopicsBase extends AdminResource {
} else {
future = CompletableFuture.completedFuture(null);
}
- future.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP))
- .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+ future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.SKIP))
.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false)
.thenCompose(partitionMetadata -> {
if (partitionMetadata.partitions > 0) {
@@ -1908,8 +1909,8 @@ public class PersistentTopicsBase extends AdminResource {
int expireTimeInSeconds,
boolean authoritative) {
// validate ownership and redirect if current broker is not owner
- validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)
- .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
.thenCompose(__ -> getTopicReferenceAsync(topicName).thenAccept(t -> {
if (t == null) {
resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.NOT_FOUND,
@@ -3443,8 +3444,8 @@ public class PersistentTopicsBase extends AdminResource {
future = CompletableFuture.completedFuture(null);
}
future.thenCompose(__ ->
- validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES)
- .thenCompose(unused -> validateTopicOwnershipAsync(topicName, authoritative))
+ validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(unused -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
.thenCompose(unused2 ->
// If the topic name is a partition name, no need to get partition topic metadata again
getPartitionedTopicMetadataAsync(topicName, authoritative, false)
@@ -3592,8 +3593,8 @@ public class PersistentTopicsBase extends AdminResource {
future = CompletableFuture.completedFuture(null);
}
- future.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
- .thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+ future.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+ .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.EXPIRE_MESSAGES))
.thenCompose(__ -> {
log.info("[{}][{}] received expire messages on subscription {} to position {}", clientAppId(),
topicName, subName, messageId);