You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2022/04/13 05:28:13 UTC

[pulsar] branch master updated: log transaction rest api error on broker side (#15081)

This is an automated email from the ASF dual-hosted git repository.

zhangmingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 19120e6b00e log  transaction rest api error on broker side (#15081)
19120e6b00e is described below

commit 19120e6b00e4176b8cf72058ee524eb4d10fc759
Author: gaozhangmin <ga...@qq.com>
AuthorDate: Wed Apr 13 13:28:06 2022 +0800

    log  transaction rest api error on broker side (#15081)
    
    Co-authored-by: gavingaozhangmin <ga...@didiglobal.com>
---
 .../pulsar/broker/admin/v3/Transactions.java       | 31 +++++++++++++++++-----
 1 file changed, 25 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 9cb825b9f8e..7b2c99ff9d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -88,8 +88,12 @@ public class Transactions extends TransactionsBase {
             validateTopicName(tenant, namespace, encodedTopic);
             internalGetTransactionInBufferStats(authoritative, Long.parseLong(mostSigBits),
                     Long.parseLong(leastSigBits))
-                    .thenAccept(stat -> asyncResponse.resume(stat))
+                    .thenAccept(asyncResponse::resume)
                     .exceptionally(ex -> {
+                        if (!isRedirectException(ex)) {
+                            log.error("[{}] Failed to get transaction state in transaction buffer {}",
+                                    clientAppId(), topicName, ex);
+                        }
                         resumeAsyncResponseExceptionally(asyncResponse, ex);
                         return null;
                     });
@@ -122,8 +126,12 @@ public class Transactions extends TransactionsBase {
             validateTopicName(tenant, namespace, encodedTopic);
             internalGetTransactionInPendingAckStats(authoritative, Long.parseLong(mostSigBits),
                     Long.parseLong(leastSigBits), subName)
-                    .thenAccept(stat -> asyncResponse.resume(stat))
+                    .thenAccept(asyncResponse::resume)
                     .exceptionally(ex -> {
+                        if (!isRedirectException(ex)) {
+                            log.error("[{}] Failed to get transaction state in pending ack {}",
+                                    clientAppId(), topicName, ex);
+                        }
                         resumeAsyncResponseExceptionally(asyncResponse, ex);
                         return null;
                     });
@@ -152,8 +160,12 @@ public class Transactions extends TransactionsBase {
             checkTransactionCoordinatorEnabled();
             validateTopicName(tenant, namespace, encodedTopic);
             internalGetTransactionBufferStats(authoritative)
-                    .thenAccept(stat -> asyncResponse.resume(stat))
+                    .thenAccept(asyncResponse::resume)
                     .exceptionally(ex -> {
+                        if (!isRedirectException(ex)) {
+                            log.error("[{}] Failed to get transaction buffer stats in topic {}",
+                                    clientAppId(), topicName, ex);
+                        }
                         resumeAsyncResponseExceptionally(asyncResponse, ex);
                         return null;
                     });
@@ -183,8 +195,12 @@ public class Transactions extends TransactionsBase {
             checkTransactionCoordinatorEnabled();
             validateTopicName(tenant, namespace, encodedTopic);
             internalGetPendingAckStats(authoritative, subName)
-                    .thenAccept(stats -> asyncResponse.resume(stats))
+                    .thenAccept(asyncResponse::resume)
                     .exceptionally(ex -> {
+                        if (!isRedirectException(ex)) {
+                            log.error("[{}] Failed to get transaction pending ack stats in topic {}",
+                                    clientAppId(), topicName, ex);
+                        }
                         resumeAsyncResponseExceptionally(asyncResponse, ex);
                         return null;
                     });
@@ -276,10 +292,13 @@ public class Transactions extends TransactionsBase {
             checkTransactionCoordinatorEnabled();
             validateTopicName(tenant, namespace, encodedTopic);
             internalGetPendingAckInternalStats(authoritative, subName, metadata)
-                    .thenAccept(stats -> asyncResponse.resume(stats))
+                    .thenAccept(asyncResponse::resume)
                     .exceptionally(ex -> {
+                        if (!isRedirectException(ex)) {
+                            log.error("[{}] Failed to get pending ack internal stats {}",
+                                    clientAppId(), topicName, ex);
+                        }
                         Throwable cause = FutureUtil.unwrapCompletionException(ex);
-                        log.error("[{}] Failed to get pending ack internal stats {}", clientAppId(), topicName, cause);
                         if (cause instanceof BrokerServiceException.ServiceUnitNotReadyException) {
                             asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, cause));
                         } else if (cause instanceof BrokerServiceException.NotAllowedException) {