You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/19 09:51:10 UTC

[pulsar] 08/26: [fix][broker] Fix getPendingAckInternalStats redirect issue. (#14876)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 59a1360c06f68390f5e65f6d0708a852e3523d23
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sun Apr 3 03:27:20 2022 +0800

    [fix][broker] Fix getPendingAckInternalStats redirect issue. (#14876)
    
    (cherry picked from commit 417d1e50957bee1b0f88b8172d7831af496ed001)
---
 .../pulsar/broker/admin/impl/TransactionsBase.java | 93 ++++++++--------------
 .../pulsar/broker/admin/v3/Transactions.java       | 33 +++++++-
 .../broker/admin/v3/AdminApiTransactionTest.java   | 22 +++++
 3 files changed, 82 insertions(+), 66 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index 504ce92de45..308f18e8bc3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -38,9 +38,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
-import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
-import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
@@ -508,69 +505,41 @@ public abstract class TransactionsBase extends AdminResource {
         }
     }
 
-    protected void internalGetPendingAckInternalStats(AsyncResponse asyncResponse, boolean authoritative,
-                                                      TopicName topicName, String subName, boolean metadata) {
-        try {
-            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
-                validateTopicOwnership(topicName, authoritative);
-                CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
-                        .getTopics().get(topicName.toString());
-                if (topicFuture != null) {
-                    topicFuture.whenComplete((optionalTopic, e) -> {
-
-                        if (e != null) {
-                            asyncResponse.resume(new RestException(e));
-                            return;
-                        }
+    protected CompletableFuture<TransactionPendingAckInternalStats> internalGetPendingAckInternalStats(
+            boolean authoritative, TopicName topicName, String subName, boolean metadata) {
+        if (!pulsar().getConfig().isTransactionCoordinatorEnabled()) {
+            return FutureUtil.failedFuture(new RestException(SERVICE_UNAVAILABLE,
+                    "This Broker is not configured with transactionCoordinatorEnabled=true."));
+        }
+        return validateTopicOwnershipAsync(topicName, authoritative)
+                .thenCompose(__ -> {
+                    CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
+                            .getTopics().get(topicName.toString());
+                    if (topicFuture == null) {
+                        return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
+                    }
+                    return topicFuture.thenCompose(optionalTopic -> {
                         if (!optionalTopic.isPresent()) {
-                            asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
-                                    "Topic is not owned by this broker!"));
-                            return;
-                        }
-                        Topic topicObject = optionalTopic.get();
-                        if (topicObject instanceof PersistentTopic) {
-                            try {
-                                ManagedLedger managedLedger =
-                                        ((PersistentTopic) topicObject).getPendingAckManagedLedger(subName).get();
-                                TransactionPendingAckInternalStats stats =
-                                        new TransactionPendingAckInternalStats();
-                                TransactionLogStats pendingAckLogStats = new TransactionLogStats();
-                                pendingAckLogStats.managedLedgerName = managedLedger.getName();
-                                pendingAckLogStats.managedLedgerInternalStats =
-                                        managedLedger.getManagedLedgerInternalStats(metadata).get();
-                                stats.pendingAckLogStats = pendingAckLogStats;
-                                asyncResponse.resume(stats);
-                            } catch (Exception exception) {
-                                if (exception instanceof ExecutionException) {
-                                    if (exception.getCause() instanceof ServiceUnitNotReadyException) {
-                                        asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                                                exception.getCause()));
-                                        return;
-                                    } else if (exception.getCause() instanceof NotAllowedException) {
-                                        asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
-                                                exception.getCause()));
-                                        return;
-                                    } else if (exception.getCause() instanceof SubscriptionNotFoundException) {
-                                        asyncResponse.resume(new RestException(NOT_FOUND, exception.getCause()));
-                                        return;
-                                    }
-                                }
-                                asyncResponse.resume(new RestException(exception));
-                            }
+                            return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
                         } else {
-                            asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
+                            Topic topicObject = optionalTopic.get();
+                            return ((PersistentTopic) topicObject).getPendingAckManagedLedger(subName)
+                                    .thenCompose(managedLedger -> managedLedger.getManagedLedgerInternalStats(metadata)
+                                            .thenApply(internalStats -> {
+                                                TransactionLogStats pendingAckLogStats = new TransactionLogStats();
+                                                pendingAckLogStats.managedLedgerName = managedLedger.getName();
+                                                pendingAckLogStats.managedLedgerInternalStats = internalStats;
+                                                return pendingAckLogStats;
+                                            })
+                                            .thenApply(pendingAckLogStats -> {
+                                                TransactionPendingAckInternalStats stats =
+                                                        new TransactionPendingAckInternalStats();
+                                                stats.pendingAckLogStats = pendingAckLogStats;
+                                                return stats;
+                                            }));
                         }
                     });
-                } else {
-                    asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
-                }
-            } else {
-                asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
-                        "This Broker is not configured with transactionCoordinatorEnabled=true."));
-            }
-        } catch (Exception e) {
-            asyncResponse.resume(new RestException(e.getCause()));
-        }
+                });
     }
 
     protected void validateTopicName(String property, String namespace, String encodedTopic) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 94411f6d16d..bbd79036ecf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pulsar.broker.admin.v3;
 
