You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/04/19 09:51:10 UTC
[pulsar] 08/26: [fix][broker] Fix getPendingAckInternalStats redirect issue. (#14876)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 59a1360c06f68390f5e65f6d0708a852e3523d23
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sun Apr 3 03:27:20 2022 +0800
[fix][broker] Fix getPendingAckInternalStats redirect issue. (#14876)
(cherry picked from commit 417d1e50957bee1b0f88b8172d7831af496ed001)
---
.../pulsar/broker/admin/impl/TransactionsBase.java | 93 ++++++++--------------
.../pulsar/broker/admin/v3/Transactions.java | 33 +++++++-
.../broker/admin/v3/AdminApiTransactionTest.java | 22 +++++
3 files changed, 82 insertions(+), 66 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index 504ce92de45..308f18e8bc3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -38,9 +38,6 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
-import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
-import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
-import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionNotFoundException;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
@@ -508,69 +505,41 @@ public abstract class TransactionsBase extends AdminResource {
}
}
- protected void internalGetPendingAckInternalStats(AsyncResponse asyncResponse, boolean authoritative,
- TopicName topicName, String subName, boolean metadata) {
- try {
- if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
- validateTopicOwnership(topicName, authoritative);
- CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
- .getTopics().get(topicName.toString());
- if (topicFuture != null) {
- topicFuture.whenComplete((optionalTopic, e) -> {
-
- if (e != null) {
- asyncResponse.resume(new RestException(e));
- return;
- }
+ protected CompletableFuture<TransactionPendingAckInternalStats> internalGetPendingAckInternalStats(
+ boolean authoritative, TopicName topicName, String subName, boolean metadata) {
+ if (!pulsar().getConfig().isTransactionCoordinatorEnabled()) {
+ return FutureUtil.failedFuture(new RestException(SERVICE_UNAVAILABLE,
+ "This Broker is not configured with transactionCoordinatorEnabled=true."));
+ }
+ return validateTopicOwnershipAsync(topicName, authoritative)
+ .thenCompose(__ -> {
+ CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
+ .getTopics().get(topicName.toString());
+ if (topicFuture == null) {
+ return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
+ }
+ return topicFuture.thenCompose(optionalTopic -> {
if (!optionalTopic.isPresent()) {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT,
- "Topic is not owned by this broker!"));
- return;
- }
- Topic topicObject = optionalTopic.get();
- if (topicObject instanceof PersistentTopic) {
- try {
- ManagedLedger managedLedger =
- ((PersistentTopic) topicObject).getPendingAckManagedLedger(subName).get();
- TransactionPendingAckInternalStats stats =
- new TransactionPendingAckInternalStats();
- TransactionLogStats pendingAckLogStats = new TransactionLogStats();
- pendingAckLogStats.managedLedgerName = managedLedger.getName();
- pendingAckLogStats.managedLedgerInternalStats =
- managedLedger.getManagedLedgerInternalStats(metadata).get();
- stats.pendingAckLogStats = pendingAckLogStats;
- asyncResponse.resume(stats);
- } catch (Exception exception) {
- if (exception instanceof ExecutionException) {
- if (exception.getCause() instanceof ServiceUnitNotReadyException) {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- exception.getCause()));
- return;
- } else if (exception.getCause() instanceof NotAllowedException) {
- asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
- exception.getCause()));
- return;
- } else if (exception.getCause() instanceof SubscriptionNotFoundException) {
- asyncResponse.resume(new RestException(NOT_FOUND, exception.getCause()));
- return;
- }
- }
- asyncResponse.resume(new RestException(exception));
- }
+ return FutureUtil.failedFuture(new RestException(NOT_FOUND, "Topic not found"));
} else {
- asyncResponse.resume(new RestException(BAD_REQUEST, "Topic is not a persistent topic!"));
+ Topic topicObject = optionalTopic.get();
+ return ((PersistentTopic) topicObject).getPendingAckManagedLedger(subName)
+ .thenCompose(managedLedger -> managedLedger.getManagedLedgerInternalStats(metadata)
+ .thenApply(internalStats -> {
+ TransactionLogStats pendingAckLogStats = new TransactionLogStats();
+ pendingAckLogStats.managedLedgerName = managedLedger.getName();
+ pendingAckLogStats.managedLedgerInternalStats = internalStats;
+ return pendingAckLogStats;
+ })
+ .thenApply(pendingAckLogStats -> {
+ TransactionPendingAckInternalStats stats =
+ new TransactionPendingAckInternalStats();
+ stats.pendingAckLogStats = pendingAckLogStats;
+ return stats;
+ }));
}
});
- } else {
- asyncResponse.resume(new RestException(TEMPORARY_REDIRECT, "Topic is not owned by this broker!"));
- }
- } else {
- asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE,
- "This Broker is not configured with transactionCoordinatorEnabled=true."));
- }
- } catch (Exception e) {
- asyncResponse.resume(new RestException(e.getCause()));
- }
+ });
}
protected void validateTopicName(String property, String namespace, String encodedTopic) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index 94411f6d16d..bbd79036ecf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.broker.admin.v3;
+import static javax.ws.rs.core.Response.Status.METHOD_NOT_ALLOWED;
+import static javax.ws.rs.core.Response.Status.NOT_FOUND;
+import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
@@ -33,14 +36,17 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.impl.TransactionsBase;
-import org.apache.pulsar.common.naming.TopicDomain;
-import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.util.FutureUtil;
@Path("/transactions")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Api(value = "/transactions", description = "Transactions admin apis", tags = "transactions")
+@Slf4j
public class Transactions extends TransactionsBase {
@GET
@@ -222,7 +228,26 @@ public class Transactions extends TransactionsBase {
@PathParam("topic") @Encoded String encodedTopic,
@PathParam("subName") String subName,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
- internalGetPendingAckInternalStats(asyncResponse, authoritative,
- TopicName.get(TopicDomain.persistent.value(), tenant, namespace, encodedTopic), subName, metadata);
+ try {
+ validateTopicName(tenant, namespace, encodedTopic);
+ internalGetPendingAckInternalStats(authoritative, topicName, subName, metadata)
+ .thenAccept(stats -> asyncResponse.resume(stats))
+ .exceptionally(ex -> {
+ Throwable cause = FutureUtil.unwrapCompletionException(ex);
+ log.error("[{}] Failed to get pending ack internal stats {}", clientAppId(), topicName, cause);
+ if (cause instanceof BrokerServiceException.ServiceUnitNotReadyException) {
+ asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, cause));
+ } else if (cause instanceof BrokerServiceException.NotAllowedException) {
+ asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED, cause));
+ } else if (cause instanceof BrokerServiceException.SubscriptionNotFoundException) {
+ asyncResponse.resume(new RestException(NOT_FOUND, cause));
+ } else {
+ asyncResponse.resume(new RestException(cause));
+ }
+ return null;
+ });
+ } catch (Exception ex) {
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 93a2d0e188e..1f796553bdc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Sets;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -56,12 +57,14 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
@@ -374,6 +377,25 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
TransactionImpl transaction = (TransactionImpl) getTransaction();
final String topic = "persistent://public/default/testGetPendingAckInternalStats";
final String subName = "test";
+ try {
+ admin.transactions()
+ .getPendingAckInternalStatsAsync(topic, subName, true).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
+ try {
+ pulsar.getBrokerService().getTopic(topic, false);
+ admin.transactions()
+ .getPendingAckInternalStatsAsync(topic, subName, true).get();
+ fail("Should failed here");
+ } catch (ExecutionException ex) {
+ assertTrue(ex.getCause() instanceof PulsarAdminException.NotFoundException);
+ PulsarAdminException.NotFoundException cause = (PulsarAdminException.NotFoundException)ex.getCause();
+ assertEquals(cause.getMessage(), "Topic not found");
+ }
admin.topics().createNonPartitionedTopic(topic);
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic)