You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/04/04 03:11:11 UTC

[GitHub] [pulsar] Technoboy- opened a new pull request, #15017: [fix][transaction] Fix transaction REST API redirect issue.

Technoboy- opened a new pull request, #15017:
URL: https://github.com/apache/pulsar/pull/15017

   ### Motivation
   
   It's the same issue with patch #14876 fixed.
   
   When topic does not exist, the original logic will return `TEMPORARY_REDIRECT`, it will cause the client below exception : 
   
   ```
   Caused by: java.lang.NullPointerException: originalUrl
   	at org.asynchttpclient.util.Assertions.assertNotNull(Assertions.java:23)
   	at org.asynchttpclient.uri.UriParser.parse(UriParser.java:344)
   	at org.asynchttpclient.uri.Uri.create(Uri.java:67)
   	at org.asynchttpclient.netty.handler.intercept.Redirect30xInterceptor.exitAfterHandlingRedirect(Redirect30xInterceptor.java:128)
   	at org.asynchttpclient.netty.handler.intercept.Interceptors.exitAfterIntercept(Interceptors.java:98)
   	at org.asynchttpclient.netty.handler.HttpHandler.handleHttpResponse(HttpHandler.java:78)
   	at org.asynchttpclient.netty.handler.HttpHandler.handleRead(HttpHandler.java:140)
   ```
   
   ### Modifications
   
   - Add `checkTransactionCoordinatorEnabled`.
   - Return `NOT_FOUND` instead of `TEMPORARY_REDIRECT`.
   - Make `internalGetTransactionInPendingAckStats` fully async.
   - Make `internalGetTransactionInBufferStats` fully async.
   - Make `internalGetTransactionBufferStats` fully async.
   - Make `internalGetPendingAckStats` fully async.
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   
   ### Documentation
   
     
   - [x] `no-need-doc` 
     
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843896353


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        pulsar().getTransactionMetadataStoreService().getStores()
-                                .get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+        if (coordinatorId != null) {
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),

Review Comment:
   but this exception will cause the ``AsyncResponse`` is not to complete.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- merged pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

Posted by GitBox <gi...@apache.org>.
Technoboy- merged PR #15017:
URL: https://github.com/apache/pulsar/pull/15017


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843898286


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -472,74 +349,75 @@ protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
     protected void internalGetCoordinatorInternalStats(AsyncResponse asyncResponse, boolean authoritative,
                                                        boolean metadata, int coordinatorId) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                TopicName topicName = TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
