You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/04/07 02:20:34 UTC

[pulsar] branch master updated: [fix][transaction] Fix transaction REST API redirect issue. (#15017)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b8bf154aec [fix][transaction] Fix transaction REST API redirect issue. (#15017)
8b8bf154aec is described below

commit 8b8bf154aecf07c45fede2679445ef0563b49c5b
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Apr 7 10:20:28 2022 +0800

    [fix][transaction] Fix transaction REST API redirect issue. (#15017)
---
 .../pulsar/broker/admin/impl/TransactionsBase.java | 510 ++++++++-------------
 .../pulsar/broker/admin/v3/Transactions.java       |  67 ++-
 .../broker/admin/v3/AdminApiTransactionTest.java   |  92 +++-
 3 files changed, 341 insertions(+), 328 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index 308f18e8bc3..b225cd2e266 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -18,11 +18,9 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
-import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
 import static javax.ws.rs.core.Response.Status.METHOD_NOT_ALLOWED;
 import static javax.ws.rs.core.Response.Status.NOT_FOUND;
 import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
-import static javax.ws.rs.core.Response.Status.TEMPORARY_REDIRECT;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -46,6 +44,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TransactionBufferStats;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
 import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
@@ -53,6 +52,7 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionLogStats;
 import org.apache.pulsar.common.policies.data.TransactionMetadata;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
+import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -67,216 +67,97 @@ public abstract class TransactionsBase extends AdminResource {
 
     protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
                                                Integer coordinatorId) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            if (coordinatorId != null) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                        authoritative);
-                TransactionMetadataStore transactionMetadataStore =
-                        pulsar().getTransactionMetadataStoreService().getStores()
-                                .get(TransactionCoordinatorID.get(coordinatorId));
-                if (transactionMetadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+        if (coordinatorId != null) {
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                    authoritative);
+            TransactionMetadataStore transactionMetadataStore =
+                    pulsar().getTransactionMetadataStoreService().getStores()
+                            .get(TransactionCoordinatorID.get(coordinatorId));
+            if (transactionMetadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                return;
+            }
+            asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+        } else {
+            getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                    false, false).thenAccept(partitionMetadata -> {
+                if (partitionMetadata.partitions == 0) {
+                    asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                            "Transaction coordinator not found"));
                     return;
                 }
-                asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
-            } else {
-                getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
-                        false, false).thenAccept(partitionMetadata -> {
-                    if (partitionMetadata.partitions == 0) {
-                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
-                                "Transaction coordinator not found"));
+                List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
+                        Lists.newArrayList();
+                for (int i = 0; i < partitionMetadata.partitions; i++) {
+                    try {
+                        transactionMetadataStoreInfoFutures
+                                .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+                    } catch (PulsarServerException e) {
+                        asyncResponse.resume(new RestException(e));
                         return;
                     }
-                    List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
-                            Lists.newArrayList();
-                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                }
+                Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
+                FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e));
+                        return;
+                    }
+
+                    for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
                         try {
-                            transactionMetadataStoreInfoFutures
-                                    .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
-                        } catch (PulsarServerException e) {
-                            asyncResponse.resume(new RestException(e));
+                            stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
+                        } catch (Exception exception) {
+                            asyncResponse.resume(new RestException(exception.getCause()));
                             return;
                         }
                     }
-                    Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
-                    FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
-                        if (e != null) {
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
-
-                        for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
-                            try {
-                                stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
-                            } catch (Exception exception) {
-                                asyncResponse.resume(new RestException(exception.getCause()));
-                                return;
-                            }
-                        }
 
-                        asyncResponse.resume(stats);
-                    });
-                }).exceptionally(ex -> {
-                    log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
-                    resumeAsyncResponseExceptionally(asyncResponse, ex);
-                    return null;
+                    asyncResponse.resume(stats);
                 });
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
+            }).exceptionally(ex -> {
+                log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
+                resumeAsyncResponseExceptionally(asyncResponse, ex);
+                return null;
+            });
         }
     }
 
