You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/06/29 10:23:14 UTC

[pulsar] branch master updated: [improve][txn] Add a admin tool to check message pending ack stats (#15682)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f230d15ffcd [improve][txn] Add a admin tool to check message pending ack stats (#15682)
f230d15ffcd is described below

commit f230d15ffcd5f74cca13bd23b35ace784d6f8ce6
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Wed Jun 29 18:23:01 2022 +0800

    [improve][txn] Add a admin tool to check message pending ack stats (#15682)
    
    * [improve][txn] Add a admin tool to check message pending ack stats
    ### Motivation
    When a message is acknowledged by a transaction, it is in pending ack stats until the end of the transaction.
    But we have no way to judge whether the message is in the pendingAck state on the client-side.
    ### Modification
     Add an admin tool to check message pending ack stats.
---
 .../pulsar/broker/admin/impl/TransactionsBase.java |  17 +++
 .../pulsar/broker/admin/v3/Transactions.java       |  42 ++++++++
 .../service/persistent/PersistentSubscription.java |   5 +
 .../transaction/pendingack/PendingAckHandle.java   |   9 ++
 .../pendingack/impl/PendingAckHandleDisabled.java  |   6 ++
 .../pendingack/impl/PendingAckHandleImpl.java      |  35 ++++++
 .../broker/admin/v3/AdminApiTransactionTest.java   | 117 +++++++++++++++++++++
 .../apache/pulsar/client/admin/Transactions.java   |  27 +++++
 .../common/stats/PositionInPendingAckStats.java    |  41 ++++++++
 .../client/admin/internal/TransactionsImpl.java    |  37 +++++++
 10 files changed, 336 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index cb69d8c3de3..b2137d74338 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -34,6 +34,7 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.Topic;
@@ -55,6 +56,7 @@ import org.apache.pulsar.common.policies.data.TransactionLogStats;
 import org.apache.pulsar.common.policies.data.TransactionMetadata;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
+import org.apache.pulsar.common.stats.PositionInPendingAckStats;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -447,4 +449,19 @@ public abstract class TransactionsBase extends AdminResource {
                             return new PartitionedTopicMetadata(replicas);
                         }));
     }
+
+    protected CompletableFuture<PositionInPendingAckStats> internalGetPositionStatsPendingAckStats(
+            boolean authoritative, String subName, PositionImpl position, Integer batchIndex) {
+        CompletableFuture<PositionInPendingAckStats> completableFuture = new CompletableFuture<>();
+        getExistingPersistentTopicAsync(authoritative)
+                .thenAccept(topic -> {
+                    PositionInPendingAckStats result = topic.getSubscription(subName)
+                    .checkPositionInPendingAckState(position, batchIndex);
+                    completableFuture.complete(result);
+                }).exceptionally(ex -> {
+                    completableFuture.completeExceptionally(ex);
+                    return null;
+        });
+        return completableFuture;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index fafe5893d15..bffdf5a252c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -39,6 +39,7 @@ import javax.ws.rs.container.Suspended;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.admin.impl.TransactionsBase;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.RestException;
@@ -342,4 +343,45 @@ public class Transactions extends TransactionsBase {
             resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
+
+    @GET
+    @Path("/pendingAckStats/{tenant}/{namespace}/{topic}/{subName}/{ledgerId}/{entryId}")
+    @ApiOperation(value = "Get position stats in pending ack.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic "
+                    + "or subscription name doesn't exist"),
+            @ApiResponse(code = 503, message = "This Broker is not configured "
+                    + "with transactionCoordinatorEnabled=true."),
+            @ApiResponse(code = 307, message = "Topic is not owned by this broker!"),
+            @ApiResponse(code = 405, message = "Pending ack handle don't use managedLedger!"),
+            @ApiResponse(code = 400, message = "Topic is not a persistent topic!"),
+            @ApiResponse(code = 409, message = "Concurrent modification")})
+    public void getPositionStatsInPendingAck(@Suspended final AsyncResponse asyncResponse,
+                                             @QueryParam("authoritative")
+                                             @DefaultValue("false") boolean authoritative,
+                                             @PathParam("tenant") String tenant,
+                                             @PathParam("namespace") String namespace,
+                                             @PathParam("topic") @Encoded String encodedTopic,
+                                             @PathParam("subName") String subName,
+                                             @PathParam("ledgerId") Long ledgerId,
+                                             @PathParam("entryId") Long entryId,
+                                             @QueryParam("batchIndex") Integer batchIndex) {
+        try {
+            checkTransactionCoordinatorEnabled();
+            validateTopicName(tenant, namespace, encodedTopic);
+            PositionImpl position = new PositionImpl(ledgerId, entryId);
+            internalGetPositionStatsPendingAckStats(authoritative, subName, position, batchIndex)
+                    .thenAccept(asyncResponse::resume)
+                    .exceptionally(ex -> {
+                        log.warn("{} Failed to check position [{}] stats for topic [{}], subscription [{}]",
+                                clientAppId(), position, topicName, subName, ex);
+                        resumeAsyncResponseExceptionally(asyncResponse, ex);
+                        return null;
+                    });
+        } catch (Exception ex) {
+            log.warn("Failed to get position stats in pending ack", ex);
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+        }
+    }
+
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 2dd503616fa..5c4cd624be3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -77,6 +77,7 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
 import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