-                validateTopicOwnership(topicName, authoritative);
-                TransactionMetadataStore metadataStore = pulsar().getTransactionMetadataStoreService()
-                        .getStores().get(TransactionCoordinatorID.get(coordinatorId));
-                if (metadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
-                    return;
-                }
-                if (metadataStore instanceof MLTransactionMetadataStore) {
-                    ManagedLedger managedLedger = ((MLTransactionMetadataStore) metadataStore).getManagedLedger();
-                    TransactionCoordinatorInternalStats transactionCoordinatorInternalStats =
-                            new TransactionCoordinatorInternalStats();
-                    TransactionLogStats transactionLogStats = new TransactionLogStats();
-                    transactionLogStats.managedLedgerName = managedLedger.getName();
-                    transactionLogStats.managedLedgerInternalStats =
-                            managedLedger.getManagedLedgerInternalStats(metadata).get();
-                    transactionCoordinatorInternalStats.transactionLogStats = transactionLogStats;
-                    asyncResponse.resume(transactionCoordinatorInternalStats);
-                } else {
-                    asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
-                            "Broker don't use MLTransactionMetadataStore!"));
-                }
+            TopicName topicName = TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
+            validateTopicOwnership(topicName, authoritative);
+            TransactionMetadataStore metadataStore = pulsar().getTransactionMetadataStoreService()
+                    .getStores().get(TransactionCoordinatorID.get(coordinatorId));
+            if (metadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                return;
+            }
+            if (metadataStore instanceof MLTransactionMetadataStore) {
+                ManagedLedger managedLedger = ((MLTransactionMetadataStore) metadataStore).getManagedLedger();
+                TransactionCoordinatorInternalStats transactionCoordinatorInternalStats =
+                        new TransactionCoordinatorInternalStats();
+                TransactionLogStats transactionLogStats = new TransactionLogStats();
+                transactionLogStats.managedLedgerName = managedLedger.getName();
+                transactionLogStats.managedLedgerInternalStats =
+                        managedLedger.getManagedLedgerInternalStats(metadata).get();

Review Comment:
   In this patch, only remove `pulsar().getConfig().isTransactionCoordinatorEnabled()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843888728


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -205,6 +248,7 @@ public void getCoordinatorInternalStats(@Suspended final AsyncResponse asyncResp
                                             @DefaultValue("false") boolean authoritative,
                                             @PathParam("coordinatorId") String coordinatorId,
                                             @QueryParam("metadata") @DefaultValue("false") boolean metadata) {
+        checkTransactionCoordinatorEnabled();

Review Comment:
   Looks like we should catch this method exception.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -188,6 +230,7 @@ public void getSlowTransactions(@Suspended final AsyncResponse asyncResponse,
                                     @DefaultValue("false") boolean authoritative,
                                     @PathParam("timeout") String timeout,
                                     @QueryParam("coordinatorId") Integer coordinatorId) {
+        checkTransactionCoordinatorEnabled();

Review Comment:
   Looks like we should catch this method exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843877969


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        pulsar().getTransactionMetadataStoreService().getStores()
-                                .get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+        if (coordinatorId != null) {
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                    authoritative);
+            TransactionMetadataStore transactionMetadataStore =
+                    pulsar().getTransactionMetadataStoreService().getStores()
+                            .get(TransactionCoordinatorID.get(coordinatorId));
+            if (transactionMetadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                return;
+            }
+            asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+        } else {
+            getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                    false, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions == 0) {
+                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                            "Transaction coordinator not found"));
                     return;
                 }
-                asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
-            } else {
-                getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
-                        false, false).thenAccept(partitionMetadata -> {
-                    if (partitionMetadata.partitions == 0) {
-                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
-                                "Transaction coordinator not found"));
+                List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
+                        Lists.newArrayList();
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    try {
+                        transactionMetadataStoreInfoFutures
+                                .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+                    } catch (PulsarServerException e) {
+                        asyncResponse.resume(new RestException(e));
                         return;
                     }
-                    List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
-                            Lists.newArrayList();
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                }
+                Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
+                FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+
+                    for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
                         try {
-                            transactionMetadataStoreInfoFutures
-                                    .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
-                        } catch (PulsarServerException e) {
-                            asyncResponse.resume(new RestException(e));
+                            stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());

Review Comment:
   use ``join`` to avoid catch checked exceptions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843872907


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        pulsar().getTransactionMetadataStoreService().getStores()
-                                .get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+        if (coordinatorId != null) {
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),

Review Comment:
   Looks like we should check this method exception or make it async.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Jason918 commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r841339916


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,144 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        pulsar().getTransactionMetadataStoreService().getStores()
-                                .get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+        if (coordinatorId != null) {
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                    authoritative);
+            TransactionMetadataStore transactionMetadataStore =
+                    pulsar().getTransactionMetadataStoreService().getStores()
+                            .get(TransactionCoordinatorID.get(coordinatorId));
+            if (transactionMetadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                return;
+            }
+            asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+        } else {
+            getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                    false, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions == 0) {
+                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                            "Transaction coordinator not found"));
                     return;
                 }
-                asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
-            } else {
-                getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
-                        false, false).thenAccept(partitionMetadata -> {
-                    if (partitionMetadata.partitions == 0) {
-                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
-                                "Transaction coordinator not found"));
+                List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
+                        Lists.newArrayList();
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    try {
+                        transactionMetadataStoreInfoFutures
+                                .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+                    } catch (PulsarServerException e) {
+                        asyncResponse.resume(new RestException(e));
                         return;
                     }
-                    List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
-                            Lists.newArrayList();
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                }
+                Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
+                FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+
+                    for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
                         try {
-                            transactionMetadataStoreInfoFutures
-                                    .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
-                        } catch (PulsarServerException e) {
-                            asyncResponse.resume(new RestException(e));
+                            stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
+                        } catch (Exception exception) {
+                            asyncResponse.resume(new RestException(exception.getCause()));
                             return;
                         }
                     }
-                    Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
-                    FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
-                        if (e != null) {
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
 
-                        for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
-                            try {
-                                stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
-                            } catch (Exception exception) {
-                                asyncResponse.resume(new RestException(exception.getCause()));
-                                return;
-                            }
-                        }
-
-                        asyncResponse.resume(stats);
-                    });
-                }).exceptionally(ex -> {
-                    log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
+                    asyncResponse.resume(stats);
                 });
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
         }
     }
 
-    protected void internalGetTransactionInPendingAckStats(AsyncResponse asyncResponse, boolean authoritative,
-                                                           long mostSigBits, long leastSigBits, String subName) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
+    protected CompletableFuture<TransactionInPendingAckStats> internalGetTransactionInPendingAckStats(
+            boolean authoritative, long mostSigBits, long leastSigBits, String subName) {
+        return validateTopicOwnershipAsync(topicName, authoritative).thenCompose(__ -> {
             CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
                     .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject)
-                                .getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits), subName));
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
+            if (topicFuture == null) {
+                return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
             }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
-        }
+            return topicFuture.thenCompose(optionalTopic -> {
+                if (!optionalTopic.isPresent()) {
+                    return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
+                }
+                return CompletableFuture.completedFuture(((PersistentTopic) optionalTopic.get())

Review Comment:
   `PersistentTopic` check is removed here. Any reason?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,144 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        pulsar().getTransactionMetadataStoreService().getStores()
-                                .get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+        if (coordinatorId != null) {
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                    authoritative);
+            TransactionMetadataStore transactionMetadataStore =
+                    pulsar().getTransactionMetadataStoreService().getStores()
+                            .get(TransactionCoordinatorID.get(coordinatorId));
+            if (transactionMetadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                return;
+            }
+            asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+        } else {
+            getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                    false, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions == 0) {
+                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                            "Transaction coordinator not found"));
                     return;
                 }
-                asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
-            } else {
-                getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
-                        false, false).thenAccept(partitionMetadata -> {
-                    if (partitionMetadata.partitions == 0) {
-                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
-                                "Transaction coordinator not found"));
+                List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
+                        Lists.newArrayList();
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    try {
+                        transactionMetadataStoreInfoFutures
+                                .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+                    } catch (PulsarServerException e) {
+                        asyncResponse.resume(new RestException(e));
                         return;
                     }
-                    List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
-                            Lists.newArrayList();
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                }
+                Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
+                FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+
+                    for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
                         try {
-                            transactionMetadataStoreInfoFutures
-                                    .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
-                        } catch (PulsarServerException e) {
-                            asyncResponse.resume(new RestException(e));
+                            stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
+                        } catch (Exception exception) {
+                            asyncResponse.resume(new RestException(exception.getCause()));
                             return;
                         }
                     }
-                    Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
-                    FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
-                        if (e != null) {
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
 
-                        for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
-                            try {
-                                stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
-                            } catch (Exception exception) {
-                                asyncResponse.resume(new RestException(exception.getCause()));
-                                return;
-                            }
-                        }
-
-                        asyncResponse.resume(stats);
-                    });
-                }).exceptionally(ex -> {
-                    log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
+                    asyncResponse.resume(stats);
                 });
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
         }
     }
 
-    protected void internalGetTransactionInPendingAckStats(AsyncResponse asyncResponse, boolean authoritative,
-                                                           long mostSigBits, long leastSigBits, String subName) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
+    protected CompletableFuture<TransactionInPendingAckStats> internalGetTransactionInPendingAckStats(
+            boolean authoritative, long mostSigBits, long leastSigBits, String subName) {
+        return validateTopicOwnershipAsync(topicName, authoritative).thenCompose(__ -> {
             CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
                     .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject)
-                                .getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits), subName));
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
+            if (topicFuture == null) {
+                return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
             }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
-        }
+            return topicFuture.thenCompose(optionalTopic -> {
+                if (!optionalTopic.isPresent()) {
+                    return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
+                }
+                return CompletableFuture.completedFuture(((PersistentTopic) optionalTopic.get())
+                        .getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits), subName));
+            });
+        });
     }
 
-    protected void internalGetTransactionInBufferStats(AsyncResponse asyncResponse, boolean authoritative,
-                                                       long mostSigBits, long leastSigBits) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
+    protected CompletableFuture<TransactionInBufferStats> internalGetTransactionInBufferStats(
+            boolean authoritative, long mostSigBits, long leastSigBits) {
+        return validateTopicOwnershipAsync(topicName, authoritative).thenCompose(__ -> {

Review Comment:
   It seems L147-157 is duplicated in these methods. Better to use a common method like `getExistingPersistentTopicAsync()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843897088


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        pulsar().getTransactionMetadataStoreService().getStores()
-                                .get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+        if (coordinatorId != null) {
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                    authoritative);
+            TransactionMetadataStore transactionMetadataStore =
+                    pulsar().getTransactionMetadataStoreService().getStores()
+                            .get(TransactionCoordinatorID.get(coordinatorId));
+            if (transactionMetadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                return;
+            }
+            asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+        } else {
+            getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                    false, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions == 0) {
+                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                            "Transaction coordinator not found"));
                     return;
                 }
-                asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
-            } else {
-                getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
-                        false, false).thenAccept(partitionMetadata -> {
-                    if (partitionMetadata.partitions == 0) {
-                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
-                                "Transaction coordinator not found"));
+                List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
+                        Lists.newArrayList();
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    try {
+                        transactionMetadataStoreInfoFutures
+                                .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+                    } catch (PulsarServerException e) {
+                        asyncResponse.resume(new RestException(e));
                         return;
                     }
-                    List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
-                            Lists.newArrayList();
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                }
+                Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
+                FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+
+                    for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
                         try {
-                            transactionMetadataStoreInfoFutures
-                                    .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
-                        } catch (PulsarServerException e) {
-                            asyncResponse.resume(new RestException(e));
+                            stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
+                        } catch (Exception exception) {
+                            asyncResponse.resume(new RestException(exception.getCause()));
                             return;
                         }
                     }