-    protected void internalGetTransactionInPendingAckStats(AsyncResponse asyncResponse, boolean authoritative,
-                                                           long mostSigBits, long leastSigBits, String subName) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject)
-                                .getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits), subName));
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
-        }
+    protected CompletableFuture<TransactionInPendingAckStats> internalGetTransactionInPendingAckStats(
+            boolean authoritative, long mostSigBits, long leastSigBits, String subName) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits),
+                        subName));
     }
 
-    protected void internalGetTransactionInBufferStats(AsyncResponse asyncResponse, boolean authoritative,
-                                                       long mostSigBits, long leastSigBits) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        TransactionInBufferStats transactionInBufferStats = ((PersistentTopic) topicObject)
-                                .getTransactionInBufferStats(new TxnID(mostSigBits, leastSigBits));
-                        asyncResponse.resume(transactionInBufferStats);
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
-        }
+    protected CompletableFuture<TransactionInBufferStats> internalGetTransactionInBufferStats(
+            boolean authoritative, long mostSigBits, long leastSigBits) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionInBufferStats(new TxnID(mostSigBits, leastSigBits)));
     }
 
-    protected void internalGetTransactionBufferStats(AsyncResponse asyncResponse, boolean authoritative) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject).getTransactionBufferStats());
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, "Broker don't support transaction!"));
-        }
+    protected CompletableFuture<TransactionBufferStats> internalGetTransactionBufferStats(boolean authoritative) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionBufferStats());
     }
 
-    protected void internalGetPendingAckStats(AsyncResponse asyncResponse, boolean authoritative, String subName) {
-        if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            validateTopicOwnership(topicName, authoritative);
-            CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                    .getTopics().get(topicName.toString());
-            if (topicFuture != null) {
-                topicFuture.whenComplete((optionalTopic, e) -> {
-                    if (e != null) {
-                        asyncResponse.resume(new RestException(e));
-                        return;
-                    }
-
-                    if (!optionalTopic.isPresent()) {
-                        asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                "Topic is not owned by this broker!"));
-                        return;
-                    }
-                    Topic topicObject = optionalTopic.get();
-                    if (topicObject instanceof PersistentTopic) {
-                        asyncResponse.resume(((PersistentTopic) topicObject).getTransactionPendingAckStats(subName));
-                    } else {
-                        asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
-                    }
-                });
-            } else {
-                asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-            }
-        } else {
-            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, "Broker don't support transaction!"));
-        }
+    protected CompletableFuture<TransactionPendingAckStats> internalGetPendingAckStats(
+            boolean authoritative, String subName) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> topic.getTransactionPendingAckStats(subName));
     }
 
     protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
                                                   boolean authoritative, int mostSigBits, long leastSigBits) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
