You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/04/07 02:20:34 UTC
[pulsar] branch master updated: [fix][transaction] Fix transaction REST API redirect issue. (#15017)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8b8bf154aec [fix][transaction] Fix transaction REST API redirect issue. (#15017)
8b8bf154aec is described below
commit 8b8bf154aecf07c45fede2679445ef0563b49c5b
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Apr 7 10:20:28 2022 +0800
[fix][transaction] Fix transaction REST API redirect issue. (#15017)
---
.../pulsar/broker/admin/impl/TransactionsBase.java | 510 ++++++++-------------
.../pulsar/broker/admin/v3/Transactions.java | 67 ++-
.../broker/admin/v3/AdminApiTransactionTest.java | 92 +++-
3 files changed, 341 insertions(+), 328 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index 308f18e8bc3..b225cd2e266 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -18,11 +18,9 @@
*/
package org.apache.pulsar.broker.admin.impl;
-import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static javax.ws.rs.core.Response.Status.METHOD_NOT_ALLOWED;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
-import static javax.ws.rs.core.Response.Status.TEMPORARY_REDIRECT;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.HashMap;
@@ -46,6 +44,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
@@ -53,6 +52,7 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionLogStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
+import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -67,216 +67,97 @@ public abstract class TransactionsBase extends AdminResource {
protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
Integer coordinatorId) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- if (coordinatorId != null) {
- validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
- authoritative);
- TransactionMetadataStore transactionMetadataStore =
- pulsar().getTransactionMetadataStoreService().getStores()
- .get(TransactionCoordinatorID.get(coordinatorId));
- if (transactionMetadataStore == null) {
- asyncResponse.resume(new RestException(NOT_FOUND,
- "Transaction coordinator not found! coordinator id : " + coordinatorId));
+ if (coordinatorId != null) {
+ validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+ authoritative);
+ TransactionMetadataStore transactionMetadataStore =
+ pulsar().getTransactionMetadataStoreService().getStores()
+ .get(TransactionCoordinatorID.get(coordinatorId));
+ if (transactionMetadataStore == null) {
+ asyncResponse.resume(new RestException(NOT_FOUND,
+ "Transaction coordinator not found! coordinator id : " + coordinatorId));
+ return;
+ }
+ asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
+ } else {
+ getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+ false, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions == 0) {
+ asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+ "Transaction coordinator not found"));
return;
}
- asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
- } else {
- getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
- false, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions == 0) {
- asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
- "Transaction coordinator not found"));
+ List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
+ Lists.newArrayList();
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ try {
+ transactionMetadataStoreInfoFutures
+ .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
+ } catch (PulsarServerException e) {
+ asyncResponse.resume(new RestException(e));
return;
}
- List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
- Lists.newArrayList();
- for (int i = 0; i < partitionMetadata.partitions; i++) {
+ }
+ Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
+ FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
+ if (e != null) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+
+ for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
try {
- transactionMetadataStoreInfoFutures
- .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
- } catch (PulsarServerException e) {
- asyncResponse.resume(new RestException(e));
+ stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
+ } catch (Exception exception) {
+ asyncResponse.resume(new RestException(exception.getCause()));
return;
}
}
- Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
- FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
- try {
- stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
- } catch (Exception exception) {
- asyncResponse.resume(new RestException(exception.getCause()));
- return;
- }
- }
- asyncResponse.resume(stats);
- });
- }).exceptionally(ex -> {
- log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
+ asyncResponse.resume(stats);
});
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with transactionCoordinatorEnabled=true."));
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
}
}
- protected void internalGetTransactionInPendingAckStats(AsyncResponse asyncResponse, boolean authoritative,
- long mostSigBits, long leastSigBits, String subName) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
- if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- asyncResponse.resume(((PersistentTopic) topicObject)
- .getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits), subName));
- } else {
- asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
- }
- });
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with transactionCoordinatorEnabled=true."));
- }
+ protected CompletableFuture<TransactionInPendingAckStats> internalGetTransactionInPendingAckStats(
+ boolean authoritative, long mostSigBits, long leastSigBits, String subName) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenApply(topic -> topic.getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits),
+ subName));
}
- protected void internalGetTransactionInBufferStats(AsyncResponse asyncResponse, boolean authoritative,
- long mostSigBits, long leastSigBits) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
- if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- TransactionInBufferStats transactionInBufferStats = ((PersistentTopic) topicObject)
- .getTransactionInBufferStats(new TxnID(mostSigBits, leastSigBits));
- asyncResponse.resume(transactionInBufferStats);
- } else {
- asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
- }
- });
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with transactionCoordinatorEnabled=true."));
- }
+ protected CompletableFuture<TransactionInBufferStats> internalGetTransactionInBufferStats(
+ boolean authoritative, long mostSigBits, long leastSigBits) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenApply(topic -> topic.getTransactionInBufferStats(new TxnID(mostSigBits, leastSigBits)));
}
- protected void internalGetTransactionBufferStats(AsyncResponse asyncResponse, boolean authoritative) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- asyncResponse.resume(((PersistentTopic) topicObject).getTransactionBufferStats());
- } else {
- asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
- }
- });
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, "Broker don't support transaction!"));
- }
+ protected CompletableFuture<TransactionBufferStats> internalGetTransactionBufferStats(boolean authoritative) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenApply(topic -> topic.getTransactionBufferStats());
}
- protected void internalGetPendingAckStats(AsyncResponse asyncResponse, boolean authoritative, String subName) {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- asyncResponse.resume(((PersistentTopic) topicObject).getTransactionPendingAckStats(subName));
- } else {
- asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
- }
- });
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, "Broker don't support transaction!"));
- }
+ protected CompletableFuture<TransactionPendingAckStats> internalGetPendingAckStats(
+ boolean authoritative, String subName) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenApply(topic -> topic.getTransactionPendingAckStats(subName));
}
protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
boolean authoritative, int mostSigBits, long leastSigBits) {
try {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
- authoritative);
- CompletableFuture<TransactionMetadata> transactionMetadataFuture = new CompletableFuture<>();
- TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
- .getTxnMeta(new TxnID(mostSigBits, leastSigBits)).get();
- getTransactionMetadata(txnMeta, transactionMetadataFuture);
- asyncResponse.resume(transactionMetadataFuture.get(10, TimeUnit.SECONDS));
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with transactionCoordinatorEnabled=true."));
- }
+ validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
+ authoritative);
+ CompletableFuture<TransactionMetadata> transactionMetadataFuture = new CompletableFuture<>();
+ TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
+ .getTxnMeta(new TxnID(mostSigBits, leastSigBits)).get();
+ getTransactionMetadata(txnMeta, transactionMetadataFuture);
+ asyncResponse.resume(transactionMetadataFuture.get(10, TimeUnit.SECONDS));
} catch (Exception e) {
if (e instanceof ExecutionException) {
if (e.getCause() instanceof CoordinatorNotFoundException
@@ -378,91 +259,87 @@ public abstract class TransactionsBase extends AdminResource {
protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
boolean authoritative, long timeout, Integer coordinatorId) {
try {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- if (coordinatorId != null) {
- validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
- authoritative);
- TransactionMetadataStore transactionMetadataStore =
- pulsar().getTransactionMetadataStoreService().getStores()
- .get(TransactionCoordinatorID.get(coordinatorId));
- if (transactionMetadataStore == null) {
- asyncResponse.resume(new RestException(NOT_FOUND,
- "Transaction coordinator not found! coordinator id : " + coordinatorId));
+ if (coordinatorId != null) {
+ validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+ authoritative);
+ TransactionMetadataStore transactionMetadataStore =
+ pulsar().getTransactionMetadataStoreService().getStores()
+ .get(TransactionCoordinatorID.get(coordinatorId));
+ if (transactionMetadataStore == null) {
+ asyncResponse.resume(new RestException(NOT_FOUND,
+ "Transaction coordinator not found! coordinator id : " + coordinatorId));
+ return;
+ }
+ List<TxnMeta> transactions = transactionMetadataStore.getSlowTransactions(timeout);
+ List<CompletableFuture<TransactionMetadata>> completableFutures = new ArrayList<>();
+ for (TxnMeta txnMeta : transactions) {
+ CompletableFuture<TransactionMetadata> completableFuture = new CompletableFuture<>();
+ getTransactionMetadata(txnMeta, completableFuture);
+ completableFutures.add(completableFuture);
+ }
+
+ FutureUtil.waitForAll(completableFutures).whenComplete((v, e) -> {
+ if (e != null) {
+ asyncResponse.resume(new RestException(e.getCause()));
return;
}
- List<TxnMeta> transactions = transactionMetadataStore.getSlowTransactions(timeout);
- List<CompletableFuture<TransactionMetadata>> completableFutures = new ArrayList<>();
- for (TxnMeta txnMeta : transactions) {
- CompletableFuture<TransactionMetadata> completableFuture = new CompletableFuture<>();
- getTransactionMetadata(txnMeta, completableFuture);
- completableFutures.add(completableFuture);
- }
- FutureUtil.waitForAll(completableFutures).whenComplete((v, e) -> {
+ Map<String, TransactionMetadata> transactionMetadata = new HashMap<>();
+ for (CompletableFuture<TransactionMetadata> future : completableFutures) {
+ try {
+ transactionMetadata.put(future.get().txnId, future.get());
+ } catch (Exception exception) {
+ asyncResponse.resume(new RestException(exception.getCause()));
+ return;
+ }
+ }
+ asyncResponse.resume(transactionMetadata);
+ });
+ } else {
+ getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
+ false, false).thenAccept(partitionMetadata -> {
+ if (partitionMetadata.partitions == 0) {
+ asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
+ "Transaction coordinator not found"));
+ return;
+ }
+ List<CompletableFuture<Map<String, TransactionMetadata>>> completableFutures =
+ Lists.newArrayList();
+ for (int i = 0; i < partitionMetadata.partitions; i++) {
+ try {
+ completableFutures
+ .add(pulsar().getAdminClient().transactions()
+ .getSlowTransactionsByCoordinatorIdAsync(i, timeout,
+ TimeUnit.MILLISECONDS));
+ } catch (PulsarServerException e) {
+ asyncResponse.resume(new RestException(e));
+ return;
+ }
+ }
+ Map<String, TransactionMetadata> transactionMetadataMaps = new HashMap<>();
+ FutureUtil.waitForAll(completableFutures).whenComplete((result, e) -> {
if (e != null) {
- asyncResponse.resume(new RestException(e.getCause()));
+ asyncResponse.resume(new RestException(e));
return;
}
- Map<String, TransactionMetadata> transactionMetadata = new HashMap<>();
- for (CompletableFuture<TransactionMetadata> future : completableFutures) {
+ for (CompletableFuture<Map<String, TransactionMetadata>> transactionMetadataMap
+ : completableFutures) {
try {
- transactionMetadata.put(future.get().txnId, future.get());
+ transactionMetadataMaps.putAll(transactionMetadataMap.get());
} catch (Exception exception) {
asyncResponse.resume(new RestException(exception.getCause()));
return;
}
}
- asyncResponse.resume(transactionMetadata);
- });
- } else {
- getPartitionedTopicMetadataAsync(TopicName.TRANSACTION_COORDINATOR_ASSIGN,
- false, false).thenAccept(partitionMetadata -> {
- if (partitionMetadata.partitions == 0) {
- asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
- "Transaction coordinator not found"));
- return;
- }
- List<CompletableFuture<Map<String, TransactionMetadata>>> completableFutures =
- Lists.newArrayList();
- for (int i = 0; i < partitionMetadata.partitions; i++) {
- try {
- completableFutures
- .add(pulsar().getAdminClient().transactions()
- .getSlowTransactionsByCoordinatorIdAsync(i, timeout,
- TimeUnit.MILLISECONDS));
- } catch (PulsarServerException e) {
- asyncResponse.resume(new RestException(e));
- return;
- }
- }
- Map<String, TransactionMetadata> transactionMetadataMaps = new HashMap<>();
- FutureUtil.waitForAll(completableFutures).whenComplete((result, e) -> {
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
-
- for (CompletableFuture<Map<String, TransactionMetadata>> transactionMetadataMap
- : completableFutures) {
- try {
- transactionMetadataMaps.putAll(transactionMetadataMap.get());
- } catch (Exception exception) {
- asyncResponse.resume(new RestException(exception.getCause()));
- return;
- }
- }
- asyncResponse.resume(transactionMetadataMaps);
- });
- }).exceptionally(ex -> {
- log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
- resumeAsyncResponseExceptionally(asyncResponse, ex);
- return null;
+ asyncResponse.resume(transactionMetadataMaps);
});
+ }).exceptionally(ex -> {
+ log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, "Broker don't support transaction!"));
}
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
@@ -472,33 +349,28 @@ public abstract class TransactionsBase extends AdminResource {
protected void internalGetCoordinatorInternalStats(AsyncResponse asyncResponse, boolean authoritative,
boolean metadata, int coordinatorId) {
try {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- TopicName topicName = TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
- validateTopicOwnership(topicName, authoritative);
- TransactionMetadataStore metadataStore = pulsar().getTransactionMetadataStoreService()
- .getStores().get(TransactionCoordinatorID.get(coordinatorId));
- if (metadataStore == null) {
- asyncResponse.resume(new RestException(NOT_FOUND,
- "Transaction coordinator not found! coordinator id : " + coordinatorId));
- return;
- }
- if (metadataStore instanceof MLTransactionMetadataStore) {
- ManagedLedger managedLedger = ((MLTransactionMetadataStore) metadataStore).getManagedLedger();
- TransactionCoordinatorInternalStats transactionCoordinatorInternalStats =
- new TransactionCoordinatorInternalStats();
- TransactionLogStats transactionLogStats = new TransactionLogStats();
- transactionLogStats.managedLedgerName = managedLedger.getName();
- transactionLogStats.managedLedgerInternalStats =
- managedLedger.getManagedLedgerInternalStats(metadata).get();
- transactionCoordinatorInternalStats.transactionLogStats = transactionLogStats;
- asyncResponse.resume(transactionCoordinatorInternalStats);
- } else {
- asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
- "Broker don't use MLTransactionMetadataStore!"));
- }
+ TopicName topicName = TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
+ validateTopicOwnership(topicName, authoritative);
+ TransactionMetadataStore metadataStore = pulsar().getTransactionMetadataStoreService()
+ .getStores().get(TransactionCoordinatorID.get(coordinatorId));
+ if (metadataStore == null) {
+ asyncResponse.resume(new RestException(NOT_FOUND,
+ "Transaction coordinator not found! coordinator id : " + coordinatorId));
+ return;
+ }
+ if (metadataStore instanceof MLTransactionMetadataStore) {
+ ManagedLedger managedLedger = ((MLTransactionMetadataStore) metadataStore).getManagedLedger();
+ TransactionCoordinatorInternalStats transactionCoordinatorInternalStats =
+ new TransactionCoordinatorInternalStats();
+ TransactionLogStats transactionLogStats = new TransactionLogStats();
+ transactionLogStats.managedLedgerName = managedLedger.getName();
+ transactionLogStats.managedLedgerInternalStats =
+ managedLedger.getManagedLedgerInternalStats(metadata).get();
+ transactionCoordinatorInternalStats.transactionLogStats = transactionLogStats;
+ asyncResponse.resume(transactionCoordinatorInternalStats);
} else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with transactionCoordinatorEnabled=true."));
+ asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
+ "Broker don't use MLTransactionMetadataStore!"));
}
} catch (Exception e) {
asyncResponse.resume(new RestException(e.getCause()));
@@ -506,40 +378,46 @@ public abstract class TransactionsBase extends AdminResource {
}
protected CompletableFuture<TransactionPendingAckInternalStats> internalGetPendingAckInternalStats(
- boolean authoritative, TopicName topicName, String subName, boolean metadata) {
+ boolean authoritative, String subName, boolean metadata) {
+ return getExistingPersistentTopicAsync(authoritative)
+ .thenCompose(topic -> topic.getPendingAckManagedLedger(subName))
+ .thenCompose(managedLedger ->
+ managedLedger.getManagedLedgerInternalStats(metadata)
+ .thenApply(internalStats -> {
+ TransactionLogStats pendingAckLogStats = new TransactionLogStats();
+ pendingAckLogStats.managedLedgerName = managedLedger.getName();
+ pendingAckLogStats.managedLedgerInternalStats = internalStats;
+ return pendingAckLogStats;
+ })
+ .thenApply(pendingAckLogStats -> {
+ TransactionPendingAckInternalStats stats = new TransactionPendingAckInternalStats();
+ stats.pendingAckLogStats = pendingAckLogStats;
+ return stats;
+ })
+ );
+ }
+
+ protected CompletableFuture<PersistentTopic> getExistingPersistentTopicAsync(boolean authoritative) {
+ return validateTopicOwnershipAsync(topicName, authoritative).thenCompose(__ -> {
+ CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
+ .getTopics().get(topicName.toString());
+ if (topicFuture == null) {
+ return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
+ }
+ return topicFuture.thenCompose(optionalTopic -> {
+ if (!optionalTopic.isPresent()) {
+ return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
+ }
+ return CompletableFuture.completedFuture((PersistentTopic) optionalTopic.get());
+ });
+ });
+ }
+
+ protected void checkTransactionCoordinatorEnabled() {
if (!pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- return FutureUtil.failedFuture(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with transactionCoordinatorEnabled=true."));
+ throw new RestException(SERVICE_UNAVAILABLE,
+ "This Broker is not configured with transactionCoordinatorEnabled=true.");
}
- return validateTopicOwnershipAsync(topicName, authoritative)
- .thenCompose(__ -> {
- CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture == null) {
- return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
- }
- return topicFuture.thenCompose(optionalTopic -> {
- if (!optionalTopic.isPresent()) {
- return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
- } else {
- Topic topicObject = optionalTopic.get();
- return ((PersistentTopic) topicObject).getPendingAckManagedLedger(subName)
- .thenCompose(managedLedger -> managedLedger.getManagedLedgerInternalStats(metadata)
- .thenApply(internalStats -> {
- TransactionLogStats pendingAckLogStats = new TransactionLogStats();
- pendingAckLogStats.managedLedgerName = managedLedger.getName();
- pendingAckLogStats.managedLedgerInternalStats = internalStats;
- return pendingAckLogStats;
- })
- .thenApply(pendingAckLogStats -> {
- TransactionPendingAckInternalStats stats =
- new TransactionPendingAckInternalStats();
- stats.pendingAckLogStats = pendingAckLogStats;
- return stats;
- }));
- }
- });
- });
}
protected void validateTopicName(String property, String namespace, String encodedTopic) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index bbd79036ecf..9cb825b9f8e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -61,6 +61,7 @@ public class Transactions extends TransactionsBase {
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
@QueryParam("coordinatorId") Integer coordinatorId) {
+ checkTransactionCoordinatorEnabled();
internalGetCoordinatorStats(asyncResponse, authoritative, coordinatorId);
}
@@ -82,9 +83,19 @@ public class Transactions extends TransactionsBase {
@PathParam("topic") @Encoded String encodedTopic,
@PathParam("mostSigBits") String mostSigBits,
@PathParam("leastSigBits") String leastSigBits) {
- validateTopicName(tenant, namespace, encodedTopic);
- internalGetTransactionInBufferStats(asyncResponse, authoritative,
- Long.parseLong(mostSigBits), Long.parseLong(leastSigBits));
+ try {
+ checkTransactionCoordinatorEnabled();
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalGetTransactionInBufferStats(authoritative, Long.parseLong(mostSigBits),
+ Long.parseLong(leastSigBits))
+ .thenAccept(stat -> asyncResponse.resume(stat))
+ .exceptionally(ex -> {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } catch (Exception ex) {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
}
@GET
@@ -106,9 +117,19 @@ public class Transactions extends TransactionsBase {
@PathParam("mostSigBits") String mostSigBits,
@PathParam("leastSigBits") String leastSigBits,
@PathParam("subName") String subName) {
- validateTopicName(tenant, namespace, encodedTopic);
- internalGetTransactionInPendingAckStats(asyncResponse, authoritative, Long.parseLong(mostSigBits),
- Long.parseLong(leastSigBits), subName);
+ try {
+ checkTransactionCoordinatorEnabled();
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalGetTransactionInPendingAckStats(authoritative, Long.parseLong(mostSigBits),
+ Long.parseLong(leastSigBits), subName)
+ .thenAccept(stat -> asyncResponse.resume(stat))
+ .exceptionally(ex -> {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } catch (Exception ex) {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
}
@GET
@@ -127,8 +148,18 @@ public class Transactions extends TransactionsBase {
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
- validateTopicName(tenant, namespace, encodedTopic);
- internalGetTransactionBufferStats(asyncResponse, authoritative);
+ try {
+ checkTransactionCoordinatorEnabled();
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalGetTransactionBufferStats(authoritative)
+ .thenAccept(stat -> asyncResponse.resume(stat))
+ .exceptionally(ex -> {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } catch (Exception ex) {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
}
@GET
@@ -148,8 +179,18 @@ public class Transactions extends TransactionsBase {
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@PathParam("subName") String subName) {
- validateTopicName(tenant, namespace, encodedTopic);
- internalGetPendingAckStats(asyncResponse, authoritative, subName);
+ try {
+ checkTransactionCoordinatorEnabled();
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalGetPendingAckStats(authoritative, subName)
+ .thenAccept(stats -> asyncResponse.resume(stats))
+ .exceptionally(ex -> {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } catch (Exception ex) {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
}
@GET
@@ -168,6 +209,7 @@ public class Transactions extends TransactionsBase {
@DefaultValue("false") boolean authoritative,
@PathParam("mostSigBits") String mostSigBits,
@PathParam("leastSigBits") String leastSigBits) {
+ checkTransactionCoordinatorEnabled();
internalGetTransactionMetadata(asyncResponse, authoritative, Integer.parseInt(mostSigBits),
Long.parseLong(leastSigBits));
}
@@ -188,6 +230,7 @@ public class Transactions extends TransactionsBase {
@DefaultValue("false") boolean authoritative,
@PathParam("timeout") String timeout,
@QueryParam("coordinatorId") Integer coordinatorId) {
+ checkTransactionCoordinatorEnabled();
internalGetSlowTransactions(asyncResponse, authoritative, Long.parseLong(timeout), coordinatorId);
}
@@ -205,6 +248,7 @@ public class Transactions extends TransactionsBase {
@DefaultValue("false") boolean authoritative,
@PathParam("coordinatorId") String coordinatorId,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
+ checkTransactionCoordinatorEnabled();
internalGetCoordinatorInternalStats(asyncResponse, authoritative, metadata, Integer.parseInt(coordinatorId));
}
@@ -229,8 +273,9 @@ public class Transactions extends TransactionsBase {
@PathParam("subName") String subName,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
try {
+ checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
- internalGetPendingAckInternalStats(authoritative, topicName, subName, metadata)
+ internalGetPendingAckInternalStats(authoritative, subName, metadata)
.thenAccept(stats -> asyncResponse.resume(stats))
.exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 920ad61be4e..fa6e56724fe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin.v3;
import com.google.common.collect.Sets;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.http.HttpStatus;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -121,6 +122,25 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
initTransaction(2);
TransactionImpl transaction = (TransactionImpl) getTransaction();
final String topic = "persistent://public/default/testGetTransactionInBufferStats";
+ try {
+ admin.transactions()
+ .getTransactionInBufferStatsAsync(new TxnID(1, 1), topic).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
+ try {
+ pulsar.getBrokerService().getTopic(topic, false);
+ admin.transactions()
+ .getTransactionInBufferStatsAsync(new TxnID(1, 1), topic).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).sendTimeout(0, TimeUnit.SECONDS).create();
MessageId messageId = producer.newMessage(transaction).value("Hello pulsar!".getBytes()).send();
@@ -146,6 +166,27 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
initTransaction(2);
final String topic = "persistent://public/default/testGetTransactionInBufferStats";
final String subName = "test";
+ try {
+ admin.transactions()
+ .getTransactionInPendingAckStatsAsync(new TxnID(1,
+ 2), topic, subName).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
+ try {
+ pulsar.getBrokerService().getTopic(topic, false);
+ admin.transactions()
+ .getTransactionInPendingAckStatsAsync(new TxnID(1,
+ 2), topic, subName).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic)
@@ -252,8 +293,26 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
final String topic = "persistent://public/default/testGetTransactionBufferStats";
final String subName1 = "test1";
final String subName2 = "test2";
+ try {
+ admin.transactions()
+ .getTransactionBufferStatsAsync(topic).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
+ try {
+ pulsar.getBrokerService().getTopic(topic, false);
+ admin.transactions()
+ .getTransactionBufferStatsAsync(topic).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
admin.topics().createNonPartitionedTopic(topic);
-
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.sendTimeout(0, TimeUnit.SECONDS).topic(topic).create();
Consumer<byte[]> consumer1 = pulsarClient.newConsumer(Schema.BYTES).topic(topic)
@@ -289,6 +348,25 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
initTransaction(2);
final String topic = "persistent://public/default/testGetPendingAckStats";
final String subName = "test1";
+ try {
+ admin.transactions()
+ .getPendingAckStatsAsync(topic, subName).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
+ try {
+ pulsar.getBrokerService().getTopic(topic, false);
+ admin.transactions()
+ .getPendingAckStatsAsync(topic, subName).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
@@ -429,6 +507,18 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
assertNull(managedLedgerInternalStats.ledgers.get(0).metadata);
}
+    /**
+     * Checks that a transaction admin call is answered with HTTP 503
+     * (Service Unavailable) once the broker has been stopped and set up
+     * again with the transaction coordinator disabled in the configuration.
+     */
+    @Test(timeOut = 20000)
+    public void testTransactionNotEnabled() throws Exception {
+        // Bring the broker back up with the transaction coordinator turned off.
+        stopBroker();
+        conf.setTransactionCoordinatorEnabled(false);
+        super.internalSetup();
+        try {
+            admin.transactions().getCoordinatorInternalStats(1, false);
+            // Without this the test would pass silently if the call unexpectedly
+            // succeeded; same guard as the other negative tests in this class.
+            fail("Should failed here");
+        } catch (PulsarAdminException ex) {
+            assertEquals(ex.getStatusCode(), HttpStatus.SC_SERVICE_UNAVAILABLE);
+        }
+    }
+
private static void verifyCoordinatorStats(String state,
long sequenceId, long lowWaterMark) {
assertEquals(state, "Ready");