-                    Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
-                    FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
-                        if (e != null) {
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
-
-                        for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
-                            try {
-                                stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
-                            } catch (Exception exception) {
-                                asyncResponse.resume(new RestException(exception.getCause()));
-                                return;
-                            }
-                        }
 
-                        asyncResponse.resume(stats);
-                    });
-                }).exceptionally(ex -> {
-                    log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
+                    asyncResponse.resume(stats);
                 });
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
         }
     }
 
-    protected void internalGetTransactionInPendingAckStats(AsyncResponse asyncResponse, boolean authoritative,
-                                                           long mostSigBits, long leastSigBits, String subName) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject)
-                                .getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits), subName));
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
-        }
+    protected CompletableFuture<TransactionInPendingAckStats> internalGetTransactionInPendingAckStats(
+            boolean authoritative, long mostSigBits, long leastSigBits, String subName) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits),
+                        subName));
     }
 
-    protected void internalGetTransactionInBufferStats(AsyncResponse asyncResponse, boolean authoritative,
-                                                       long mostSigBits, long leastSigBits) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        TransactionInBufferStats transactionInBufferStats = ((PersistentTopic) topicObject)
-                                .getTransactionInBufferStats(new TxnID(mostSigBits, leastSigBits));
-                        asyncResponse.resume(transactionInBufferStats);
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
-        }
+    protected CompletableFuture<TransactionInBufferStats> internalGetTransactionInBufferStats(
+            boolean authoritative, long mostSigBits, long leastSigBits) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionInBufferStats(new TxnID(mostSigBits, leastSigBits)));
     }
 
-    protected void internalGetTransactionBufferStats(AsyncResponse asyncResponse, boolean authoritative) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject).getTransactionBufferStats());
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, "Broker don't support transaction!"));
-        }
+    protected CompletableFuture<TransactionBufferStats> internalGetTransactionBufferStats(boolean authoritative) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionBufferStats());
     }
 
-    protected void internalGetPendingAckStats(AsyncResponse asyncResponse, boolean authoritative, String subName) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject).getTransactionPendingAckStats(subName));
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, "Broker don't support transaction!"));
-        }
+    protected CompletableFuture<TransactionPendingAckStats> internalGetPendingAckStats(
+            boolean authoritative, String subName) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionPendingAckStats(subName));
     }
 
     protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
                                                   boolean authoritative, int mostSigBits, long leastSigBits) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
-                        authoritative);
-                CompletableFuture<TransactionMetadata> transactionMetadataFuture = new CompletableFuture<>();
-                TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
-                        .getTxnMeta(new TxnID(mostSigBits, leastSigBits)).get();
-                getTransactionMetadata(txnMeta, transactionMetadataFuture);
-                asyncResponse.resume(transactionMetadataFuture.get(10, TimeUnit.SECONDS));
-            } else {
-                asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                        "This Broker is not configured with transactionCoordinatorEnabled=true."));
-            }
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
+                    authoritative);
+            CompletableFuture<TransactionMetadata> transactionMetadataFuture = new CompletableFuture<>();
+            TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
+                    .getTxnMeta(new TxnID(mostSigBits, leastSigBits)).get();