-                        authoritative);
-                CompletableFuture<TransactionMetadata> transactionMetadataFuture = new CompletableFuture<>();
-                TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
-                        .getTxnMeta(new TxnID(mostSigBits, leastSigBits)).get();
-                getTransactionMetadata(txnMeta, transactionMetadataFuture);
-                asyncResponse.resume(transactionMetadataFuture.get(10, TimeUnit.SECONDS));
-            } else {
-                asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                        "This Broker is not configured with transactionCoordinatorEnabled=true."));
-            }
+            validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
+                    authoritative);
+            CompletableFuture<TransactionMetadata> transactionMetadataFuture = new CompletableFuture<>();
+            TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
+                    .getTxnMeta(new TxnID(mostSigBits, leastSigBits)).get();
+            getTransactionMetadata(txnMeta, transactionMetadataFuture);
+            asyncResponse.resume(transactionMetadataFuture.get(10, TimeUnit.SECONDS));
         } catch (Exception e) {
             if (e instanceof ExecutionException) {
                 if (e.getCause() instanceof CoordinatorNotFoundException
@@ -378,91 +259,87 @@ public abstract class TransactionsBase extends AdminResource {
     protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
                                                boolean authoritative, long timeout, Integer coordinatorId) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                if (coordinatorId != null) {
-                    validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
-                            authoritative);
-                    TransactionMetadataStore transactionMetadataStore =
-                            pulsar().getTransactionMetadataStoreService().getStores()
-                                    .get(TransactionCoordinatorID.get(coordinatorId));
-                    if (transactionMetadataStore == null) {
-                        asyncResponse.resume(new RestException(NOT_FOUND,
-                                "Transaction coordinator not found! coordinator id : " + coordinatorId));
+            if (coordinatorId != null) {
+                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                        authoritative);
+                TransactionMetadataStore transactionMetadataStore =
+                        pulsar().getTransactionMetadataStoreService().getStores()
+                                .get(TransactionCoordinatorID.get(coordinatorId));
+                if (transactionMetadataStore == null) {
+                    asyncResponse.resume(new RestException(NOT_FOUND,
+                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                    return;
+                }
+                List<TxnMeta> transactions = transactionMetadataStore.getSlowTransactions(timeout);
+                List<CompletableFuture<TransactionMetadata>> completableFutures = new ArrayList<>();
+                for (TxnMeta txnMeta : transactions) {
+                    CompletableFuture<TransactionMetadata> completableFuture = new CompletableFuture<>();
+                    getTransactionMetadata(txnMeta, completableFuture);
+                    completableFutures.add(completableFuture);
+                }
+
+                FutureUtil.waitForAll(completableFutures).whenComplete((v, e) -> {
+                    if (e != null) {
+                        asyncResponse.resume(new RestException(e.getCause()));
                         return;
                     }
-                    List<TxnMeta> transactions = transactionMetadataStore.getSlowTransactions(timeout);
-                    List<CompletableFuture<TransactionMetadata>> completableFutures = new ArrayList<>();
-                    for (TxnMeta txnMeta : transactions) {
-                        CompletableFuture<TransactionMetadata> completableFuture = new CompletableFuture<>();
-                        getTransactionMetadata(txnMeta, completableFuture);
-                        completableFutures.add(completableFuture);
-                    }
 
-                    FutureUtil.waitForAll(completableFutures).whenComplete((v, e) -> {
+                    Map<String, TransactionMetadata> transactionMetadata = new HashMap<>();
+                    for (CompletableFuture<TransactionMetadata> future : completableFutures) {
+                        try {
+                            transactionMetadata.put(future.get().txnId, future.get());
+                        } catch (Exception exception) {
+                            asyncResponse.resume(new RestException(exception.getCause()));
+                            return;
+                        }
+                    }
+                    asyncResponse.resume(transactionMetadata);
+                });
+            } else {
+                getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+                        false, false).thenAccept(partitionMetadata -> {
+                    if (partitionMetadata.partitions == 0) {
+                        asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+                                "Transaction coordinator not found"));
+                        return;
+                    }
+                    List<CompletableFuture<Map<String, TransactionMetadata>>> completableFutures =
+                            Lists.newArrayList();
+                    for (int i = 0; i < partitionMetadata.partitions; i++) {
+                        try {
+                            completableFutures
+                                    .add(pulsar().getAdminClient().transactions()
+                                            .getSlowTransactionsByCoordinatorIdAsync(i, timeout,
+                                                    TimeUnit.MILLISECONDS));
+                        } catch (PulsarServerException e) {
+                            asyncResponse.resume(new RestException(e));
+                            return;
+                        }
+                    }
+                    Map<String, TransactionMetadata> transactionMetadataMaps = new HashMap<>();
+                    FutureUtil.waitForAll(completableFutures).whenComplete((result, e) -> {
                         if (e != null) {
-                            asyncResponse.resume(new RestException(e.getCause()));
+                            asyncResponse.resume(new RestException(e));
                             return;
                         }
 
-                        Map<String, TransactionMetadata> transactionMetadata = new HashMap<>();
-                        for (CompletableFuture<TransactionMetadata> future : completableFutures) {
+                        for (CompletableFuture<Map<String, TransactionMetadata>> transactionMetadataMap
+                                : completableFutures) {
                             try {
-                                transactionMetadata.put(future.get().txnId, future.get());
+                                transactionMetadataMaps.putAll(transactionMetadataMap.get());
                             } catch (Exception exception) {
                                 asyncResponse.resume(new RestException(exception.getCause()));
                                 return;
                             }
                         }
-                        asyncResponse.resume(transactionMetadata);
-                    });
-                } else {
-                    getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
-                            false, false).thenAccept(partitionMetadata -> {
-                        if (partitionMetadata.partitions == 0) {
-                            asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
-                                    "Transaction coordinator not found"));
-                            return;
-                        }
-                        List<CompletableFuture<Map<String, TransactionMetadata>>> completableFutures =
-                                Lists.newArrayList();
-                        for (int i = 0; i < partitionMetadata.partitions; i++) {
-                            try {
-                                completableFutures
-                                        .add(pulsar().getAdminClient().transactions()
-                                                .getSlowTransactionsByCoordinatorIdAsync(i, timeout,
-                                                        TimeUnit.MILLISECONDS));
-                            } catch (PulsarServerException e) {
-                                asyncResponse.resume(new RestException(e));
-                                return;
-                            }
-                        }
-                        Map<String, TransactionMetadata> transactionMetadataMaps = new HashMap<>();
-                        FutureUtil.waitForAll(completableFutures).whenComplete((result, e) -> {
-                            if (e != null) {
-                                asyncResponse.resume(new RestException(e));
-                                return;
-                            }
-
-                            for (CompletableFuture<Map<String, TransactionMetadata>> transactionMetadataMap
-                                    : completableFutures) {
-                                try {
-                                    transactionMetadataMaps.putAll(transactionMetadataMap.get());
-                                } catch (Exception exception) {
-                                    asyncResponse.resume(new RestException(exception.getCause()));
-                                    return;
-                                }
-                            }
-                            asyncResponse.resume(transactionMetadataMaps);
-                        });
-                    }).exceptionally(ex -> {
-                        log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
-                        resumeAsyncResponseExceptionally(asyncResponse, ex);
-                        return null;
+                        asyncResponse.resume(transactionMetadataMaps);
                     });
+                }).exceptionally(ex -> {
+                    log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
 
-                }
-            } else {
-                asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, "Broker don't support transaction!"));
             }
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
@@ -472,33 +349,28 @@ public abstract class TransactionsBase extends AdminResource {
     protected void internalGetCoordinatorInternalStats(AsyncResponse asyncResponse, boolean authoritative,
                                                        boolean metadata, int coordinatorId) {
         try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                TopicName topicName = TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
-                validateTopicOwnership(topicName, authoritative);
-                TransactionMetadataStore metadataStore = pulsar().getTransactionMetadataStoreService()
-                        .getStores().get(TransactionCoordinatorID.get(coordinatorId));
-                if (metadataStore == null) {
-                    asyncResponse.resume(new RestException(NOT_FOUND,
-                            "Transaction coordinator not found! coordinator id : " + coordinatorId));
-                    return;
-                }
-                if (metadataStore instanceof MLTransactionMetadataStore) {
-                    ManagedLedger managedLedger = ((MLTransactionMetadataStore) metadataStore).getManagedLedger();
-                    TransactionCoordinatorInternalStats transactionCoordinatorInternalStats =
-                            new TransactionCoordinatorInternalStats();
-                    TransactionLogStats transactionLogStats = new TransactionLogStats();
-                    transactionLogStats.managedLedgerName = managedLedger.getName();
-                    transactionLogStats.managedLedgerInternalStats =
-                            managedLedger.getManagedLedgerInternalStats(metadata).get();
-                    transactionCoordinatorInternalStats.transactionLogStats = transactionLogStats;
-                    asyncResponse.resume(transactionCoordinatorInternalStats);
-                } else {
-                    asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
-                            "Broker don't use MLTransactionMetadataStore!"));
-                }
+            TopicName topicName = TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
+            validateTopicOwnership(topicName, authoritative);
+            TransactionMetadataStore metadataStore = pulsar().getTransactionMetadataStoreService()
+                    .getStores().get(TransactionCoordinatorID.get(coordinatorId));
+            if (metadataStore == null) {
+                asyncResponse.resume(new RestException(NOT_FOUND,
+                        "Transaction coordinator not found! coordinator id : " + coordinatorId));
+                return;
+            }
+            if (metadataStore instanceof MLTransactionMetadataStore) {
+                ManagedLedger managedLedger = ((MLTransactionMetadataStore) metadataStore).getManagedLedger();
+                TransactionCoordinatorInternalStats transactionCoordinatorInternalStats =
+                        new TransactionCoordinatorInternalStats();
+                TransactionLogStats transactionLogStats = new TransactionLogStats();
+                transactionLogStats.managedLedgerName = managedLedger.getName();
+                transactionLogStats.managedLedgerInternalStats =
+                        managedLedger.getManagedLedgerInternalStats(metadata).get();
+                transactionCoordinatorInternalStats.transactionLogStats = transactionLogStats;
+                asyncResponse.resume(transactionCoordinatorInternalStats);
             } else {
-                asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                        "This Broker is not configured with transactionCoordinatorEnabled=true."));
+                asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
+                        "Broker don't use MLTransactionMetadataStore!"));
             }
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e.getCause()));
@@ -506,40 +378,46 @@ public abstract class TransactionsBase extends AdminResource {
     }
 
     protected CompletableFuture<TransactionPendingAckInternalStats> internalGetPendingAckInternalStats(
-            boolean authoritative, TopicName topicName, String subName, boolean metadata) {
+            boolean authoritative, String subName, boolean metadata) {
+        // Resolve the topic, then the pending-ack managed ledger of the subscription, and wrap
+        // that ledger's internal stats in a TransactionPendingAckInternalStats.
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenCompose(persistentTopic -> persistentTopic.getPendingAckManagedLedger(subName))
+                .thenCompose(ledger -> ledger.getManagedLedgerInternalStats(metadata)
+                        .thenApply(ledgerStats -> {
+                            // Name and internal stats of the pending-ack managed ledger.
+                            TransactionLogStats logStats = new TransactionLogStats();
+                            logStats.managedLedgerName = ledger.getName();
+                            logStats.managedLedgerInternalStats = ledgerStats;
+
+                            TransactionPendingAckInternalStats result =
+                                    new TransactionPendingAckInternalStats();
+                            result.pendingAckLogStats = logStats;
+                            return result;
+                        }));
+    }
+
+    protected CompletableFuture<PersistentTopic> getExistingPersistentTopicAsync(boolean authoritative) {
+        return validateTopicOwnershipAsync(topicName, authoritative).thenCompose(ignored -> {
+            CompletableFuture<Optional<Topic>> pending =
+                    pulsar().getBrokerService().getTopics().get(topicName.toString());
+            if (pending == null) { // no entry for this topic in the broker's topic map
+                return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
+            }
+            return pending.thenCompose(maybeTopic -> {
+                if (maybeTopic.isPresent()) {
+                    return CompletableFuture.completedFuture((PersistentTopic) maybeTopic.get());
+                }
+                return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found")); // empty result
+            });
+        });
+    }
+
+    protected void checkTransactionCoordinatorEnabled() {
         if (!pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-            return FutureUtil.failedFuture(new RestException(SERVICE_UNAVAILABLE,
-                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
+           throw new RestException(SERVICE_UNAVAILABLE,
+                    "This Broker is not configured with transactionCoordinatorEnabled=true.");
         }
-        return validateTopicOwnershipAsync(topicName, authoritative)
-                .thenCompose(__ -> {
-                    CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                            .getTopics().get(topicName.toString());
-                    if (topicFuture == null) {
-                        return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
-                    }
-                    return topicFuture.thenCompose(optionalTopic -> {
-                        if (!optionalTopic.isPresent()) {
-                            return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
-                        } else {
-                            Topic topicObject = optionalTopic.get();
-                            return ((PersistentTopic) topicObject).getPendingAckManagedLedger(subName)
-                                    .thenCompose(managedLedger -> managedLedger.getManagedLedgerInternalStats(metadata)
-                                            .thenApply(internalStats -> {
-                                                TransactionLogStats pendingAckLogStats = new TransactionLogStats();
-                                                pendingAckLogStats.managedLedgerName = managedLedger.getName();
-                                                pendingAckLogStats.managedLedgerInternalStats = internalStats;
-                                                return pendingAckLogStats;
-                                            })
-                                            .thenApply(pendingAckLogStats -> {
-                                                TransactionPendingAckInternalStats stats =
-                                                        new TransactionPendingAckInternalStats();
-                                                stats.pendingAckLogStats = pendingAckLogStats;
-                                                return stats;
-                                            }));
-                        }
-                    });
-                });
     }
 
     protected void validateTopicName(String property, String namespace, String encodedTopic) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index bbd79036ecf..9cb825b9f8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -61,6 +61,7 @@ public class Transactions extends TransactionsBase {
                                     @QueryParam("authoritative")
                                     @DefaultValue("false") boolean authoritative,
                                     @QueryParam("coordinatorId") Integer coordinatorId) {
+        checkTransactionCoordinatorEnabled();
         internalGetCoordinatorStats(asyncResponse, authoritative, coordinatorId);
     }
 
@@ -82,9 +83,19 @@ public class Transactions extends TransactionsBase {
                                             @PathParam("topic") @Encoded String encodedTopic,
                                             @PathParam("mostSigBits") String mostSigBits,
                                             @PathParam("leastSigBits") String leastSigBits) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        internalGetTransactionInBufferStats(asyncResponse, authoritative,
-                Long.parseLong(mostSigBits), Long.parseLong(leastSigBits));
+        try {
+            checkTransactionCoordinatorEnabled();
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalGetTransactionInBufferStats(authoritative, Long.parseLong(mostSigBits),
+                    Long.parseLong(leastSigBits))
+                    .thenAccept(stat -> asyncResponse.resume(stat))
+                    .exceptionally(ex -> {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                        return null;
+                    });
+        } catch (Exception ex) {
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+        }
     }
 
     @GET
@@ -106,9 +117,19 @@ public class Transactions extends TransactionsBase {
                                                 @PathParam("mostSigBits") String mostSigBits,
                                                 @PathParam("leastSigBits") String leastSigBits,
                                                 @PathParam("subName") String subName) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        internalGetTransactionInPendingAckStats(asyncResponse, authoritative, Long.parseLong(mostSigBits),
-                Long.parseLong(leastSigBits), subName);
+        try {
+            checkTransactionCoordinatorEnabled();
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalGetTransactionInPendingAckStats(authoritative, Long.parseLong(mostSigBits),
+                    Long.parseLong(leastSigBits), subName)
+                    .thenAccept(stat -> asyncResponse.resume(stat))
+                    .exceptionally(ex -> {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                        return null;
+                    });
+        } catch (Exception ex) {
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+        }
     }
 
     @GET
