You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/07 15:22:16 UTC

[GitHub] [pulsar] nicoloboschi opened a new pull request, #17522: [improve][transactions] Add command to list transaction coordinators

nicoloboschi opened a new pull request, #17522:
URL: https://github.com/apache/pulsar/pull/17522

   
   Fixes #17513 
   
   ### Motivation
   At the moment there's no way to list all the transaction coordinators and to understand to which broker own.
   
   ### Modifications
   
   * New endpoint `transactions/coordinators´ to get all the coordinators
      * Under the hood it does a `lookup` to the TC partitioned topic
   * New data object ´TransactionCoordinatorInfo´ -> only contains the `brokerServiceUrl` field
   * Added the equivalent client call in the pulsar-client admin
   * Added the equivalent client call in the pulsar-admin CLI `coordinators-list` 
   
   - [x] `doc-not-needed` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #17522: [improve][transactions] Add command to list transaction coordinators

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #17522:
URL: https://github.com/apache/pulsar/pull/17522#discussion_r1001660566


##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java:
##########
@@ -34,6 +35,21 @@
 
 public interface Transactions {
 
+    /**
+     * List transaction coordinators.
+     *
+     * @return the transaction coordinators list.
+     */
+    Map<Integer, TransactionCoordinatorInfo> listTransactionCoordinators() throws PulsarAdminException;

Review Comment:
   moved to List



##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorInfo.java:
##########
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+/**
+ * Transaction coordinator information.
+ */
+@Getter
+@NoArgsConstructor
+@AllArgsConstructor
+public class TransactionCoordinatorInfo {
+    private String brokerServiceUrl;

Review Comment:
   agreed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #17522: [improve][transactions] Add command to list transaction coordinators

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #17522:
URL: https://github.com/apache/pulsar/pull/17522#discussion_r1001648967


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java:
##########
@@ -102,6 +103,18 @@ protected void cleanup() throws Exception {
         super.internalCleanup();
     }
 
+    @Test(timeOut = 20000)
+    public void testListTransactionCoordinators() throws Exception {
+        initTransaction(4);
+        final Map<Integer, TransactionCoordinatorInfo> result = admin
+                .transactions().listTransactionCoordinatorsAsync().get();

Review Comment:
   in the `initTransaction` method we wait for all the topic partitions to be loaded
   
   ```
   Awaitility.await().until(() ->
                   pulsar.getTransactionMetadataStoreService().getStores().size() == coordinatorSize);
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -69,6 +71,37 @@
 @Slf4j
 public abstract class TransactionsBase extends AdminResource {
 
+    protected void internalListCoordinators(AsyncResponse asyncResponse) {
+        final PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException ex) {
+            asyncResponse.resume(new RestException(ex));
+            return;
+        }
+        admin.lookups()
+                .lookupPartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName())
+                .thenAccept(map -> {
+                    if (map.isEmpty()) {
+                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                                "Transaction coordinator not found"));

Review Comment:
   true, removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #17522: [improve][transactions] Add command to list transaction coordinators

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #17522:
URL: https://github.com/apache/pulsar/pull/17522#discussion_r1005834193


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -68,6 +70,49 @@
 @Slf4j
 public abstract class TransactionsBase extends AdminResource {
 
+    protected void internalListCoordinators(AsyncResponse asyncResponse) {
+        final PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException ex) {
+            asyncResponse.resume(new RestException(ex));
+            return;
+        }
+        Map<Integer, TransactionCoordinatorInfo> result = new HashMap<>();
+        admin.lookups()
+                .lookupPartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName())
+                .thenCompose(map -> {
+                    map.forEach((topicPartition, brokerServiceUrl) -> {
+                        final int coordinatorId = TopicName.getPartitionIndex(topicPartition);
+                        result.put(coordinatorId, new TransactionCoordinatorInfo(coordinatorId, brokerServiceUrl));
+                    });
+
+                    return getPulsarResources()
+                            .getTopicResources()
+                            .getExistingPartitions(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN);
+                })
+                .thenAccept(allPartitions -> {
+                    allPartitions
+                            .stream()
+                            .filter(partition -> SystemTopicNames
+                                    .isTransactionCoordinatorAssign(TopicName.get(partition)))
+                            .forEach(partition -> {
+                                final int coordinatorId = TopicName.getPartitionIndex(partition);
+                                if (!result.containsKey(coordinatorId)) {
+                                    result.put(coordinatorId,
+                                            new TransactionCoordinatorInfo(coordinatorId, null));
+                                }
+                            });
+                    asyncResponse.resume(result.values());

Review Comment:
   sorry for late response, lookupPartitionedTopicAsync has already return all the partition, it will not missing. are you test it will miss?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codecov-commenter commented on pull request #17522: [improve][transactions] Add command to list transaction coordinators

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #17522:
URL: https://github.com/apache/pulsar/pull/17522#issuecomment-1289441268

   # [Codecov](https://codecov.io/gh/apache/pulsar/pull/17522?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#17522](https://codecov.io/gh/apache/pulsar/pull/17522?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (69589d8) into [master](https://codecov.io/gh/apache/pulsar/commit/6c65ca0d8a80bfaaa4d5869e0cea485f5c94369b?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (6c65ca0) will **decrease** coverage by `1.84%`.
   > The diff coverage is `19.51%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pulsar/pull/17522/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pulsar/pull/17522?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #17522      +/-   ##
   ============================================
   - Coverage     34.91%   33.07%   -1.85%     
   + Complexity     5707     5409     -298     
   ============================================
     Files           607      612       +5     
     Lines         53396    53616     +220     
     Branches       5712     5743      +31     
   ============================================
   - Hits          18644    17731     -913     
   - Misses        32119    33352    +1233     
   + Partials       2633     2533     -100     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `33.07% <19.51%> (-1.85%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/17522?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...che/pulsar/broker/admin/impl/TransactionsBase.java](https://codecov.io/gh/apache/pulsar/pull/17522/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9hZG1pbi9pbXBsL1RyYW5zYWN0aW9uc0Jhc2UuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [.../org/apache/pulsar/broker/admin/v2/Namespaces.java](https://codecov.io/gh/apache/pulsar/pull/17522/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9hZG1pbi92Mi9OYW1lc3BhY2VzLmphdmE=) | `10.65% <0.00%> (+2.63%)` | :arrow_up: |
   | [...rg/apache/pulsar/broker/admin/v3/Transactions.java](https://codecov.io/gh/apache/pulsar/pull/17522/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9hZG1pbi92My9UcmFuc2FjdGlvbnMuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [.../pulsar/broker/service/AbstractBaseDispatcher.java](https://codecov.io/gh/apache/pulsar/pull/17522/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL0Fic3RyYWN0QmFzZURpc3BhdGNoZXIuamF2YQ==) | `34.35% <ø> (-11.51%)` | :arrow_down: |
   | [.../service/SystemTopicBasedTopicPoliciesService.java](https://codecov.io/gh/apache/pulsar/pull/17522/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1N5c3RlbVRvcGljQmFzZWRUb3BpY1BvbGljaWVzU2VydmljZS5qYXZh) | `54.90% <0.00%> (+3.31%)` | :arrow_up: |
   | [.../pulsar/broker/stats/BrokerOperabilityMetrics.java](https://codecov.io/gh/apache/pulsar/pull/17522/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zdGF0cy9Ccm9rZXJPcGVyYWJpbGl0eU1ldHJpY3MuamF2YQ==) | `98.21% <ø> (+5.56%)` | :arrow_up: |
   | [...g/apache/pulsar/compaction/CompactedTopicImpl.java](https://codecov.io/gh/apache/pulsar/pull/17522/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NvbXBhY3Rpb24vQ29tcGFjdGVkVG9waWNJbXBsLmphdmE=) | `11.42% <0.00%> (+0.71%)` | :arrow_up: |
   | [...va/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java](https://codecov.io/gh/apache/pulsar/pull/17522/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWlvL2pkYmMvY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2lvL2pkYmMvSmRiY0Fic3RyYWN0U2luay5qYXZh) | `3.22% <0.00%> (-0.97%)` | :arrow_down: |
   | [...java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java](https://codecov.io/gh/apache/pulsar/pull/17522/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWlvL2pkYmMvY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2lvL2pkYmMvSmRiY1NpbmtDb25maWcuamF2YQ==) | `20.00% <0.00%> (-5.00%)` | :arrow_down: |
   | [...main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java](https://codecov.io/gh/apache/pulsar/pull/17522/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWlvL2pkYmMvY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2lvL2pkYmMvSmRiY1V0aWxzLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | ... and [126 more](https://codecov.io/gh/apache/pulsar/pull/17522/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nicoloboschi merged pull request #17522: [improve][transactions] Add command to list transaction coordinators

Posted by GitBox <gi...@apache.org>.
nicoloboschi merged PR #17522:
URL: https://github.com/apache/pulsar/pull/17522


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nicoloboschi closed pull request #17522: [improve][transactions] Add command to list transaction coordinators

Posted by GitBox <gi...@apache.org>.
nicoloboschi closed pull request #17522: [improve][transactions] Add command to list transaction coordinators
URL: https://github.com/apache/pulsar/pull/17522


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #17522: [improve][transactions] Add command to list transaction coordinators

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #17522:
URL: https://github.com/apache/pulsar/pull/17522#discussion_r1004166381


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java:
##########
@@ -109,6 +110,19 @@ protected void cleanup() throws Exception {
         super.internalCleanup();
     }
 
+    @Test(timeOut = 20000)
+    public void testListTransactionCoordinators() throws Exception {
+        initTransaction(4);
+        final List<TransactionCoordinatorInfo> result = admin
+                .transactions().listTransactionCoordinatorsAsync().get();
+        System.out.println("result" + result);

Review Comment:
   delete



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -68,6 +70,49 @@
 @Slf4j
 public abstract class TransactionsBase extends AdminResource {
 
+    protected void internalListCoordinators(AsyncResponse asyncResponse) {
+        final PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException ex) {
+            asyncResponse.resume(new RestException(ex));
+            return;
+        }
+        Map<Integer, TransactionCoordinatorInfo> result = new HashMap<>();
+        admin.lookups()
+                .lookupPartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName())
+                .thenCompose(map -> {
+                    map.forEach((topicPartition, brokerServiceUrl) -> {
+                        final int coordinatorId = TopicName.getPartitionIndex(topicPartition);
+                        result.put(coordinatorId, new TransactionCoordinatorInfo(coordinatorId, brokerServiceUrl));
+                    });
+
+                    return getPulsarResources()
+                            .getTopicResources()
+                            .getExistingPartitions(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN);
+                })
+                .thenAccept(allPartitions -> {
+                    allPartitions
+                            .stream()
+                            .filter(partition -> SystemTopicNames
+                                    .isTransactionCoordinatorAssign(TopicName.get(partition)))
+                            .forEach(partition -> {
+                                final int coordinatorId = TopicName.getPartitionIndex(partition);
+                                if (!result.containsKey(coordinatorId)) {
+                                    result.put(coordinatorId,
+                                            new TransactionCoordinatorInfo(coordinatorId, null));
+                                }
+                            });
+                    asyncResponse.resume(result.values());

Review Comment:
   why do we need to check again?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on pull request #17522: [improve][transactions] Add command to list transaction coordinators

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on PR #17522:
URL: https://github.com/apache/pulsar/pull/17522#issuecomment-1244808288

   @nicoloboschi  hi, why not add the broker address into `org.apache.pulsar.common.policies.data.TransactionCoordinatorStats`, in this way, we don't need to add the new interface. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nicoloboschi commented on pull request #17522: [improve][transactions] Add command to list transaction coordinators

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on PR #17522:
URL: https://github.com/apache/pulsar/pull/17522#issuecomment-1286983817

   > @nicoloboschi hi, why not add the broker address into `org.apache.pulsar.common.policies.data.TransactionCoordinatorStats`, in this way, we don't need to add the new interface.
   
   The stats output is not needed here and they're hard to read considering you have multiple coordinators. Adding the related broker to the stat object requires a lookup request. I'd prefer to not add it 
   
   @eolivelli PTAL again
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #17522: [improve][transactions] Add command to list transaction coordinators

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #17522:
URL: https://github.com/apache/pulsar/pull/17522#discussion_r1004673520


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -68,6 +70,49 @@
 @Slf4j
 public abstract class TransactionsBase extends AdminResource {
 
+    protected void internalListCoordinators(AsyncResponse asyncResponse) {
+        final PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException ex) {
+            asyncResponse.resume(new RestException(ex));
+            return;
+        }
+        Map<Integer, TransactionCoordinatorInfo> result = new HashMap<>();
+        admin.lookups()
+                .lookupPartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName())
+                .thenCompose(map -> {
+                    map.forEach((topicPartition, brokerServiceUrl) -> {
+                        final int coordinatorId = TopicName.getPartitionIndex(topicPartition);
+                        result.put(coordinatorId, new TransactionCoordinatorInfo(coordinatorId, brokerServiceUrl));
+                    });
+
+                    return getPulsarResources()
+                            .getTopicResources()
+                            .getExistingPartitions(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN);
+                })
+                .thenAccept(allPartitions -> {
+                    allPartitions
+                            .stream()
+                            .filter(partition -> SystemTopicNames
+                                    .isTransactionCoordinatorAssign(TopicName.get(partition)))
+                            .forEach(partition -> {
+                                final int coordinatorId = TopicName.getPartitionIndex(partition);
+                                if (!result.containsKey(coordinatorId)) {
+                                    result.put(coordinatorId,
+                                            new TransactionCoordinatorInfo(coordinatorId, null));
+                                }
+                            });
+                    asyncResponse.resume(result.values());

Review Comment:
   Because there may be partitions not currently loaded by any broker but it's valuable for the user to know that exists 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nicoloboschi commented on pull request #17522: [improve][transactions] Add command to list transaction coordinators

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on PR #17522:
URL: https://github.com/apache/pulsar/pull/17522#issuecomment-1290790453

   @congbobo184 I fixed the debug line and answered to your question


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] github-actions[bot] commented on pull request #17522: [improve][transactions] Add command to list transaction coordinators

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #17522:
URL: https://github.com/apache/pulsar/pull/17522#issuecomment-1276937493

   The pr had no activity for 30 days, mark with Stale label.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #17522: [improve][transactions] Add command to list transaction coordinators

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #17522:
URL: https://github.com/apache/pulsar/pull/17522#discussion_r1019374458


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -68,6 +70,49 @@
 @Slf4j
 public abstract class TransactionsBase extends AdminResource {
 
+    protected void internalListCoordinators(AsyncResponse asyncResponse) {
+        final PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException ex) {
+            asyncResponse.resume(new RestException(ex));
+            return;
+        }
+        Map<Integer, TransactionCoordinatorInfo> result = new HashMap<>();
+        admin.lookups()
+                .lookupPartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName())
+                .thenCompose(map -> {
+                    map.forEach((topicPartition, brokerServiceUrl) -> {
+                        final int coordinatorId = TopicName.getPartitionIndex(topicPartition);
+                        result.put(coordinatorId, new TransactionCoordinatorInfo(coordinatorId, brokerServiceUrl));
+                    });
+
+                    return getPulsarResources()
+                            .getTopicResources()
+                            .getExistingPartitions(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN);
+                })
+                .thenAccept(allPartitions -> {
+                    allPartitions
+                            .stream()
+                            .filter(partition -> SystemTopicNames
+                                    .isTransactionCoordinatorAssign(TopicName.get(partition)))
+                            .forEach(partition -> {
+                                final int coordinatorId = TopicName.getPartitionIndex(partition);
+                                if (!result.containsKey(coordinatorId)) {
+                                    result.put(coordinatorId,
+                                            new TransactionCoordinatorInfo(coordinatorId, null));
+                                }
+                            });
+                    asyncResponse.resume(result.values());

Review Comment:
   you're right. I left only the lookup PTAL



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nicoloboschi commented on pull request #17522: [improve][transactions] Add command to list transaction coordinators

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on PR #17522:
URL: https://github.com/apache/pulsar/pull/17522#issuecomment-1289070090

   /pulsarbot rerun-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on a diff in pull request #17522: [improve][transactions] Add command to list transaction coordinators

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #17522:
URL: https://github.com/apache/pulsar/pull/17522#discussion_r965574056


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -69,6 +71,37 @@
 @Slf4j
 public abstract class TransactionsBase extends AdminResource {
 
+    protected void internalListCoordinators(AsyncResponse asyncResponse) {
+        final PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException ex) {
+            asyncResponse.resume(new RestException(ex));
+            return;
+        }
+        admin.lookups()
+                .lookupPartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName())
+                .thenAccept(map -> {
+                    if (map.isEmpty()) {
+                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                                "Transaction coordinator not found"));

Review Comment:
   This is not actually a "problem" to be reported as 404.
   we can simply return an empty result.
   
   how can we get to this situation ? when the partitions are not loaded by any broker ?



##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java:
##########
@@ -34,6 +35,21 @@
 
 public interface Transactions {
 
+    /**
+     * List transaction coordinators.
+     *
+     * @return the transaction coordinators list.
+     */
+    Map<Integer, TransactionCoordinatorInfo> listTransactionCoordinators() throws PulsarAdminException;

Review Comment:
   question (not a request for a change):
   why returning a Map and not a List ?
   I don't think that returning a Map has a particular advantage over returning a List



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java:
##########
@@ -102,6 +103,18 @@ protected void cleanup() throws Exception {
         super.internalCleanup();
     }
 
+    @Test(timeOut = 20000)
+    public void testListTransactionCoordinators() throws Exception {
+        initTransaction(4);
+        final Map<Integer, TransactionCoordinatorInfo> result = admin
+                .transactions().listTransactionCoordinatorsAsync().get();

Review Comment:
   should we use Awaitility here ?
   are we guaranteed that when we reach this point all the partitions are loaded ?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -69,6 +71,37 @@
 @Slf4j
 public abstract class TransactionsBase extends AdminResource {
 
+    protected void internalListCoordinators(AsyncResponse asyncResponse) {
+        final PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException ex) {
+            asyncResponse.resume(new RestException(ex));
+            return;
+        }
+        admin.lookups()
+                .lookupPartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName())
+                .thenAccept(map -> {
+                    if (map.isEmpty()) {
+                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                                "Transaction coordinator not found"));
+                        return;
+                    }
+                    Map<Integer, TransactionCoordinatorInfo> result = new HashMap<>();
+                    map.forEach((topicPartition, brokerServiceUrl) -> {

Review Comment:
   I think that we should return something also for the partitions that are not currently "loaded"



##########
pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorInfo.java:
##########
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+
+/**
+ * Transaction coordinator information.
+ */
+@Getter
+@NoArgsConstructor
+@AllArgsConstructor
+public class TransactionCoordinatorInfo {
+    private String brokerServiceUrl;

Review Comment:
   I would add the id as well, it would make it easier further processing downstream



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] nicoloboschi commented on a diff in pull request #17522: [improve][transactions] Add command to list transaction coordinators

Posted by GitBox <gi...@apache.org>.
nicoloboschi commented on code in PR #17522:
URL: https://github.com/apache/pulsar/pull/17522#discussion_r1001665702


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -69,6 +71,37 @@
 @Slf4j
 public abstract class TransactionsBase extends AdminResource {
 
+    protected void internalListCoordinators(AsyncResponse asyncResponse) {
+        final PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (PulsarServerException ex) {
+            asyncResponse.resume(new RestException(ex));
+            return;
+        }
+        admin.lookups()
+                .lookupPartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName())
+                .thenAccept(map -> {
+                    if (map.isEmpty()) {
+                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                                "Transaction coordinator not found"));
+                        return;
+                    }
+                    Map<Integer, TransactionCoordinatorInfo> result = new HashMap<>();
+                    map.forEach((topicPartition, brokerServiceUrl) -> {

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org