Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/04 10:36:29 UTC

[GitHub] [pulsar] AnonHxy opened a new pull request, #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

AnonHxy opened a new pull request, #16946:
URL: https://github.com/apache/pulsar/pull/16946

   ### Motivation
   
   
   * Reduce local REST calls for `org.apache.pulsar.broker.admin.impl.PersistentTopicsBase#internalGetReplicatedSubscriptionStatus`
   
   ### Modifications
   
   * Call the local method if the partition is owned by this broker; otherwise call the REST API using the internal `AdminClient`
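
The modification above can be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual patch: the three function parameters are hypothetical stand-ins for the ownership check, the in-process lookup, and the REST call through the admin client.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical sketch: answer locally when this broker owns the partition, otherwise go remote. */
final class OwnershipDispatchSketch {

    /**
     * @param isOwned stand-in for an asynchronous ownership check (assumption, not a Pulsar API)
     * @param local   stand-in for reading the status from the in-memory topic
     * @param remote  stand-in for a REST call made through an admin client
     */
    static CompletableFuture<Map<String, Boolean>> status(
            String partition,
            Function<String, CompletableFuture<Boolean>> isOwned,
            Function<String, CompletableFuture<Map<String, Boolean>>> local,
            Function<String, CompletableFuture<Map<String, Boolean>>> remote) {
        // Only partitions that are not owned by this broker pay for a REST round trip.
        return isOwned.apply(partition)
                .thenCompose(owned -> owned ? local.apply(partition) : remote.apply(partition));
    }

    public static void main(String[] args) {
        // Pretend this broker owns only partition 0.
        Function<String, CompletableFuture<Boolean>> isOwned =
                p -> CompletableFuture.completedFuture(p.endsWith("-0"));
        Function<String, CompletableFuture<Map<String, Boolean>>> local =
                p -> CompletableFuture.completedFuture(Map.of(p, true));
        Function<String, CompletableFuture<Map<String, Boolean>>> remote =
                p -> CompletableFuture.completedFuture(Map.of(p, false));

        System.out.println(status("t-partition-0", isOwned, local, remote).join());
        System.out.println(status("t-partition-1", isOwned, local, remote).join());
    }
}
```

In this sketch the two branches return different values only so that the chosen path is visible; in the real change both paths would report the same subscription status.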
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [x] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] AnonHxy commented on pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on PR #16946:
URL: https://github.com/apache/pulsar/pull/16946#issuecomment-1205930084

   /pulsarbot run-failure-checks




[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16946:
URL: https://github.com/apache/pulsar/pull/16946#discussion_r958331532


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5233,42 +5241,41 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
         });
     }
 
+    private CompletableFuture<Map<String, Boolean>> getReplicatedSubscriptionStatusFromBroker(TopicName partitionName,
+                                                                                              String subName) {
+        return getTopicReferenceAsync(partitionName).thenCompose(topic -> {
+            Subscription sub = topic.getSubscription(subName);
+            if (sub == null) {
+                return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+                    getSubNotFoundErrorMessage(partitionName.toString(), subName)));
+            }
+            if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) {
+                Map<String, Boolean> res = Maps.newHashMap();
+                res.put(partitionName.toString(), sub.isReplicated());
+                return CompletableFuture.completedFuture(res);
+            } else {
+                return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot get replicated subscriptions on non-persistent topics"));
+            }
+        });
+    }
+
     private void internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(
                                                                                AsyncResponse asyncResponse,
                                                                                String subName,
                                                                                boolean authoritative) {
         // Redirect the request to the appropriate broker if this broker is not the owner of the topic
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> getTopicReferenceAsync(topicName))
-                .thenAccept(topic -> {
-                    if (topic == null) {

Review Comment:
   `getTopicReferenceAsync(topicName)` cannot return `null`, so I think we don't need to check `topic == null` here.
   
   If the topic doesn't exist, `getTopicReferenceAsync` returns a `CompletableFuture` that is completed with an exception; see line 4312:
   https://github.com/apache/pulsar/blob/cb881d47f7e5c561ac0e2c76e525b8b5a8a9aaef/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java#L4308-L4312
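
To illustrate the point with plain JDK types: when a lookup reports a missing topic by failing its future, downstream stages never see a `null` value, so a null check in them is dead code. The class, the `LOADED` set, and the `lookup` method below are hypothetical and only mimic the behavior described above; they are not Pulsar code.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class MissingTopicSketch {
    // Hypothetical set of topics loaded on this broker.
    static final Set<String> LOADED = Set.of("persistent://tenant/ns/orders");

    /** Hypothetical lookup: never yields null, fails the future when the topic is absent. */
    static CompletableFuture<String> lookup(String name) {
        if (!LOADED.contains(name)) {
            return CompletableFuture.failedFuture(new IllegalStateException("Topic not found: " + name));
        }
        return CompletableFuture.completedFuture(name);
    }

    static String describe(String name) {
        return lookup(name)
                // Skipped entirely when lookup failed, so "topic" is never null here.
                .thenApply(topic -> "found " + topic)
                .exceptionally(ex -> "error: " + unwrap(ex).getMessage())
                .join();
    }

    /** Dependent stages wrap the original failure in a CompletionException. */
    static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    }

    public static void main(String[] args) {
        System.out.println(describe("persistent://tenant/ns/orders"));
        System.out.println(describe("persistent://tenant/ns/missing"));
    }
}
```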





[GitHub] [pulsar] AnonHxy merged pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
AnonHxy merged PR #16946:
URL: https://github.com/apache/pulsar/pull/16946




[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16946:
URL: https://github.com/apache/pulsar/pull/16946#discussion_r958332918


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5233,42 +5241,41 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
         });
     }
 
+    private CompletableFuture<Map<String, Boolean>> getReplicatedSubscriptionStatusFromBroker(TopicName partitionName,

Review Comment:
   Updated to `getReplicatedSubscriptionStatusFromLocalBroker`.





[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16946:
URL: https://github.com/apache/pulsar/pull/16946#discussion_r956598776


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5145,27 +5145,54 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
                     .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
                     .thenAccept(partitionMetadata -> {
                         if (partitionMetadata.partitions > 0) {
-                            final List<CompletableFuture<Map<String, Boolean>>> futures =
-                                    Lists.newArrayListWithCapacity(partitionMetadata.partitions);
-                            final Map<String, Boolean> status = Maps.newHashMap();
+                            List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
+                            Map<String, Boolean> status = Maps.newHashMap();
 
                             for (int i = 0; i < partitionMetadata.partitions; i++) {
                                 TopicName partition = topicName.getPartition(i);
-                                try {
-                                    futures.add(pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(
-                                            partition.toString(), subName).whenComplete((response, throwable) -> {
-                                        if (throwable != null) {
-                                            log.error("[{}] Failed to get replicated subscriptions on {} {}",
-                                                    clientAppId(), partition, subName, throwable);
-                                            asyncResponse.resume(new RestException(throwable));
+                                futures.add(
+                                    pulsar().getNamespaceService().isServiceUnitOwnedAsync(partition)
+                                    .thenCompose(owned -> {
+                                        if (owned) {
+                                            // if this broker owned the partition do action like
+                                            // `internalGetReplicatedSubscriptionStatusForNonPartitionedTopic()`
+                                            return getTopicReferenceAsync(partition)
+                                                .thenApply(topic -> {

Review Comment:
   Good suggestion. 
   
   We can abstract the common code into a method. I have updated it. PTAL @Jason918
   





[GitHub] [pulsar] AnonHxy commented on pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on PR #16946:
URL: https://github.com/apache/pulsar/pull/16946#issuecomment-1232466222

   @codelipenghui @Technoboy-  PTAL




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16946:
URL: https://github.com/apache/pulsar/pull/16946#discussion_r940038526


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5145,27 +5145,54 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
                     .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
                     .thenAccept(partitionMetadata -> {
                         if (partitionMetadata.partitions > 0) {
-                            final List<CompletableFuture<Map<String, Boolean>>> futures =
-                                    Lists.newArrayListWithCapacity(partitionMetadata.partitions);
-                            final Map<String, Boolean> status = Maps.newHashMap();
+                            List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
+                            Map<String, Boolean> status = Maps.newHashMap();
 
                             for (int i = 0; i < partitionMetadata.partitions; i++) {
                                 TopicName partition = topicName.getPartition(i);
-                                try {
-                                    futures.add(pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(
-                                            partition.toString(), subName).whenComplete((response, throwable) -> {
-                                        if (throwable != null) {
-                                            log.error("[{}] Failed to get replicated subscriptions on {} {}",
-                                                    clientAppId(), partition, subName, throwable);
-                                            asyncResponse.resume(new RestException(throwable));
+                                futures.add(
+                                    pulsar().getNamespaceService().isServiceUnitOwnedAsync(partition)
+                                    .thenCompose(owned -> {
+                                        if (owned) {
+                                            // if this broker owned the partition do action like
+                                            // `internalGetReplicatedSubscriptionStatusForNonPartitionedTopic()`
+                                            return getTopicReferenceAsync(partition)
+                                                .thenApply(topic -> {

Review Comment:
   We might get RestException here if the broker does not have the topic. 





[GitHub] [pulsar] AnonHxy commented on pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on PR #16946:
URL: https://github.com/apache/pulsar/pull/16946#issuecomment-1229518061

   /pulsarbot run-failure-checks




[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16946:
URL: https://github.com/apache/pulsar/pull/16946#discussion_r940903736


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5145,27 +5145,54 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
                     .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
                     .thenAccept(partitionMetadata -> {
                         if (partitionMetadata.partitions > 0) {
-                            final List<CompletableFuture<Map<String, Boolean>>> futures =
-                                    Lists.newArrayListWithCapacity(partitionMetadata.partitions);
-                            final Map<String, Boolean> status = Maps.newHashMap();
+                            List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
+                            Map<String, Boolean> status = Maps.newHashMap();
 
                             for (int i = 0; i < partitionMetadata.partitions; i++) {
                                 TopicName partition = topicName.getPartition(i);
-                                try {
-                                    futures.add(pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(
-                                            partition.toString(), subName).whenComplete((response, throwable) -> {
-                                        if (throwable != null) {
-                                            log.error("[{}] Failed to get replicated subscriptions on {} {}",
-                                                    clientAppId(), partition, subName, throwable);
-                                            asyncResponse.resume(new RestException(throwable));
+                                futures.add(
+                                    pulsar().getNamespaceService().isServiceUnitOwnedAsync(partition)
+                                    .thenCompose(owned -> {
+                                        if (owned) {
+                                            // if this broker owned the partition do action like
+                                            // `internalGetReplicatedSubscriptionStatusForNonPartitionedTopic()`
+                                            return getTopicReferenceAsync(partition)
+                                                .thenApply(topic -> {

Review Comment:
   Yes, but `FutureUtil.waitForAll(futures)` on line 5195 will handle this exception. @codelipenghui
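
As a rough illustration of why a single handler on the aggregated future is enough, here is a sketch that uses the JDK's `CompletableFuture.allOf` as a stand-in. It is an assumption that Pulsar's `FutureUtil.waitForAll` propagates failures in a comparable way; the class and method names below are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class WaitForAllSketch {

    /** Resolves to "ok" when every future succeeds, otherwise to "failed: <root message>". */
    static String outcome(List<CompletableFuture<Void>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> "ok")
                // One handler covers a failure in any of the per-partition futures.
                .exceptionally(ex -> "failed: " + rootMessage(ex))
                .join();
    }

    /** Walks the cause chain, since allOf wraps the failure in a CompletionException. */
    static String rootMessage(Throwable ex) {
        Throwable t = ex;
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t.getMessage();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> good = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> bad = CompletableFuture.failedFuture(new IllegalStateException("no topic"));
        System.out.println(outcome(List.of(good)));
        System.out.println(outcome(List.of(good, bad)));
    }
}
```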





[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16946:
URL: https://github.com/apache/pulsar/pull/16946#discussion_r956598776


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5145,27 +5145,54 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
                     .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
                     .thenAccept(partitionMetadata -> {
                         if (partitionMetadata.partitions > 0) {
-                            final List<CompletableFuture<Map<String, Boolean>>> futures =
-                                    Lists.newArrayListWithCapacity(partitionMetadata.partitions);
-                            final Map<String, Boolean> status = Maps.newHashMap();
+                            List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
+                            Map<String, Boolean> status = Maps.newHashMap();
 
                             for (int i = 0; i < partitionMetadata.partitions; i++) {
                                 TopicName partition = topicName.getPartition(i);
-                                try {
-                                    futures.add(pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(
-                                            partition.toString(), subName).whenComplete((response, throwable) -> {
-                                        if (throwable != null) {
-                                            log.error("[{}] Failed to get replicated subscriptions on {} {}",
-                                                    clientAppId(), partition, subName, throwable);
-                                            asyncResponse.resume(new RestException(throwable));
+                                futures.add(
+                                    pulsar().getNamespaceService().isServiceUnitOwnedAsync(partition)
+                                    .thenCompose(owned -> {
+                                        if (owned) {
+                                            // if this broker owned the partition do action like
+                                            // `internalGetReplicatedSubscriptionStatusForNonPartitionedTopic()`
+                                            return getTopicReferenceAsync(partition)
+                                                .thenApply(topic -> {

Review Comment:
   Good suggestion. 
   
   But before doing this, we need to refactor this method, because we need a `CompletableFuture` here and the return type of `internalGetReplicatedSubscriptionStatusForNonPartitionedTopic` is `void`.
   
   I will work on it.
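
One common shape for this kind of refactoring, sketched with plain JDK types: move the logic into a method that returns a future, and keep the `void` entry point as a thin wrapper that hands the outcome to the response. All names here are hypothetical, and the `Consumer<Object>` merely stands in for resuming an asynchronous response.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class FutureReturningSketch {

    /** Reusable core: returns a future, so callers can compose it with other futures. */
    static CompletableFuture<Boolean> isReplicated(String topic) {
        if (!topic.startsWith("persistent://")) {
            return CompletableFuture.failedFuture(
                    new UnsupportedOperationException("non-persistent topic: " + topic));
        }
        // Hypothetical result; a real implementation would inspect the subscription.
        return CompletableFuture.completedFuture(Boolean.TRUE);
    }

    /** Thin void wrapper: passes either the result or the failure to the response callback. */
    static void handle(String topic, Consumer<Object> resume) {
        isReplicated(topic).whenComplete((result, ex) -> resume.accept(ex != null ? ex : result));
    }

    public static void main(String[] args) {
        List<Object> responses = new ArrayList<>();
        handle("persistent://tenant/ns/orders", responses::add);
        handle("non-persistent://tenant/ns/orders", responses::add);
        responses.forEach(System.out::println);
    }
}
```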
   





[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16946:
URL: https://github.com/apache/pulsar/pull/16946#discussion_r958332687


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5233,42 +5241,41 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
         });
     }
 
+    private CompletableFuture<Map<String, Boolean>> getReplicatedSubscriptionStatusFromBroker(TopicName partitionName,

Review Comment:
   Changed to `localTopicName`.





[GitHub] [pulsar] Jason918 commented on a diff in pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #16946:
URL: https://github.com/apache/pulsar/pull/16946#discussion_r958088114


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5233,42 +5241,41 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
         });
     }
 
+    private CompletableFuture<Map<String, Boolean>> getReplicatedSubscriptionStatusFromBroker(TopicName partitionName,
+                                                                                              String subName) {
+        return getTopicReferenceAsync(partitionName).thenCompose(topic -> {
+            Subscription sub = topic.getSubscription(subName);
+            if (sub == null) {
+                return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+                    getSubNotFoundErrorMessage(partitionName.toString(), subName)));
+            }
+            if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) {
+                Map<String, Boolean> res = Maps.newHashMap();
+                res.put(partitionName.toString(), sub.isReplicated());
+                return CompletableFuture.completedFuture(res);
+            } else {
+                return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
+                    "Cannot get replicated subscriptions on non-persistent topics"));
+            }
+        });
+    }
+
     private void internalGetReplicatedSubscriptionStatusForNonPartitionedTopic(
                                                                                AsyncResponse asyncResponse,
                                                                                String subName,
                                                                                boolean authoritative) {
         // Redirect the request to the appropriate broker if this broker is not the owner of the topic
         validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> getTopicReferenceAsync(topicName))
-                .thenAccept(topic -> {
-                    if (topic == null) {

Review Comment:
   Better not to remove this check?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5233,42 +5241,41 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
         });
     }
 
+    private CompletableFuture<Map<String, Boolean>> getReplicatedSubscriptionStatusFromBroker(TopicName partitionName,

Review Comment:
   `getReplicatedSubscriptionStatusFromBroker` --> `getReplicatedSubscriptionStatusFromLocalBroker` or `getLocalReplicatedSubscriptionStatus` ?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5233,42 +5241,41 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
         });
     }
 
+    private CompletableFuture<Map<String, Boolean>> getReplicatedSubscriptionStatusFromBroker(TopicName partitionName,

Review Comment:
   `partitionName` here is confusing. It also applies to non-partitioned topics.





[GitHub] [pulsar] AnonHxy commented on a diff in pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on code in PR #16946:
URL: https://github.com/apache/pulsar/pull/16946#discussion_r969078098


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5250,42 +5258,42 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
         });
     }
 
+    private CompletableFuture<Map<String, Boolean>> getReplicatedSubscriptionStatusFromLocalBroker(
+            TopicName localTopicName,
+            String subName) {
+        return getTopicReferenceAsync(localTopicName).thenCompose(topic -> {
+            Subscription sub = topic.getSubscription(subName);
+            if (sub == null) {
+                return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+                    getSubNotFoundErrorMessage(localTopicName.toString(), subName)));
+            }
+            if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) {
+                Map<String, Boolean> res = Maps.newHashMap();
+                res.put(localTopicName.toString(), sub.isReplicated());
+                return CompletableFuture.completedFuture(res);

Review Comment:
   Updated, and added a unit test.





[GitHub] [pulsar] AnonHxy closed pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
AnonHxy closed pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call
URL: https://github.com/apache/pulsar/pull/16946




[GitHub] [pulsar] Jason918 commented on a diff in pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #16946:
URL: https://github.com/apache/pulsar/pull/16946#discussion_r956560987


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5145,27 +5145,54 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
                     .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
                     .thenAccept(partitionMetadata -> {
                         if (partitionMetadata.partitions > 0) {
-                            final List<CompletableFuture<Map<String, Boolean>>> futures =
-                                    Lists.newArrayListWithCapacity(partitionMetadata.partitions);
-                            final Map<String, Boolean> status = Maps.newHashMap();
+                            List<CompletableFuture<Void>> futures = new ArrayList<>(partitionMetadata.partitions);
+                            Map<String, Boolean> status = Maps.newHashMap();
 
                             for (int i = 0; i < partitionMetadata.partitions; i++) {
                                 TopicName partition = topicName.getPartition(i);
-                                try {
-                                    futures.add(pulsar().getAdminClient().topics().getReplicatedSubscriptionStatusAsync(
-                                            partition.toString(), subName).whenComplete((response, throwable) -> {
-                                        if (throwable != null) {
-                                            log.error("[{}] Failed to get replicated subscriptions on {} {}",
-                                                    clientAppId(), partition, subName, throwable);
-                                            asyncResponse.resume(new RestException(throwable));
+                                futures.add(
+                                    pulsar().getNamespaceService().isServiceUnitOwnedAsync(partition)
+                                    .thenCompose(owned -> {
+                                        if (owned) {
+                                            // if this broker owned the partition do action like
+                                            // `internalGetReplicatedSubscriptionStatusForNonPartitionedTopic()`
+                                            return getTopicReferenceAsync(partition)
+                                                .thenApply(topic -> {

Review Comment:
   Can we reuse the code in `internalGetReplicatedSubscriptionStatusForNonPartitionedTopic` here?
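
For readers following the thread, the dispatch this diff introduces (local lookup when the broker owns the partition, REST call otherwise) and the reuse the reviewer asks about could be sketched roughly as below. This is a minimal, standalone illustration, not the PR's code: `OwnershipDispatchSketch`, `statusFor`, `statusForAll`, and the three function parameters are hypothetical names standing in for the ownership check, the shared local helper, and the admin-client call.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch; none of these names are Pulsar APIs.
class OwnershipDispatchSketch {

    // Resolve one partition: use the local lookup when this broker owns it,
    // otherwise fall back to the remote (REST) lookup.
    static CompletableFuture<Map<String, Boolean>> statusFor(
            String partition,
            Function<String, CompletableFuture<Boolean>> isOwned,
            Function<String, CompletableFuture<Map<String, Boolean>>> localLookup,
            Function<String, CompletableFuture<Map<String, Boolean>>> remoteLookup) {
        return isOwned.apply(partition)
                .thenCompose(owned -> owned
                        ? localLookup.apply(partition)
                        : remoteLookup.apply(partition));
    }

    // Resolve every partition through the same helper, then merge the
    // per-partition maps once all futures have completed.
    static CompletableFuture<Map<String, Boolean>> statusForAll(
            List<String> partitions,
            Function<String, CompletableFuture<Boolean>> isOwned,
            Function<String, CompletableFuture<Map<String, Boolean>>> localLookup,
            Function<String, CompletableFuture<Map<String, Boolean>>> remoteLookup) {
        List<CompletableFuture<Map<String, Boolean>>> futures = new ArrayList<>(partitions.size());
        for (String partition : partitions) {
            futures.add(statusFor(partition, isOwned, localLookup, remoteLookup));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    Map<String, Boolean> merged = new HashMap<>();
                    for (CompletableFuture<Map<String, Boolean>> future : futures) {
                        // join() does not block here: allOf has already completed.
                        merged.putAll(future.join());
                    }
                    return merged;
                });
    }
}
```

Because the local lookup is passed in as a single function, the non-partitioned path and the owned-partition path can share one implementation, which appears to be the intent of the question above.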





[GitHub] [pulsar] AnonHxy commented on pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on PR #16946:
URL: https://github.com/apache/pulsar/pull/16946#issuecomment-1232465968

   /pulsarbot run-failure-checks




[GitHub] [pulsar] AnonHxy commented on pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
AnonHxy commented on PR #16946:
URL: https://github.com/apache/pulsar/pull/16946#issuecomment-1230100927

   /pulsarbot run-failure-checks




[GitHub] [pulsar] codelipenghui commented on a diff in pull request #16946: [Improve][Broker]Reduce GetReplicatedSubscriptionStatus local REST call

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #16946:
URL: https://github.com/apache/pulsar/pull/16946#discussion_r965955373


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java:
##########
@@ -5250,42 +5258,42 @@ protected void internalGetReplicatedSubscriptionStatus(AsyncResponse asyncRespon
         });
     }
 
+    private CompletableFuture<Map<String, Boolean>> getReplicatedSubscriptionStatusFromLocalBroker(
+            TopicName localTopicName,
+            String subName) {
+        return getTopicReferenceAsync(localTopicName).thenCompose(topic -> {
+            Subscription sub = topic.getSubscription(subName);
+            if (sub == null) {
+                return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+                    getSubNotFoundErrorMessage(localTopicName.toString(), subName)));
+            }
+            if (topic instanceof PersistentTopic && sub instanceof PersistentSubscription) {
+                Map<String, Boolean> res = Maps.newHashMap();
+                res.put(localTopicName.toString(), sub.isReplicated());
+                return CompletableFuture.completedFuture(res);

Review Comment:
   ```suggestion
                  return CompletableFuture.completedFuture(Collections.singletonMap(localTopicName.toString(), 
                           sub.isReplicated()));
   ```
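
One point worth keeping in mind with this suggestion: `Collections.singletonMap` returns an immutable map, so it is a drop-in replacement only if no caller later mutates the returned map. The standalone sketch below compares the two forms; `SingletonMapSketch` and its methods are hypothetical names, and the topic string used with it is only an example.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch comparing the two ways of building a one-entry result.
class SingletonMapSketch {

    // Original form in the diff: a mutable HashMap with a single entry.
    static Map<String, Boolean> viaHashMap(String topic, boolean replicated) {
        Map<String, Boolean> res = new HashMap<>();
        res.put(topic, replicated);
        return res;
    }

    // Suggested form: an immutable one-entry map, with no intermediate variable.
    static Map<String, Boolean> viaSingletonMap(String topic, boolean replicated) {
        return Collections.singletonMap(topic, replicated);
    }

    // Returns true if the map refuses put(), i.e. it is unmodifiable.
    static boolean rejectsMutation(Map<String, Boolean> map) {
        try {
            map.put("another-topic", true);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```

Both forms compare equal under `Map.equals`, so code that only reads the result or copies it with `putAll` should see no difference.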