@@ -127,8 +148,18 @@ public class Transactions extends TransactionsBase {
                                           @PathParam("tenant") String tenant,
                                           @PathParam("namespace") String namespace,
                                           @PathParam("topic") @Encoded String encodedTopic) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        internalGetTransactionBufferStats(asyncResponse, authoritative);
+        try {
+            checkTransactionCoordinatorEnabled();
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalGetTransactionBufferStats(authoritative)
+                    .thenAccept(stat -> asyncResponse.resume(stat))
+                    .exceptionally(ex -> {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                        return null;
+                    });
+        } catch (Exception ex) {
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+        }
     }
 
     @GET
@@ -148,8 +179,18 @@ public class Transactions extends TransactionsBase {
                                    @PathParam("namespace") String namespace,
                                    @PathParam("topic") @Encoded String encodedTopic,
                                    @PathParam("subName") String subName) {
-        validateTopicName(tenant, namespace, encodedTopic);
-        internalGetPendingAckStats(asyncResponse, authoritative, subName);
+        try {
+            checkTransactionCoordinatorEnabled();
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalGetPendingAckStats(authoritative, subName)
+                    .thenAccept(stats -> asyncResponse.resume(stats))
+                    .exceptionally(ex -> {
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                        return null;
+                    });
+        } catch (Exception ex) {
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+        }
     }
 
     @GET