+import org.apache.pulsar.common.stats.PositionInPendingAckStats;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1189,5 +1190,9 @@ public class PersistentSubscription implements Subscription {
         return this.pendingAckHandle.checkIfPendingAckStoreInit();
     }
 
+    public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) {
+        return pendingAckHandle.checkPositionInPendingAckState(position, batchIndex);
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
index cfe0faabaa6..d8e16dd4015 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
+import org.apache.pulsar.common.stats.PositionInPendingAckStats;
 import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
 
 /**
@@ -157,4 +158,12 @@ public interface PendingAckHandle {
      * @return if the PendingAckStore is init.
      */
     boolean checkIfPendingAckStoreInit();
+
+    /**
+     * Get the stats of this message position is in pending ack.
+     * @param position message position.
+     * @param batchIndex the batch index of ths position.
+     * @return the stats of the message position.
+     */
+    PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl position, Integer batchIndex);
 }
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
index 1ffee868655..e34628da887 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
+import org.apache.pulsar.common.stats.PositionInPendingAckStats;
 import org.apache.pulsar.common.util.FutureUtil;
 
 /**
@@ -100,4 +101,9 @@ public class PendingAckHandleDisabled implements PendingAckHandle {
     public boolean checkIfPendingAckStoreInit() {
         return false;
     }
+
+    @Override
+    public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) {
+        return null;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index f6990882701..ab070d8a942 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.CommandAck.AckType;
 import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
+import org.apache.pulsar.common.stats.PositionInPendingAckStats;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
@@ -967,6 +968,40 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
         }
     }
 
+    @Override
+    public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) {
+        if (!state.equals(State.Ready)) {
+            return new PositionInPendingAckStats(PositionInPendingAckStats.State.PendingAckNotReady);
+        }
+        if (persistentSubscription.getCursor().getPersistentMarkDeletedPosition() != null && position.compareTo(
+                        (PositionImpl) persistentSubscription.getCursor().getPersistentMarkDeletedPosition()) <= 0) {
+            return new PositionInPendingAckStats(PositionInPendingAckStats.State.MarkDelete);
+        } else if (individualAckPositions == null) {
+            return new PositionInPendingAckStats(PositionInPendingAckStats.State.NotInPendingAck);
+        }
+        MutablePair<PositionImpl, Integer> positionIntegerMutablePair = individualAckPositions.get(position);
+        if (positionIntegerMutablePair != null) {
+            if (batchIndex == null) {
+                return new PositionInPendingAckStats(PositionInPendingAckStats.State.PendingAck);
+            } else {
+                if (batchIndex >= positionIntegerMutablePair.right) {
+                    return new PositionInPendingAckStats(PositionInPendingAckStats.State.InvalidPosition);
+                }
+                BitSetRecyclable bitSetRecyclable = BitSetRecyclable
+                        .valueOf(positionIntegerMutablePair.left.getAckSet());
+                if (bitSetRecyclable.get(batchIndex)) {
+                    bitSetRecyclable.recycle();
+                    return new PositionInPendingAckStats(PositionInPendingAckStats.State.NotInPendingAck);
+                } else {
+                    bitSetRecyclable.recycle();
+                    return new PositionInPendingAckStats(PositionInPendingAckStats.State.PendingAck);
+                }
+            }
+        } else {
+            return new PositionInPendingAckStats(PositionInPendingAckStats.State.NotInPendingAck);
+        }
+    }
+
     @Override
     public boolean checkIfPendingAckStoreInit() {
         return this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 2b371be571d..620c4607836 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -19,7 +19,11 @@
 package org.apache.pulsar.broker.admin.v3;
 
 import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.http.HttpStatus;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -30,6 +34,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.transaction.Transaction;
 import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -51,6 +56,7 @@ import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
 import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionMetadata;
+import org.apache.pulsar.common.stats.PositionInPendingAckStats;
 import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
 import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
 import org.awaitility.Awaitility;
@@ -613,6 +619,117 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+
+    @Test
+    public void testCheckPositionInPendingAckState() throws Exception {
+         String topic = "persistent://public/default/test";
+         String subName = "sub";
+         initTransaction(1);
+         Transaction transaction = pulsarClient.newTransaction()
+                 .withTransactionTimeout(5, TimeUnit.SECONDS)
+                 .build()
+                 .get();
+
+         @Cleanup
+         Producer<byte[]> producer = pulsarClient.newProducer()
+                 .sendTimeout(5, TimeUnit.SECONDS)
+                 .enableBatching(false)
+                 .topic(topic)
+                 .create();
+         @Cleanup
+         Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                 .topic(topic)
+                 .subscriptionName(subName)
+                 .subscribe();
+
+         producer.newMessage().send();
+
+         Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+         MessageIdImpl messageId = (MessageIdImpl) message.getMessageId();
+
+         PositionInPendingAckStats result = admin.transactions().checkPositionInPendingAckState(topic, subName,
+                 messageId.getLedgerId(), messageId.getEntryId(), null);
+         assertEquals(result.state, PositionInPendingAckStats.State.PendingAckNotReady);
+
+         consumer.acknowledgeAsync(messageId, transaction).get();
+         result = admin.transactions().checkPositionInPendingAckState(topic, subName,
+                messageId.getLedgerId(), messageId.getEntryId(), null);
+         assertEquals(result.state, PositionInPendingAckStats.State.PendingAck);
+         transaction.commit().get();
+         result = admin.transactions().checkPositionInPendingAckState(topic, subName,
+                 messageId.getLedgerId(), messageId.getEntryId(), null);
+         assertEquals(result.state, PositionInPendingAckStats.State.MarkDelete);
+    }
+
+    @Test
+    public void testGetPositionStatsInPendingAckStatsFroBatch() throws Exception {
+        String topic = "persistent://public/default/test";
+        String subscriptionName = "my-subscription-batch";
+        initTransaction(1);
+        pulsar.getBrokerService()
+                .getManagedLedgerConfig(TopicName.get(topic)).get()
+                .setDeletionAtBatchIndexLevelEnabled(true);
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(true)
+                .batchingMaxMessages(3)
+                // set batch max publish delay big enough to make sure entry has 3 messages
+                .batchingMaxPublishDelay(3, TimeUnit.SECONDS)
+                .topic(topic).create();
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionName(subscriptionName)
+                .enableBatchIndexAcknowledgment(true)
+                .subscriptionType(SubscriptionType.Shared)
+                .isAckReceiptEnabled(true)
+                .topic(topic)
+                .subscribe();
+
+        List<MessageId> messageIds = new ArrayList<>();
+        List<CompletableFuture<MessageId>> futureMessageIds = new ArrayList<>();
+
+        List<String> messages = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            String message = "my-message-" + i;
+            messages.add(message);
+            CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(message);
+            futureMessageIds.add(messageIdCompletableFuture);
+        }
+
+        for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) {
+            MessageId messageId = futureMessageId.get();
+            messageIds.add(messageId);
+        }
+
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.DAYS)
+                .build()
+                .get();
+
+        Message<String> message1 = consumer.receive();
+        Message<String> message2 = consumer.receive();
+
+        BatchMessageIdImpl messageId = (BatchMessageIdImpl) message2.getMessageId();
+        consumer.acknowledgeAsync(messageId, transaction).get();
+
+        PositionInPendingAckStats positionStatsInPendingAckStats =
+                admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
+                messageId.getLedgerId(), messageId.getEntryId(), 1);
+        assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.PendingAck);
+        
+        positionStatsInPendingAckStats =
+                admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
+                        messageId.getLedgerId(), messageId.getEntryId(), 2);
+        assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.NotInPendingAck);
+        positionStatsInPendingAckStats =
+                admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
+                        messageId.getLedgerId(), messageId.getEntryId(), 10);
+        assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.InvalidPosition);
+
+    }
+
     private static void verifyCoordinatorStats(String state,
                                                long sequenceId, long lowWaterMark) {
         assertEquals(state, "Ready");
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
index 87bd0bf36cd..2ad7ae0b10b 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionMetadata;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
+import org.apache.pulsar.common.stats.PositionInPendingAckStats;
 
 public interface Transactions {
 
@@ -306,4 +307,30 @@ public interface Transactions {
      */
     CompletableFuture<Void> scaleTransactionCoordinatorsAsync(int replicas);
 
