You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2022/04/13 05:28:13 UTC
[pulsar] branch master updated: log transaction rest api error on broker side (#15081)
This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 19120e6b00e log transaction rest api error on broker side (#15081)
19120e6b00e is described below
commit 19120e6b00e4176b8cf72058ee524eb4d10fc759
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Wed Apr 13 13:28:06 2022 +0800
log transaction rest api error on broker side (#15081)
Co-authored-by: gavingaozhangmin <ga...@didiglobal.com>
---
.../pulsar/broker/admin/v3/Transactions.java | 31 +++++++++++++++++-----
1 file changed, 25 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 9cb825b9f8e..7b2c99ff9d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -88,8 +88,12 @@ public class Transactions extends TransactionsBase {
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionInBufferStats(authoritative, Long.parseLong(mostSigBits),
Long.parseLong(leastSigBits))
- .thenAccept(stat -> asyncResponse.resume(stat))
+ .thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get transaction state in transaction buffer {}",
+ clientAppId(), topicName, ex);
+ }
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
@@ -122,8 +126,12 @@ public class Transactions extends TransactionsBase {
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionInPendingAckStats(authoritative, Long.parseLong(mostSigBits),
Long.parseLong(leastSigBits), subName)
- .thenAccept(stat -> asyncResponse.resume(stat))
+ .thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get transaction state in pending ack {}",
+ clientAppId(), topicName, ex);
+ }
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
@@ -152,8 +160,12 @@ public class Transactions extends TransactionsBase {
checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionBufferStats(authoritative)
- .thenAccept(stat -> asyncResponse.resume(stat))
+ .thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get transaction buffer stats in topic {}",
+ clientAppId(), topicName, ex);
+ }
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
@@ -183,8 +195,12 @@ public class Transactions extends TransactionsBase {
checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
internalGetPendingAckStats(authoritative, subName)
- .thenAccept(stats -> asyncResponse.resume(stats))
+ .thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get transaction pending ack stats in topic {}",
+ clientAppId(), topicName, ex);
+ }
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
@@ -276,10 +292,13 @@ public class Transactions extends TransactionsBase {
checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
internalGetPendingAckInternalStats(authoritative, subName, metadata)
- .thenAccept(stats -> asyncResponse.resume(stats))
+ .thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
+ if (!isRedirectException(ex)) {
+ log.error("[{}] Failed to get pending ack internal stats {}",
+ clientAppId(), topicName, ex);
+ }
Throwable cause = FutureUtil.unwrapCompletionException(ex);
- log.error("[{}] Failed to get pending ack internal stats {}", clientAppId(), topicName, cause);
if (cause instanceof BrokerServiceException.ServiceUnitNotReadyException) {
asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, cause));
} else if (cause instanceof BrokerServiceException.NotAllowedException) {