@@ -168,6 +209,7 @@ public class Transactions extends TransactionsBase {
                                        @DefaultValue("false") boolean authoritative,
                                        @PathParam("mostSigBits") String mostSigBits,
                                        @PathParam("leastSigBits") String leastSigBits) {
+        checkTransactionCoordinatorEnabled();
         internalGetTransactionMetadata(asyncResponse, authoritative, Integer.parseInt(mostSigBits),
                 Long.parseLong(leastSigBits));
     }
@@ -188,6 +230,7 @@ public class Transactions extends TransactionsBase {
                                     @DefaultValue("false") boolean authoritative,
                                     @PathParam("timeout") String timeout,
                                     @QueryParam("coordinatorId") Integer coordinatorId) {
+        checkTransactionCoordinatorEnabled();
         internalGetSlowTransactions(asyncResponse, authoritative, Long.parseLong(timeout), coordinatorId);
     }
 
@@ -205,6 +248,7 @@ public class Transactions extends TransactionsBase {
                                             @DefaultValue("false") boolean authoritative,
                                             @PathParam("coordinatorId") String coordinatorId,
                                             @QueryParam("metadata") @DefaultValue("false") boolean metadata) {
+        checkTransactionCoordinatorEnabled();
         internalGetCoordinatorInternalStats(asyncResponse, authoritative, metadata, Integer.parseInt(coordinatorId));
     }
 