Review Comment:
   In this patch, keep the original behavior for this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843901280


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -378,91 +259,87 @@ private void getTransactionMetadata(TxnMeta txnMeta,
     protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
                                                boolean authoritative, long timeout, Integer coordinatorId) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                if (coordinatorId != null) {
-                    validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                            authoritative);
-                    TransactionMetadataStore transactionMetadataStore =
-                            pulsar().getTransactionMetadataStoreService().getStores()
-                                    .get(TransactionCoordinatorID.get(coordinatorId));
-                    if (transactionMetadataStore == null) {
-                        asyncResponse.resume(new RestException(NOT_FOUND,
-                                "Transaction coordinator not found! coordinator id : " + coordinatorId));
+            if (coordinatorId != null) {
+                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                        authoritative);
+                TransactionMetadataStore transactionMetadataStore =
+                        pulsar().getTransactionMetadataStoreService().getStores()
+                                .get(TransactionCoordinatorID.get(coordinatorId));
+                if (transactionMetadataStore == null) {
+                    asyncResponse.resume(new RestException(NOT_FOUND,
+                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                    return;
+                }
+                List<TxnMeta> transactions = transactionMetadataStore.getSlowTransactions(timeout);
+                List<CompletableFuture<TransactionMetadata>> completableFutures = new ArrayList<>();
+                for (TxnMeta txnMeta : transactions) {
+                    CompletableFuture<TransactionMetadata> completableFuture = new CompletableFuture<>();
+                    getTransactionMetadata(txnMeta, completableFuture);
+                    completableFutures.add(completableFuture);
+                }
+
+                FutureUtil.waitForAll(completableFutures).whenComplete((v, e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e.getCause()));
                         return;
                     }
-                    List<TxnMeta> transactions = transactionMetadataStore.getSlowTransactions(timeout);
-                    List<CompletableFuture<TransactionMetadata>> completableFutures = new ArrayList<>();
-                    for (TxnMeta txnMeta : transactions) {
-                        CompletableFuture<TransactionMetadata> completableFuture = new CompletableFuture<>();
-                        getTransactionMetadata(txnMeta, completableFuture);
-                        completableFutures.add(completableFuture);
-                    }
 
-                    FutureUtil.waitForAll(completableFutures).whenComplete((v, e) -> {
+                    Map<String, TransactionMetadata> transactionMetadata = new HashMap<>();
+                    for (CompletableFuture<TransactionMetadata> future : completableFutures) {
+                        try {
+                            transactionMetadata.put(future.get().txnId, future.get());
+                        } catch (Exception exception) {
+                            asyncResponse.resume(new RestException(exception.getCause()));
+                            return;
+                        }
+                    }
+                    asyncResponse.resume(transactionMetadata);
+                });
+            } else {
+                getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                        false, false).thenAccept(partitionMetadata -> {
+                    if (partitionMetadata.partitions == 0) {
+                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                                "Transaction coordinator not found"));
+                        return;
+                    }
+                    List<CompletableFuture<Map<String, TransactionMetadata>>> completableFutures =
+                            Lists.newArrayList();
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        try {
+                            completableFutures
+                                    .add(pulsar().getAdminClient().transactions()
+                                            .getSlowTransactionsByCoordinatorIdAsync(i, timeout,
+                                                    TimeUnit.MILLISECONDS));
+                        } catch (PulsarServerException e) {
+                            asyncResponse.resume(new RestException(e));
+                            return;
+                        }
+                    }
+                    Map<String, TransactionMetadata> transactionMetadataMaps = new HashMap<>();
+                    FutureUtil.waitForAll(completableFutures).whenComplete((result, e) -> {
                         if (e != null) {
-                            asyncResponse.resume(new RestException(e.getCause()));
+                            asyncResponse.resume(new RestException(e));
                             return;
                         }
 
-                        Map<String, TransactionMetadata> transactionMetadata = new HashMap<>();
-                        for (CompletableFuture<TransactionMetadata> future : completableFutures) {
+                        for (CompletableFuture<Map<String, TransactionMetadata>> transactionMetadataMap
+                                : completableFutures) {
                             try {
-                                transactionMetadata.put(future.get().txnId, future.get());
+                                transactionMetadataMaps.putAll(transactionMetadataMap.get());

Review Comment:
   In this patch, for this method, only remove `pulsar().getConfig().isTransactionCoordinatorEnabled()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843893880


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        pulsar().getTransactionMetadataStoreService().getStores()
-                                .get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+        if (coordinatorId != null) {
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),

Review Comment:
   This method is not in refactor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r841760254


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,144 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        pulsar().getTransactionMetadataStoreService().getStores()
-                                .get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+        if (coordinatorId != null) {
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                    authoritative);
+            TransactionMetadataStore transactionMetadataStore =
+                    pulsar().getTransactionMetadataStoreService().getStores()
+                            .get(TransactionCoordinatorID.get(coordinatorId));
+            if (transactionMetadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                return;
+            }
+            asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+        } else {
+            getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                    false, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions == 0) {
+                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                            "Transaction coordinator not found"));
                     return;
                 }
-                asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
-            } else {
-                getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
-                        false, false).thenAccept(partitionMetadata -> {
-                    if (partitionMetadata.partitions == 0) {
-                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
-                                "Transaction coordinator not found"));
+                List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
+                        Lists.newArrayList();
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    try {
+                        transactionMetadataStoreInfoFutures
+                                .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+                    } catch (PulsarServerException e) {
+                        asyncResponse.resume(new RestException(e));
                         return;
                     }
-                    List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
-                            Lists.newArrayList();
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                }
+                Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
+                FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+
+                    for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
                         try {
-                            transactionMetadataStoreInfoFutures
-                                    .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
-                        } catch (PulsarServerException e) {
-                            asyncResponse.resume(new RestException(e));
+                            stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
+                        } catch (Exception exception) {
+                            asyncResponse.resume(new RestException(exception.getCause()));
                             return;
                         }
                     }
-                    Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
-                    FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
-                        if (e != null) {
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
 
-                        for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
-                            try {
-                                stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
-                            } catch (Exception exception) {
-                                asyncResponse.resume(new RestException(exception.getCause()));
-                                return;
-                            }
-                        }
-
-                        asyncResponse.resume(stats);
-                    });
-                }).exceptionally(ex -> {
-                    log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
+                    asyncResponse.resume(stats);
                 });
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
         }
     }
 
-    protected void internalGetTransactionInPendingAckStats(AsyncResponse asyncResponse, boolean authoritative,
-                                                           long mostSigBits, long leastSigBits, String subName) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
+    protected CompletableFuture<TransactionInPendingAckStats> internalGetTransactionInPendingAckStats(
+            boolean authoritative, long mostSigBits, long leastSigBits, String subName) {
+        return validateTopicOwnershipAsync(topicName, authoritative).thenCompose(__ -> {
             CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
                     .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject)
-                                .getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits), subName));
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
+            if (topicFuture == null) {
+                return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
             }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
-        }
+            return topicFuture.thenCompose(optionalTopic -> {
+                if (!optionalTopic.isPresent()) {
+                    return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
+                }
+                return CompletableFuture.completedFuture(((PersistentTopic) optionalTopic.get())

Review Comment:
   Ah, because `validateTopicName` has been defined this topic as persistent.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,144 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        pulsar().getTransactionMetadataStoreService().getStores()
-                                .get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+        if (coordinatorId != null) {
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                    authoritative);
+            TransactionMetadataStore transactionMetadataStore =
+                    pulsar().getTransactionMetadataStoreService().getStores()
+                            .get(TransactionCoordinatorID.get(coordinatorId));
+            if (transactionMetadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                return;
+            }
+            asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+        } else {
+            getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                    false, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions == 0) {
+                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                            "Transaction coordinator not found"));
                     return;
                 }
-                asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
-            } else {
-                getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
-                        false, false).thenAccept(partitionMetadata -> {
-                    if (partitionMetadata.partitions == 0) {
-                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
-                                "Transaction coordinator not found"));
+                List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
+                        Lists.newArrayList();
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    try {
+                        transactionMetadataStoreInfoFutures
+                                .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+                    } catch (PulsarServerException e) {
+                        asyncResponse.resume(new RestException(e));
                         return;
                     }
-                    List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
-                            Lists.newArrayList();
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                }
+                Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
+                FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+
+                    for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
                         try {
-                            transactionMetadataStoreInfoFutures
-                                    .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
-                        } catch (PulsarServerException e) {
-                            asyncResponse.resume(new RestException(e));
+                            stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
+                        } catch (Exception exception) {
+                            asyncResponse.resume(new RestException(exception.getCause()));
                             return;
                         }
                     }
-                    Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
-                    FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
-                        if (e != null) {
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
 
-                        for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
-                            try {
-                                stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
-                            } catch (Exception exception) {
-                                asyncResponse.resume(new RestException(exception.getCause()));
-                                return;
-                            }
-                        }
-
-                        asyncResponse.resume(stats);
-                    });
-                }).exceptionally(ex -> {
-                    log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
+                    asyncResponse.resume(stats);
                 });
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
         }
     }
 
-    protected void internalGetTransactionInPendingAckStats(AsyncResponse asyncResponse, boolean authoritative,
-                                                           long mostSigBits, long leastSigBits, String subName) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
+    protected CompletableFuture<TransactionInPendingAckStats> internalGetTransactionInPendingAckStats(
+            boolean authoritative, long mostSigBits, long leastSigBits, String subName) {
+        return validateTopicOwnershipAsync(topicName, authoritative).thenCompose(__ -> {
             CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
                     .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject)
-                                .getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits), subName));
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
+            if (topicFuture == null) {
+                return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
             }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
-        }
+            return topicFuture.thenCompose(optionalTopic -> {
+                if (!optionalTopic.isPresent()) {
+                    return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
+                }
+                return CompletableFuture.completedFuture(((PersistentTopic) optionalTopic.get())
+                        .getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits), subName));
+            });
+        });
     }
 
-    protected void internalGetTransactionInBufferStats(AsyncResponse asyncResponse, boolean authoritative,
-                                                       long mostSigBits, long leastSigBits) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
+    protected CompletableFuture<TransactionInBufferStats> internalGetTransactionInBufferStats(
+            boolean authoritative, long mostSigBits, long leastSigBits) {
+        return validateTopicOwnershipAsync(topicName, authoritative).thenCompose(__ -> {

Review Comment:
   Yes, right.
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843877969


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        pulsar().getTransactionMetadataStoreService().getStores()
-                                .get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+        if (coordinatorId != null) {
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                    authoritative);
+            TransactionMetadataStore transactionMetadataStore =
+                    pulsar().getTransactionMetadataStoreService().getStores()
+                            .get(TransactionCoordinatorID.get(coordinatorId));
+            if (transactionMetadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                return;
+            }
+            asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+        } else {
+            getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                    false, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions == 0) {
+                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                            "Transaction coordinator not found"));
                     return;
                 }
-                asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
-            } else {
-                getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
-                        false, false).thenAccept(partitionMetadata -> {
-                    if (partitionMetadata.partitions == 0) {
-                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
-                                "Transaction coordinator not found"));
+                List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
+                        Lists.newArrayList();
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    try {
+                        transactionMetadataStoreInfoFutures
+                                .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+                    } catch (PulsarServerException e) {
+                        asyncResponse.resume(new RestException(e));
                         return;
                     }
-                    List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
-                            Lists.newArrayList();
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                }
+                Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
+                FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+
+                    for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
                         try {
-                            transactionMetadataStoreInfoFutures
-                                    .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
-                        } catch (PulsarServerException e) {
-                            asyncResponse.resume(new RestException(e));
+                            stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());

Review Comment:
   use ``join`` to avoid catch checked exceptions?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        pulsar().getTransactionMetadataStoreService().getStores()
-                                .get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+        if (coordinatorId != null) {
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),

Review Comment:
   Looks like we should check this method exception or make it async.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -378,91 +259,87 @@ private void getTransactionMetadata(TxnMeta txnMeta,
     protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
                                                boolean authoritative, long timeout, Integer coordinatorId) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                if (coordinatorId != null) {
-                    validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                            authoritative);
-                    TransactionMetadataStore transactionMetadataStore =
-                            pulsar().getTransactionMetadataStoreService().getStores()
-                                    .get(TransactionCoordinatorID.get(coordinatorId));
-                    if (transactionMetadataStore == null) {
-                        asyncResponse.resume(new RestException(NOT_FOUND,
-                                "Transaction coordinator not found! coordinator id : " + coordinatorId));
+            if (coordinatorId != null) {
+                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                        authoritative);
+                TransactionMetadataStore transactionMetadataStore =
+                        pulsar().getTransactionMetadataStoreService().getStores()
+                                .get(TransactionCoordinatorID.get(coordinatorId));
+                if (transactionMetadataStore == null) {
+                    asyncResponse.resume(new RestException(NOT_FOUND,
+                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                    return;
+                }
+                List<TxnMeta> transactions = transactionMetadataStore.getSlowTransactions(timeout);
+                List<CompletableFuture<TransactionMetadata>> completableFutures = new ArrayList<>();
+                for (TxnMeta txnMeta : transactions) {
+                    CompletableFuture<TransactionMetadata> completableFuture = new CompletableFuture<>();
+                    getTransactionMetadata(txnMeta, completableFuture);
+                    completableFutures.add(completableFuture);
+                }
+
+                FutureUtil.waitForAll(completableFutures).whenComplete((v, e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e.getCause()));
                         return;
                     }
-                    List<TxnMeta> transactions = transactionMetadataStore.getSlowTransactions(timeout);
-                    List<CompletableFuture<TransactionMetadata>> completableFutures = new ArrayList<>();
-                    for (TxnMeta txnMeta : transactions) {
-                        CompletableFuture<TransactionMetadata> completableFuture = new CompletableFuture<>();
-                        getTransactionMetadata(txnMeta, completableFuture);
-                        completableFutures.add(completableFuture);
-                    }
 
-                    FutureUtil.waitForAll(completableFutures).whenComplete((v, e) -> {
+                    Map<String, TransactionMetadata> transactionMetadata = new HashMap<>();
+                    for (CompletableFuture<TransactionMetadata> future : completableFutures) {
+                        try {
+                            transactionMetadata.put(future.get().txnId, future.get());
+                        } catch (Exception exception) {
+                            asyncResponse.resume(new RestException(exception.getCause()));
+                            return;
+                        }
+                    }
+                    asyncResponse.resume(transactionMetadata);
+                });
+            } else {
+                getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                        false, false).thenAccept(partitionMetadata -> {
+                    if (partitionMetadata.partitions == 0) {
+                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                                "Transaction coordinator not found"));
+                        return;
+                    }
+                    List<CompletableFuture<Map<String, TransactionMetadata>>> completableFutures =
+                            Lists.newArrayList();
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        try {
+                            completableFutures
+                                    .add(pulsar().getAdminClient().transactions()
+                                            .getSlowTransactionsByCoordinatorIdAsync(i, timeout,
+                                                    TimeUnit.MILLISECONDS));
+                        } catch (PulsarServerException e) {
+                            asyncResponse.resume(new RestException(e));
+                            return;
+                        }
+                    }
+                    Map<String, TransactionMetadata> transactionMetadataMaps = new HashMap<>();
+                    FutureUtil.waitForAll(completableFutures).whenComplete((result, e) -> {
                         if (e != null) {
-                            asyncResponse.resume(new RestException(e.getCause()));
+                            asyncResponse.resume(new RestException(e));
                             return;
                         }
 
-                        Map<String, TransactionMetadata> transactionMetadata = new HashMap<>();
-                        for (CompletableFuture<TransactionMetadata> future : completableFutures) {
+                        for (CompletableFuture<Map<String, TransactionMetadata>> transactionMetadataMap
+                                : completableFutures) {
                             try {
-                                transactionMetadata.put(future.get().txnId, future.get());
+                                transactionMetadataMaps.putAll(transactionMetadataMap.get());

Review Comment:
   use join to avoid catch checked exceptions?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        pulsar().getTransactionMetadataStoreService().getStores()
-                                .get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+        if (coordinatorId != null) {
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                    authoritative);
+            TransactionMetadataStore transactionMetadataStore =
+                    pulsar().getTransactionMetadataStoreService().getStores()
+                            .get(TransactionCoordinatorID.get(coordinatorId));
+            if (transactionMetadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                return;
+            }
+            asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+        } else {
+            getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                    false, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions == 0) {
+                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                            "Transaction coordinator not found"));
                     return;
                 }
-                asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
-            } else {
-                getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
-                        false, false).thenAccept(partitionMetadata -> {
-                    if (partitionMetadata.partitions == 0) {
-                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
-                                "Transaction coordinator not found"));
+                List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
+                        Lists.newArrayList();
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    try {
+                        transactionMetadataStoreInfoFutures
+                                .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+                    } catch (PulsarServerException e) {
+                        asyncResponse.resume(new RestException(e));
                         return;
                     }
-                    List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
-                            Lists.newArrayList();
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                }
+                Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
+                FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+
+                    for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
                         try {
-                            transactionMetadataStoreInfoFutures
-                                    .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
-                        } catch (PulsarServerException e) {
-                            asyncResponse.resume(new RestException(e));
+                            stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
+                        } catch (Exception exception) {
+                            asyncResponse.resume(new RestException(exception.getCause()));
                             return;
                         }
                     }
-                    Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
-                    FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
-                        if (e != null) {
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
-
-                        for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
-                            try {
-                                stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
-                            } catch (Exception exception) {
-                                asyncResponse.resume(new RestException(exception.getCause()));
-                                return;
-                            }
-                        }
 
-                        asyncResponse.resume(stats);
-                    });
-                }).exceptionally(ex -> {
-                    log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
+                    asyncResponse.resume(stats);
                 });
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
         }
     }
 
-    protected void internalGetTransactionInPendingAckStats(AsyncResponse asyncResponse, boolean authoritative,
-                                                           long mostSigBits, long leastSigBits, String subName) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject)
-                                .getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits), subName));
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
-        }
+    protected CompletableFuture<TransactionInPendingAckStats> internalGetTransactionInPendingAckStats(
+            boolean authoritative, long mostSigBits, long leastSigBits, String subName) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits),
+                        subName));
     }
 
-    protected void internalGetTransactionInBufferStats(AsyncResponse asyncResponse, boolean authoritative,
-                                                       long mostSigBits, long leastSigBits) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        TransactionInBufferStats transactionInBufferStats = ((PersistentTopic) topicObject)
-                                .getTransactionInBufferStats(new TxnID(mostSigBits, leastSigBits));
-                        asyncResponse.resume(transactionInBufferStats);
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
-        }
+    protected CompletableFuture<TransactionInBufferStats> internalGetTransactionInBufferStats(
+            boolean authoritative, long mostSigBits, long leastSigBits) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionInBufferStats(new TxnID(mostSigBits, leastSigBits)));
     }
 
-    protected void internalGetTransactionBufferStats(AsyncResponse asyncResponse, boolean authoritative) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject).getTransactionBufferStats());
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, "Broker don't support transaction!"));
-        }
+    protected CompletableFuture<TransactionBufferStats> internalGetTransactionBufferStats(boolean authoritative) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionBufferStats());
     }
 
-    protected void internalGetPendingAckStats(AsyncResponse asyncResponse, boolean authoritative, String subName) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject).getTransactionPendingAckStats(subName));
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, "Broker don't support transaction!"));
-        }
+    protected CompletableFuture<TransactionPendingAckStats> internalGetPendingAckStats(
+            boolean authoritative, String subName) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionPendingAckStats(subName));
     }
 
     protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
                                                   boolean authoritative, int mostSigBits, long leastSigBits) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
-                        authoritative);
-                CompletableFuture<TransactionMetadata> transactionMetadataFuture = new CompletableFuture<>();
-                TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
-                        .getTxnMeta(new TxnID(mostSigBits, leastSigBits)).get();
-                getTransactionMetadata(txnMeta, transactionMetadataFuture);
-                asyncResponse.resume(transactionMetadataFuture.get(10, TimeUnit.SECONDS));
-            } else {
-                asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                        "This Broker is not configured with transactionCoordinatorEnabled=true."));
-            }
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
+                    authoritative);
+            CompletableFuture<TransactionMetadata> transactionMetadataFuture = new CompletableFuture<>();
+            TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
+                    .getTxnMeta(new TxnID(mostSigBits, leastSigBits)).get();

Review Comment:
   looks like this method uses an endless wait for "get()" here?



##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java:
##########
@@ -429,6 +507,18 @@ public void testGetPendingAckInternalStats() throws Exception {
         assertNull(managedLedgerInternalStats.ledgers.get(0).metadata);
     }
 
+    @Test(timeOut = 20000)
+    public void testTransactionNotEnabled() throws Exception {
+        stopBroker();
+        conf.setTransactionCoordinatorEnabled(false);
+        super.internalSetup();
+        try {
+            admin.transactions().getCoordinatorInternalStats(1, false);

Review Comment:
   Assert.fail() here?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -378,91 +259,87 @@ private void getTransactionMetadata(TxnMeta txnMeta,
     protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
                                                boolean authoritative, long timeout, Integer coordinatorId) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                if (coordinatorId != null) {
-                    validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                            authoritative);
-                    TransactionMetadataStore transactionMetadataStore =
-                            pulsar().getTransactionMetadataStoreService().getStores()
-                                    .get(TransactionCoordinatorID.get(coordinatorId));
-                    if (transactionMetadataStore == null) {
-                        asyncResponse.resume(new RestException(NOT_FOUND,
-                                "Transaction coordinator not found! coordinator id : " + coordinatorId));
+            if (coordinatorId != null) {
+                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                        authoritative);
+                TransactionMetadataStore transactionMetadataStore =
+                        pulsar().getTransactionMetadataStoreService().getStores()
+                                .get(TransactionCoordinatorID.get(coordinatorId));
+                if (transactionMetadataStore == null) {
+                    asyncResponse.resume(new RestException(NOT_FOUND,
+                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                    return;
+                }
+                List<TxnMeta> transactions = transactionMetadataStore.getSlowTransactions(timeout);
+                List<CompletableFuture<TransactionMetadata>> completableFutures = new ArrayList<>();
+                for (TxnMeta txnMeta : transactions) {
+                    CompletableFuture<TransactionMetadata> completableFuture = new CompletableFuture<>();
+                    getTransactionMetadata(txnMeta, completableFuture);
+                    completableFutures.add(completableFuture);
+                }
+
+                FutureUtil.waitForAll(completableFutures).whenComplete((v, e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e.getCause()));
                         return;
                     }
-                    List<TxnMeta> transactions = transactionMetadataStore.getSlowTransactions(timeout);
-                    List<CompletableFuture<TransactionMetadata>> completableFutures = new ArrayList<>();
-                    for (TxnMeta txnMeta : transactions) {
-                        CompletableFuture<TransactionMetadata> completableFuture = new CompletableFuture<>();
-                        getTransactionMetadata(txnMeta, completableFuture);
-                        completableFutures.add(completableFuture);
-                    }
 
-                    FutureUtil.waitForAll(completableFutures).whenComplete((v, e) -> {
+                    Map<String, TransactionMetadata> transactionMetadata = new HashMap<>();
+                    for (CompletableFuture<TransactionMetadata> future : completableFutures) {
+                        try {
+                            transactionMetadata.put(future.get().txnId, future.get());

Review Comment:
   use join to avoid catch checked exceptions?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -378,91 +259,87 @@ private void getTransactionMetadata(TxnMeta txnMeta,
     protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
                                                boolean authoritative, long timeout, Integer coordinatorId) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                if (coordinatorId != null) {
-                    validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                            authoritative);
-                    TransactionMetadataStore transactionMetadataStore =
-                            pulsar().getTransactionMetadataStoreService().getStores()
-                                    .get(TransactionCoordinatorID.get(coordinatorId));
-                    if (transactionMetadataStore == null) {
-                        asyncResponse.resume(new RestException(NOT_FOUND,
-                                "Transaction coordinator not found! coordinator id : " + coordinatorId));
+            if (coordinatorId != null) {
+                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),

Review Comment:
   Looks like we should check this method exception or make it async.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -472,74 +349,75 @@ protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
     protected void internalGetCoordinatorInternalStats(AsyncResponse asyncResponse, boolean authoritative,
                                                        boolean metadata, int coordinatorId) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                TopicName topicName = TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
-                validateTopicOwnership(topicName, authoritative);
-                TransactionMetadataStore metadataStore = pulsar().getTransactionMetadataStoreService()
-                        .getStores().get(TransactionCoordinatorID.get(coordinatorId));
-                if (metadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
-                    return;
-                }
-                if (metadataStore instanceof MLTransactionMetadataStore) {
-                    ManagedLedger managedLedger = ((MLTransactionMetadataStore) metadataStore).getManagedLedger();
-                    TransactionCoordinatorInternalStats transactionCoordinatorInternalStats =
-                            new TransactionCoordinatorInternalStats();
-                    TransactionLogStats transactionLogStats = new TransactionLogStats();
-                    transactionLogStats.managedLedgerName = managedLedger.getName();
-                    transactionLogStats.managedLedgerInternalStats =
-                            managedLedger.getManagedLedgerInternalStats(metadata).get();
-                    transactionCoordinatorInternalStats.transactionLogStats = transactionLogStats;
-                    asyncResponse.resume(transactionCoordinatorInternalStats);
-                } else {
-                    asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
-                            "Broker don't use MLTransactionMetadataStore!"));
-                }
+            TopicName topicName = TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
+            validateTopicOwnership(topicName, authoritative);
+            TransactionMetadataStore metadataStore = pulsar().getTransactionMetadataStoreService()
+                    .getStores().get(TransactionCoordinatorID.get(coordinatorId));
+            if (metadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                return;
+            }
+            if (metadataStore instanceof MLTransactionMetadataStore) {
+                ManagedLedger managedLedger = ((MLTransactionMetadataStore) metadataStore).getManagedLedger();
+                TransactionCoordinatorInternalStats transactionCoordinatorInternalStats =
+                        new TransactionCoordinatorInternalStats();
+                TransactionLogStats transactionLogStats = new TransactionLogStats();
+                transactionLogStats.managedLedgerName = managedLedger.getName();
+                transactionLogStats.managedLedgerInternalStats =
+                        managedLedger.getManagedLedgerInternalStats(metadata).get();

Review Comment:
   looks like this method uses an endless wait for "get()" here?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -472,74 +349,75 @@ protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
     protected void internalGetCoordinatorInternalStats(AsyncResponse asyncResponse, boolean authoritative,
                                                        boolean metadata, int coordinatorId) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                TopicName topicName = TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
-                validateTopicOwnership(topicName, authoritative);
-                TransactionMetadataStore metadataStore = pulsar().getTransactionMetadataStoreService()
-                        .getStores().get(TransactionCoordinatorID.get(coordinatorId));
-                if (metadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
-                    return;
-                }
-                if (metadataStore instanceof MLTransactionMetadataStore) {
-                    ManagedLedger managedLedger = ((MLTransactionMetadataStore) metadataStore).getManagedLedger();
-                    TransactionCoordinatorInternalStats transactionCoordinatorInternalStats =
-                            new TransactionCoordinatorInternalStats();
-                    TransactionLogStats transactionLogStats = new TransactionLogStats();
-                    transactionLogStats.managedLedgerName = managedLedger.getName();
-                    transactionLogStats.managedLedgerInternalStats =
-                            managedLedger.getManagedLedgerInternalStats(metadata).get();
-                    transactionCoordinatorInternalStats.transactionLogStats = transactionLogStats;
-                    asyncResponse.resume(transactionCoordinatorInternalStats);
-                } else {
-                    asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
-                            "Broker don't use MLTransactionMetadataStore!"));
-                }
+            TopicName topicName = TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
+            validateTopicOwnership(topicName, authoritative);

Review Comment:
   Looks like we should check this method exception or make it async.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -168,6 +209,7 @@ public void getTransactionMetadata(@Suspended final AsyncResponse asyncResponse,
                                        @DefaultValue("false") boolean authoritative,
                                        @PathParam("mostSigBits") String mostSigBits,
                                        @PathParam("leastSigBits") String leastSigBits) {
+        checkTransactionCoordinatorEnabled();

Review Comment:
   Looks like we should catch this method exception.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -67,216 +67,97 @@
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        pulsar().getTransactionMetadataStoreService().getStores()
-                                .get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+        if (coordinatorId != null) {
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                    authoritative);
+            TransactionMetadataStore transactionMetadataStore =
+                    pulsar().getTransactionMetadataStoreService().getStores()
+                            .get(TransactionCoordinatorID.get(coordinatorId));
+            if (transactionMetadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                return;
+            }
+            asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+        } else {
+            getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                    false, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions == 0) {
+                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                            "Transaction coordinator not found"));
                     return;
                 }
-                asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
-            } else {
-                getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
-                        false, false).thenAccept(partitionMetadata -> {
-                    if (partitionMetadata.partitions == 0) {
-                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
-                                "Transaction coordinator not found"));
+                List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
+                        Lists.newArrayList();
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    try {
+                        transactionMetadataStoreInfoFutures
+                                .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+                    } catch (PulsarServerException e) {
+                        asyncResponse.resume(new RestException(e));
                         return;
                     }
-                    List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
-                            Lists.newArrayList();
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                }
+                Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
+                FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+
+                    for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
                         try {
-                            transactionMetadataStoreInfoFutures
-                                    .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
-                        } catch (PulsarServerException e) {
-                            asyncResponse.resume(new RestException(e));
+                            stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
+                        } catch (Exception exception) {
+                            asyncResponse.resume(new RestException(exception.getCause()));
                             return;
                         }
                     }
-                    Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
-                    FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
-                        if (e != null) {
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
-
-                        for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
-                            try {
-                                stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
-                            } catch (Exception exception) {
-                                asyncResponse.resume(new RestException(exception.getCause()));
-                                return;
-                            }
-                        }
 
-                        asyncResponse.resume(stats);
-                    });
-                }).exceptionally(ex -> {
-                    log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
+                    asyncResponse.resume(stats);
                 });
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
         }
     }
 
-    protected void internalGetTransactionInPendingAckStats(AsyncResponse asyncResponse, boolean authoritative,
-                                                           long mostSigBits, long leastSigBits, String subName) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject)
-                                .getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits), subName));
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
-        }
+    protected CompletableFuture<TransactionInPendingAckStats> internalGetTransactionInPendingAckStats(
+            boolean authoritative, long mostSigBits, long leastSigBits, String subName) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits),
+                        subName));
     }
 
-    protected void internalGetTransactionInBufferStats(AsyncResponse asyncResponse, boolean authoritative,
-                                                       long mostSigBits, long leastSigBits) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        TransactionInBufferStats transactionInBufferStats = ((PersistentTopic) topicObject)
-                                .getTransactionInBufferStats(new TxnID(mostSigBits, leastSigBits));
-                        asyncResponse.resume(transactionInBufferStats);
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
-        }
+    protected CompletableFuture<TransactionInBufferStats> internalGetTransactionInBufferStats(
+            boolean authoritative, long mostSigBits, long leastSigBits) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionInBufferStats(new TxnID(mostSigBits, leastSigBits)));
     }
 
-    protected void internalGetTransactionBufferStats(AsyncResponse asyncResponse, boolean authoritative) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject).getTransactionBufferStats());
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, "Broker don't support transaction!"));
-        }
+    protected CompletableFuture<TransactionBufferStats> internalGetTransactionBufferStats(boolean authoritative) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionBufferStats());
     }
 
-    protected void internalGetPendingAckStats(AsyncResponse asyncResponse, boolean authoritative, String subName) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject).getTransactionPendingAckStats(subName));
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, "Broker don't support transaction!"));
-        }
+    protected CompletableFuture<TransactionPendingAckStats> internalGetPendingAckStats(
+            boolean authoritative, String subName) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionPendingAckStats(subName));
     }
 
     protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
                                                   boolean authoritative, int mostSigBits, long leastSigBits) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
-                        authoritative);
-                CompletableFuture<TransactionMetadata> transactionMetadataFuture = new CompletableFuture<>();
-                TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
-                        .getTxnMeta(new TxnID(mostSigBits, leastSigBits)).get();
-                getTransactionMetadata(txnMeta, transactionMetadataFuture);
-                asyncResponse.resume(transactionMetadataFuture.get(10, TimeUnit.SECONDS));
-            } else {
-                asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                        "This Broker is not configured with transactionCoordinatorEnabled=true."));
-            }
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),

Review Comment:
   Looks like we should check this method exception or make it async.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -61,6 +61,7 @@ public void getCoordinatorStats(@Suspended final AsyncResponse asyncResponse,
                                     @QueryParam("authoritative")
                                     @DefaultValue("false") boolean authoritative,
                                     @QueryParam("coordinatorId") Integer coordinatorId) {
+        checkTransactionCoordinatorEnabled();

Review Comment:
   Looks like we should catch this method exception.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -205,6 +248,7 @@ public void getCoordinatorInternalStats(@Suspended final AsyncResponse asyncResp
                                             @DefaultValue("false") boolean authoritative,
                                             @PathParam("coordinatorId") String coordinatorId,
                                             @QueryParam("metadata") @DefaultValue("false") boolean metadata) {
+        checkTransactionCoordinatorEnabled();

Review Comment:
   Looks like we should catch this method exception.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -188,6 +230,7 @@ public void getSlowTransactions(@Suspended final AsyncResponse asyncResponse,
                                     @DefaultValue("false") boolean authoritative,
                                     @PathParam("timeout") String timeout,
                                     @QueryParam("coordinatorId") Integer coordinatorId) {
+        checkTransactionCoordinatorEnabled();

Review Comment:
   Looks like we should catch this method exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843899801


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -168,6 +209,7 @@ public void getTransactionMetadata(@Suspended final AsyncResponse asyncResponse,
                                        @DefaultValue("false") boolean authoritative,
                                        @PathParam("mostSigBits") String mostSigBits,
                                        @PathParam("leastSigBits") String leastSigBits) {
+        checkTransactionCoordinatorEnabled();

Review Comment:
   It's ok, like `validateTopicName`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843883448


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java:
##########
@@ -378,91 +259,87 @@ private void getTransactionMetadata(TxnMeta txnMeta,
     protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
                                                boolean authoritative, long timeout, Integer coordinatorId) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                if (coordinatorId != null) {
-                    validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                            authoritative);
-                    TransactionMetadataStore transactionMetadataStore =
-                            pulsar().getTransactionMetadataStoreService().getStores()
-                                    .get(TransactionCoordinatorID.get(coordinatorId));
-                    if (transactionMetadataStore == null) {
-                        asyncResponse.resume(new RestException(NOT_FOUND,
-                                "Transaction coordinator not found! coordinator id : " + coordinatorId));
+            if (coordinatorId != null) {
+                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                        authoritative);
+                TransactionMetadataStore transactionMetadataStore =
+                        pulsar().getTransactionMetadataStoreService().getStores()
+                                .get(TransactionCoordinatorID.get(coordinatorId));
+                if (transactionMetadataStore == null) {
+                    asyncResponse.resume(new RestException(NOT_FOUND,
+                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                    return;
+                }
+                List<TxnMeta> transactions = transactionMetadataStore.getSlowTransactions(timeout);
+                List<CompletableFuture<TransactionMetadata>> completableFutures = new ArrayList<>();
+                for (TxnMeta txnMeta : transactions) {
+                    CompletableFuture<TransactionMetadata> completableFuture = new CompletableFuture<>();
+                    getTransactionMetadata(txnMeta, completableFuture);
+                    completableFutures.add(completableFuture);
+                }
+
+                FutureUtil.waitForAll(completableFutures).whenComplete((v, e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e.getCause()));
                         return;
                     }
-                    List<TxnMeta> transactions = transactionMetadataStore.getSlowTransactions(timeout);
-                    List<CompletableFuture<TransactionMetadata>> completableFutures = new ArrayList<>();
-                    for (TxnMeta txnMeta : transactions) {
-                        CompletableFuture<TransactionMetadata> completableFuture = new CompletableFuture<>();
-                        getTransactionMetadata(txnMeta, completableFuture);
-                        completableFutures.add(completableFuture);
-                    }
 
-                    FutureUtil.waitForAll(completableFutures).whenComplete((v, e) -> {
+                    Map<String, TransactionMetadata> transactionMetadata = new HashMap<>();
+                    for (CompletableFuture<TransactionMetadata> future : completableFutures) {
+                        try {
+                            transactionMetadata.put(future.get().txnId, future.get());

Review Comment:
   use join to avoid catch checked exceptions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843902159


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java:
##########
@@ -429,6 +507,18 @@ public void testGetPendingAckInternalStats() throws Exception {
         assertNull(managedLedgerInternalStats.ledgers.get(0).metadata);
     }
 
+    @Test(timeOut = 20000)
+    public void testTransactionNotEnabled() throws Exception {
+        stopBroker();
+        conf.setTransactionCoordinatorEnabled(false);
+        super.internalSetup();
+        try {
+            admin.transactions().getCoordinatorInternalStats(1, false);

Review Comment:
   yes. fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15017: [fix][transaction] Fix transaction REST API redirect issue.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #15017:
URL: https://github.com/apache/pulsar/pull/15017#discussion_r843898954


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java:
##########
@@ -61,6 +61,7 @@ public void getCoordinatorStats(@Suspended final AsyncResponse asyncResponse,
                                     @QueryParam("authoritative")
                                     @DefaultValue("false") boolean authoritative,
                                     @QueryParam("coordinatorId") Integer coordinatorId) {
+        checkTransactionCoordinatorEnabled();

Review Comment:
   It's ok, like `validateTopicName`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org