+    /**
+     * Check whether the position is in pending ack stats.
+     *
+     * @param topic the topic of checking position in pending ack state
+     * @param subName the subscription name of this pending ack
+     * @param ledgerId the ledger id of the message position.
+     * @param entryId the entry id of the message position.
+     * @param batchIndex the batch index of the message position, `null` means not batch message.
+     * @return {@link PositionInPendingAckStats} a state identified whether the position state.
+     */
+    PositionInPendingAckStats checkPositionInPendingAckState(String topic, String subName, Long ledgerId, Long entryId,
+                                                             Integer batchIndex) throws PulsarAdminException;
+
+    /**
+     * Check whether the position is in pending ack stats.
+     *
+     * @param topic the topic of checking position in pending ack state
+     * @param subName the subscription name of this pending ack
+     * @param ledgerId the ledger id of the message position.
+     * @param entryId the entry id of the message position.
+     * @param batchIndex the batch index of the message position, `null` means not batch message.
+     * @return {@link PositionInPendingAckStats} a state identified whether the position state.
+     */
+    CompletableFuture<PositionInPendingAckStats> checkPositionInPendingAckStateAsync(String topic, String subName,
+                                                                                     Long ledgerId, Long entryId,
+                                                                                     Integer batchIndex);
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/PositionInPendingAckStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/PositionInPendingAckStats.java
new file mode 100644
index 00000000000..4772f8350bf
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/PositionInPendingAckStats.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.stats;
+
+import lombok.Data;
+
+@Data
+public class PositionInPendingAckStats {
+
+    public enum State {
+        PendingAck,
+        MarkDelete,
+        NotInPendingAck,
+        PendingAckNotReady,
+        InvalidPosition
+    };
+
+    public PositionInPendingAckStats() {}
+
+    public PositionInPendingAckStats(State state) {
+        this.state = state;
+    }
+
+    public State state;
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index 60b9982d300..edf9381123d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionMetadata;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
+import org.apache.pulsar.common.stats.PositionInPendingAckStats;
 
 public class TransactionsImpl extends BaseResource implements Transactions {
     private final WebTarget adminV3Transactions;
@@ -357,4 +358,40 @@ public class TransactionsImpl extends BaseResource implements Transactions {
         return asyncPostRequest(path, Entity.entity(replicas, MediaType.APPLICATION_JSON));
     }
 
+    @Override
+    public CompletableFuture<PositionInPendingAckStats> checkPositionInPendingAckStateAsync(String topic,
+                                                                                            String subName,
+                                                                                            Long ledgerId,
+                                                                                            Long entryId,
+                                                                                            Integer batchIndex) {
+        TopicName tn = TopicName.get(topic);
+        WebTarget path = adminV3Transactions.path("pendingAckStats");
+        path = path.path(tn.getRestPath(false));
+        path = path.path(subName);
+        path = path.path(ledgerId.toString());
+        path = path.path(entryId.toString());
+        path = path.queryParam("batchIndex", batchIndex);
+        final CompletableFuture<PositionInPendingAckStats> future = new CompletableFuture<>();
+        asyncGetRequest(path,
+                new InvocationCallback<PositionInPendingAckStats>() {
+                    @Override
+                    public void completed(PositionInPendingAckStats stats) {
+                        future.complete(stats);
+                    }
+
+                    @Override
+                    public void failed(Throwable throwable) {
+                        future.completeExceptionally(getApiException(throwable.getCause()));
+                    }
+                });
+        return future;
+    }
+
+
+    @Override
+    public PositionInPendingAckStats checkPositionInPendingAckState(String topic, String subName, Long ledgerId,
+                                                                    Long entryId, Integer batchIndex)
+            throws PulsarAdminException {
+        return sync(() -> checkPositionInPendingAckStateAsync(topic, subName, ledgerId, entryId, batchIndex));
+    }
 }