@@ -229,8 +273,9 @@ public class Transactions extends TransactionsBase {
                                            @PathParam("subName") String subName,
                                            @QueryParam("metadata") @DefaultValue("false") boolean metadata) {
         try {
+            checkTransactionCoordinatorEnabled();
             validateTopicName(tenant, namespace, encodedTopic);
-            internalGetPendingAckInternalStats(authoritative, topicName, subName, metadata)
+            internalGetPendingAckInternalStats(authoritative, subName, metadata)
                     .thenAccept(stats -> asyncResponse.resume(stats))
                     .exceptionally(ex -> {
                         Throwable cause = FutureUtil.unwrapCompletionException(ex);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 920ad61be4e..fa6e56724fe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin.v3;
 
 import com.google.common.collect.Sets;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.http.HttpStatus;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -121,6 +122,25 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
         initTransaction(2);
         TransactionImpl transaction = (TransactionImpl) getTransaction();
         final String topic = "persistent://public/default/testGetTransactionInBufferStats";
+        try {
+            admin.transactions()
+                    .getTransactionInBufferStatsAsync(new TxnID(1, 1), topic).get();
+            fail("Should failed here");
+        } catch (ExecutionException ex) {
+            assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+            PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+            assertEquals(cause.getMessage(), "Topic not found");
+        }
+        try {
+            pulsar.getBrokerService().getTopic(topic, false);
+            admin.transactions()
+                    .getTransactionInBufferStatsAsync(new TxnID(1, 1), topic).get();
+            fail("Should failed here");
+        } catch (ExecutionException ex) {
+            assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+            PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+            assertEquals(cause.getMessage(), "Topic not found");
+        }
         admin.topics().createNonPartitionedTopic(topic);
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).sendTimeout(0, TimeUnit.SECONDS).create();
         MessageId messageId = producer.newMessage(transaction).value("Hello pulsar!".getBytes()).send();
@@ -146,6 +166,27 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
         initTransaction(2);
         final String topic = "persistent://public/default/testGetTransactionInBufferStats";
         final String subName = "test";
+        try {
+            admin.transactions()
+                    .getTransactionInPendingAckStatsAsync(new TxnID(1,
+                            2), topic, subName).get();
+            fail("Should failed here");
+        } catch (ExecutionException ex) {
+            assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+            PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+            assertEquals(cause.getMessage(), "Topic not found");
+        }
+        try {
+            pulsar.getBrokerService().getTopic(topic, false);
+            admin.transactions()
+                    .getTransactionInPendingAckStatsAsync(new TxnID(1,
+                            2), topic, subName).get();
+            fail("Should failed here");
+        } catch (ExecutionException ex) {
+            assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+            PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+            assertEquals(cause.getMessage(), "Topic not found");
+        }
         admin.topics().createNonPartitionedTopic(topic);
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
         Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic)
@@ -252,8 +293,26 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
         final String topic = "persistent://public/default/testGetTransactionBufferStats";
         final String subName1 = "test1";
         final String subName2 = "test2";
+        try {
+            admin.transactions()
+                    .getTransactionBufferStatsAsync(topic).get();
+            fail("Should failed here");
+        } catch (ExecutionException ex) {
+            assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+            PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+            assertEquals(cause.getMessage(), "Topic not found");
+        }
+        try {
+            pulsar.getBrokerService().getTopic(topic, false);
+            admin.transactions()
+                    .getTransactionBufferStatsAsync(topic).get();
+            fail("Should failed here");
+        } catch (ExecutionException ex) {
+            assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+            PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+            assertEquals(cause.getMessage(), "Topic not found");
+        }
         admin.topics().createNonPartitionedTopic(topic);
-
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                 .sendTimeout(0, TimeUnit.SECONDS).topic(topic).create();
         Consumer<byte[]> consumer1 = pulsarClient.newConsumer(Schema.BYTES).topic(topic)
@@ -289,6 +348,25 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
         initTransaction(2);
         final String topic = "persistent://public/default/testGetPendingAckStats";
         final String subName = "test1";
+        try {
+            admin.transactions()
+                    .getPendingAckStatsAsync(topic, subName).get();
+            fail("Should failed here");
+        } catch (ExecutionException ex) {
+            assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+            PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+            assertEquals(cause.getMessage(), "Topic not found");
+        }
+        try {
+            pulsar.getBrokerService().getTopic(topic, false);
+            admin.transactions()
+                    .getPendingAckStatsAsync(topic, subName).get();
+            fail("Should failed here");
+        } catch (ExecutionException ex) {
+            assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+            PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+            assertEquals(cause.getMessage(), "Topic not found");
+        }
         admin.topics().createNonPartitionedTopic(topic);
 
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
@@ -429,6 +507,24 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
         assertNull(managedLedgerInternalStats.ledgers.get(0).metadata);
     }
 
+    /**
+     * Restarts the broker with the transaction coordinator disabled and expects the
+     * transactions admin API to answer with 503 Service Unavailable.
+     */
+    @Test(timeOut = 20000)
+    public void testTransactionNotEnabled() throws Exception {
+        stopBroker();
+        conf.setTransactionCoordinatorEnabled(false);
+        super.internalSetup();
+        try {
+            admin.transactions().getCoordinatorInternalStats(1, false);
+            // Without this the test would pass silently when no exception is thrown.
+            fail("Should have failed with 503 since the transaction coordinator is disabled");
+        } catch (PulsarAdminException ex) {
+            assertEquals(ex.getStatusCode(), HttpStatus.SC_SERVICE_UNAVAILABLE);
+        }
+    }
+
     private static void verifyCoordinatorStats(String state,
                                                long sequenceId, long lowWaterMark) {
         assertEquals(state, "Ready");