+import static javax.ws.rs.core.Response.Status.METHOD_NOT_ALLOWED;
+import static javax.ws.rs.core.Response.Status.NOT_FOUND;
+import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
@@ -33,14 +36,17 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.admin.impl.TransactionsBase;
-import org.apache.pulsar.common.naming.TopicDomain;
-import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.util.FutureUtil;
 
 @Path("/transactions")
 @Produces(MediaType.APPLICATION_JSON)
 @Consumes(MediaType.APPLICATION_JSON)
 @Api(value = "/transactions", description = "Transactions admin apis", tags = "transactions")
+@Slf4j
 public class Transactions extends TransactionsBase {
 
     @GET
@@ -222,7 +228,26 @@ public class Transactions extends TransactionsBase {
                                            @PathParam("topic") @Encoded String encodedTopic,
                                            @PathParam("subName") String subName,
                                            @QueryParam("metadata") @DefaultValue("false") boolean metadata) {
-        internalGetPendingAckInternalStats(asyncResponse, authoritative,
-                TopicName.get(TopicDomain.persistent.value(), tenant, namespace, encodedTopic), subName, metadata);
+        try {
+            validateTopicName(tenant, namespace, encodedTopic);
+            internalGetPendingAckInternalStats(authoritative, topicName, subName, metadata)
+                    .thenAccept(stats -> asyncResponse.resume(stats))
+                    .exceptionally(ex -> {
+                        Throwable cause = FutureUtil.unwrapCompletionException(ex);
+                        log.error("[{}] Failed to get pending ack internal stats {}", clientAppId(), topicName, cause);
+                        if (cause instanceof BrokerServiceException.ServiceUnitNotReadyException) {
+                            asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, cause));
+                        } else if (cause instanceof BrokerServiceException.NotAllowedException) {
+                            asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED, cause));
+                        } else if (cause instanceof BrokerServiceException.SubscriptionNotFoundException) {
+                            asyncResponse.resume(new RestException(NOT_FOUND, cause));
+                        } else {
+                            asyncResponse.resume(new RestException(cause));
+                        }
+                        return null;
+                    });
+        } catch (Exception ex) {
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+        }
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 93a2d0e188e..1f796553bdc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -56,12 +57,14 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
 
@@ -374,6 +377,25 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
         TransactionImpl transaction = (TransactionImpl) getTransaction();
         final String topic = "persistent://public/default/testGetPendingAckInternalStats";
         final String subName = "test";
+        try {
+            admin.transactions()
+                    .getPendingAckInternalStatsAsync(topic, subName, true).get();
+            fail("Should failed here");
+        } catch (ExecutionException ex) {
+            assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+            PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+            assertEquals(cause.getMessage(), "Topic not found");
+        }
+        try {
+            pulsar.getBrokerService().getTopic(topic, false);
+            admin.transactions()
+                    .getPendingAckInternalStatsAsync(topic, subName, true).get();
+            fail("Should failed here");
+        } catch (ExecutionException ex) {
+            assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+            PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+            assertEquals(cause.getMessage(), "Topic not found");
+        }
         admin.topics().createNonPartitionedTopic(topic);
         Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
         Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic)