You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/06/29 10:23:14 UTC
[pulsar] branch master updated: [improve][txn] Add a admin tool to check message pending ack stats (#15682)
This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f230d15ffcd [improve][txn] Add a admin tool to check message pending ack stats (#15682)
f230d15ffcd is described below
commit f230d15ffcd5f74cca13bd23b35ace784d6f8ce6
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Wed Jun 29 18:23:01 2022 +0800
[improve][txn] Add a admin tool to check message pending ack stats (#15682)
* [improve][txn] Add a admin tool to check message pending ack stats
### Motivation
When a message is acknowledged within a transaction, it stays in the pending-ack state until the transaction ends.
However, there is currently no way to determine from the client side whether a message is in the pending-ack state.
### Modification
Add an admin tool to check message pending ack stats.
---
.../pulsar/broker/admin/impl/TransactionsBase.java | 17 +++
.../pulsar/broker/admin/v3/Transactions.java | 42 ++++++++
.../service/persistent/PersistentSubscription.java | 5 +
.../transaction/pendingack/PendingAckHandle.java | 9 ++
.../pendingack/impl/PendingAckHandleDisabled.java | 6 ++
.../pendingack/impl/PendingAckHandleImpl.java | 35 ++++++
.../broker/admin/v3/AdminApiTransactionTest.java | 117 +++++++++++++++++++++
.../apache/pulsar/client/admin/Transactions.java | 27 +++++
.../common/stats/PositionInPendingAckStats.java | 41 ++++++++
.../client/admin/internal/TransactionsImpl.java | 37 +++++++
10 files changed, 336 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
index cb69d8c3de3..b2137d74338 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
@@ -34,6 +34,7 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.Topic;
@@ -55,6 +56,7 @@ import org.apache.pulsar.common.policies.data.TransactionLogStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
+import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
@@ -447,4 +449,19 @@ public abstract class TransactionsBase extends AdminResource {
return new PartitionedTopicMetadata(replicas);
}));
}
+
+    /**
+     * Looks up the pending-ack state of a single message position for a subscription of the
+     * current topic.
+     *
+     * <p>The topic is resolved through {@code getExistingPersistentTopicAsync}; a failure of that
+     * lookup is propagated through the returned future.
+     *
+     * @param authoritative forwarded unchanged to the topic lookup
+     * @param subName name of the subscription whose pending-ack handle is inspected
+     * @param position the message position to check
+     * @param batchIndex index inside a batch entry, or {@code null} to check the entry as a whole
+     * @return a future holding the state reported by the subscription; it completes exceptionally
+     *         with a 404 {@code RestException} when the subscription does not exist on the topic
+     */
+    protected CompletableFuture<PositionInPendingAckStats> internalGetPositionStatsPendingAckStats(
+            boolean authoritative, String subName, PositionImpl position, Integer batchIndex) {
+        return getExistingPersistentTopicAsync(authoritative)
+                .thenApply(topic -> {
+                    org.apache.pulsar.broker.service.persistent.PersistentSubscription subscription =
+                            topic.getSubscription(subName);
+                    if (subscription == null) {
+                        // The REST endpoint documents 404 for an unknown subscription; without this
+                        // guard the dereference below would raise a NullPointerException instead.
+                        throw new org.apache.pulsar.broker.web.RestException(Response.Status.NOT_FOUND,
+                                "Subscription " + subName + " not found");
+                    }
+                    return subscription.checkPositionInPendingAckState(position, batchIndex);
+                });
+    }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index fafe5893d15..bffdf5a252c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -39,6 +39,7 @@ import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.admin.impl.TransactionsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
@@ -342,4 +343,45 @@ public class Transactions extends TransactionsBase {
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
+
+ @GET
+ @Path("/pendingAckStats/{tenant}/{namespace}/{topic}/{subName}/{ledgerId}/{entryId}")
+ @ApiOperation(value = "Get position stats in pending ack.")
+ @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic "
+ + "or subscription name doesn't exist"),
+ @ApiResponse(code = 503, message = "This Broker is not configured "
+ + "with transactionCoordinatorEnabled=true."),
+ @ApiResponse(code = 307, message = "Topic is not owned by this broker!"),
+ @ApiResponse(code = 405, message = "Pending ack handle don't use managedLedger!"),
+ @ApiResponse(code = 400, message = "Topic is not a persistent topic!"),
+ @ApiResponse(code = 409, message = "Concurrent modification")})
+ public void getPositionStatsInPendingAck(@Suspended final AsyncResponse asyncResponse,
+ @QueryParam("authoritative")
+ @DefaultValue("false") boolean authoritative,
+ @PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace,
+ @PathParam("topic") @Encoded String encodedTopic,
+ @PathParam("subName") String subName,
+ @PathParam("ledgerId") Long ledgerId,
+ @PathParam("entryId") Long entryId,
+ @QueryParam("batchIndex") Integer batchIndex) {
+ try {
+ checkTransactionCoordinatorEnabled();
+ validateTopicName(tenant, namespace, encodedTopic);
+ PositionImpl position = new PositionImpl(ledgerId, entryId);
+ internalGetPositionStatsPendingAckStats(authoritative, subName, position, batchIndex)
+ .thenAccept(asyncResponse::resume)
+ .exceptionally(ex -> {
+ log.warn("{} Failed to check position [{}] stats for topic [{}], subscription [{}]",
+ clientAppId(), position, topicName, subName, ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ } catch (Exception ex) {
+ log.warn("Failed to get position stats in pending ack", ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ }
+ }
+
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 2dd503616fa..5c4cd624be3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -77,6 +77,7 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
+import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1189,5 +1190,9 @@ public class PersistentSubscription implements Subscription {
return this.pendingAckHandle.checkIfPendingAckStoreInit();
}
+    /**
+     * Asks this subscription's pending-ack handle for the state of a message position.
+     *
+     * @param position the message position to check
+     * @param batchIndex index inside a batch entry, passed through to the handle unchanged
+     * @return whatever the pending-ack handle reports for the position
+     */
+    public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) {
+        return this.pendingAckHandle.checkPositionInPendingAckState(position, batchIndex);
+    }
+
private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
index cfe0faabaa6..d8e16dd4015 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
+import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
/**
@@ -157,4 +158,12 @@ public interface PendingAckHandle {
* @return if the PendingAckStore is init.
*/
boolean checkIfPendingAckStoreInit();
+
+ /**
+ * Check whether a message position is currently held in pending ack by this handle.
+ *
+ * @param position the message position to check.
+ * @param batchIndex the batch index within the position's entry, or {@code null} to check the entry as a whole.
+ * @return the pending-ack state of the position; may be {@code null} (the disabled handle returns {@code null}).
+ */
+ PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl position, Integer batchIndex);
}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
index 1ffee868655..e34628da887 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.broker.transaction.pendingack.PendingAckHandle;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
+import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.common.util.FutureUtil;
/**
@@ -100,4 +101,9 @@ public class PendingAckHandleDisabled implements PendingAckHandle {
public boolean checkIfPendingAckStoreInit() {
return false;
}
+
+ /**
+ * Always returns {@code null}: this disabled handle does not report a pending-ack state.
+ * Callers must be prepared for the {@code null} result.
+ */
+ @Override
+ public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) {
+ return null;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index f6990882701..ab070d8a942 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
+import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
@@ -967,6 +968,40 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
}
}
+    /**
+     * Reports how a message position currently relates to this pending-ack handle.
+     *
+     * <p>The checks are applied in this order:
+     * <ol>
+     *   <li>{@code PendingAckNotReady} when the handle is not in the {@code Ready} state;</li>
+     *   <li>{@code MarkDelete} when the position is at or before the cursor's persistent
+     *       mark-delete position;</li>
+     *   <li>{@code NotInPendingAck} when the position has no individual pending-ack record;</li>
+     *   <li>{@code PendingAck} when a record exists and no batch index was supplied;</li>
+     *   <li>{@code InvalidPosition} when the batch index is negative or not smaller than the
+     *       batch size stored with the record;</li>
+     *   <li>otherwise {@code NotInPendingAck} if the bit for the batch index is still set in the
+     *       record's ack set, and {@code PendingAck} if it is cleared.</li>
+     * </ol>
+     *
+     * @param position the message position to check
+     * @param batchIndex index inside a batch entry, or {@code null} to check the entry as a whole
+     * @return the state of the position, never {@code null}
+     */
+    @Override
+    public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl position, Integer batchIndex) {
+        // Identity comparison is the idiomatic enum check and cannot throw when the state is unset.
+        if (state != State.Ready) {
+            return new PositionInPendingAckStats(PositionInPendingAckStats.State.PendingAckNotReady);
+        }
+        // Read the mark-delete position once so the null check and the comparison see the same value.
+        PositionImpl markDeletedPosition =
+                (PositionImpl) persistentSubscription.getCursor().getPersistentMarkDeletedPosition();
+        if (markDeletedPosition != null && position.compareTo(markDeletedPosition) <= 0) {
+            return new PositionInPendingAckStats(PositionInPendingAckStats.State.MarkDelete);
+        }
+        if (individualAckPositions == null) {
+            return new PositionInPendingAckStats(PositionInPendingAckStats.State.NotInPendingAck);
+        }
+        MutablePair<PositionImpl, Integer> pendingEntry = individualAckPositions.get(position);
+        if (pendingEntry == null) {
+            return new PositionInPendingAckStats(PositionInPendingAckStats.State.NotInPendingAck);
+        }
+        if (batchIndex == null) {
+            return new PositionInPendingAckStats(PositionInPendingAckStats.State.PendingAck);
+        }
+        // A negative index would make the bit lookup below throw, so reject it together with
+        // indexes beyond the recorded batch size.
+        if (batchIndex < 0 || batchIndex >= pendingEntry.right) {
+            return new PositionInPendingAckStats(PositionInPendingAckStats.State.InvalidPosition);
+        }
+        BitSetRecyclable ackSet = BitSetRecyclable.valueOf(pendingEntry.left.getAckSet());
+        try {
+            // A bit that is still set means this batch index has not been acknowledged yet.
+            return new PositionInPendingAckStats(ackSet.get(batchIndex)
+                    ? PositionInPendingAckStats.State.NotInPendingAck
+                    : PositionInPendingAckStats.State.PendingAck);
+        } finally {
+            // Always hand the pooled bit set back, even if the lookup fails.
+            ackSet.recycle();
+        }
+    }
+
@Override
public boolean checkIfPendingAckStoreInit() {
return this.pendingAckStoreFuture != null && this.pendingAckStoreFuture.isDone();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 2b371be571d..620c4607836 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -19,7 +19,11 @@
package org.apache.pulsar.broker.admin.v3;
import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.http.HttpStatus;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -30,6 +34,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
@@ -51,6 +56,7 @@ import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
+import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.awaitility.Awaitility;
@@ -613,6 +619,117 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
}
}
+
+ /**
+ * Checks the state the admin API reports for a non-batched message at three moments:
+ * before any transactional ack, after a transactional ack, and after the transaction commits.
+ */
+ @Test
+ public void testCheckPositionInPendingAckState() throws Exception {
+ String topic = "persistent://public/default/test";
+ String subName = "sub";
+ initTransaction(1);
+ Transaction transaction = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build()
+ .get();
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .sendTimeout(5, TimeUnit.SECONDS)
+ .enableBatching(false)
+ .topic(topic)
+ .create();
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName(subName)
+ .subscribe();
+
+ producer.newMessage().send();
+
+ Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+ MessageIdImpl messageId = (MessageIdImpl) message.getMessageId();
+
+ // Nothing has been acknowledged with a transaction yet: the handle is expected to be not ready.
+ PositionInPendingAckStats result = admin.transactions().checkPositionInPendingAckState(topic, subName,
+ messageId.getLedgerId(), messageId.getEntryId(), null);
+ assertEquals(result.state, PositionInPendingAckStats.State.PendingAckNotReady);
+
+ // Acknowledged inside an open transaction: the position must be reported as pending ack.
+ consumer.acknowledgeAsync(messageId, transaction).get();
+ result = admin.transactions().checkPositionInPendingAckState(topic, subName,
+ messageId.getLedgerId(), messageId.getEntryId(), null);
+ assertEquals(result.state, PositionInPendingAckStats.State.PendingAck);
+ // After the commit the position is expected to be covered by the mark-delete position.
+ transaction.commit().get();
+ result = admin.transactions().checkPositionInPendingAckState(topic, subName,
+ messageId.getLedgerId(), messageId.getEntryId(), null);
+ assertEquals(result.state, PositionInPendingAckStats.State.MarkDelete);
+ }
+
+    /**
+     * Checks the state the admin API reports for individual indexes of a batched entry after only
+     * the second received message of the batch was acknowledged inside a transaction.
+     */
+    @Test
+    public void testGetPositionStatsInPendingAckStatsFroBatch() throws Exception {
+        String topic = "persistent://public/default/test";
+        String subscriptionName = "my-subscription-batch";
+        initTransaction(1);
+        pulsar.getBrokerService()
+                .getManagedLedgerConfig(TopicName.get(topic)).get()
+                .setDeletionAtBatchIndexLevelEnabled(true);
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(true)
+                .batchingMaxMessages(3)
+                // set batch max publish delay big enough to make sure entry has 3 messages
+                .batchingMaxPublishDelay(3, TimeUnit.SECONDS)
+                .topic(topic).create();
+
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionName(subscriptionName)
+                .enableBatchIndexAcknowledgment(true)
+                .subscriptionType(SubscriptionType.Shared)
+                .isAckReceiptEnabled(true)
+                .topic(topic)
+                .subscribe();
+
+        List<CompletableFuture<MessageId>> futureMessageIds = new ArrayList<>();
+        for (int i = 0; i < 3; i++) {
+            futureMessageIds.add(producer.sendAsync("my-message-" + i));
+        }
+
+        // Wait until every send has completed before consuming.
+        for (CompletableFuture<MessageId> futureMessageId : futureMessageIds) {
+            futureMessageId.get();
+        }
+
+        Transaction transaction = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.DAYS)
+                .build()
+                .get();
+
+        // The first message is received but deliberately left unacknowledged;
+        // only the second one is acknowledged with the transaction.
+        consumer.receive();
+        Message<String> message2 = consumer.receive();
+
+        BatchMessageIdImpl messageId = (BatchMessageIdImpl) message2.getMessageId();
+        consumer.acknowledgeAsync(messageId, transaction).get();
+
+        // Batch index 1: expected to be pending ack after the transactional acknowledgement.
+        PositionInPendingAckStats positionStatsInPendingAckStats =
+                admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
+                        messageId.getLedgerId(), messageId.getEntryId(), 1);
+        assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.PendingAck);
+
+        // Batch index 2: same entry, but not acknowledged.
+        positionStatsInPendingAckStats =
+                admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
+                        messageId.getLedgerId(), messageId.getEntryId(), 2);
+        assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.NotInPendingAck);
+
+        // Batch index 10: beyond the three messages of the batch.
+        positionStatsInPendingAckStats =
+                admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
+                        messageId.getLedgerId(), messageId.getEntryId(), 10);
+        assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.InvalidPosition);
+    }
+
private static void verifyCoordinatorStats(String state,
long sequenceId, long lowWaterMark) {
assertEquals(state, "Ready");
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
index 87bd0bf36cd..2ad7ae0b10b 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
+import org.apache.pulsar.common.stats.PositionInPendingAckStats;
public interface Transactions {
@@ -306,4 +307,30 @@ public interface Transactions {
*/
CompletableFuture<Void> scaleTransactionCoordinatorsAsync(int replicas);
+ /**
+ * Check whether a message position is in the pending ack state of a subscription.
+ *
+ * @param topic the topic the message position belongs to.
+ * @param subName the name of the subscription whose pending ack state is checked.
+ * @param ledgerId the ledger id of the message position.
+ * @param entryId the entry id of the message position.
+ * @param batchIndex the batch index of the message position, {@code null} means not a batch message.
+ * @return {@link PositionInPendingAckStats} describing the state of the position.
+ * @throws PulsarAdminException if the admin request fails.
+ */
+ PositionInPendingAckStats checkPositionInPendingAckState(String topic, String subName, Long ledgerId, Long entryId,
+ Integer batchIndex) throws PulsarAdminException;
+
+ /**
+ * Asynchronously check whether a message position is in the pending ack state of a subscription.
+ *
+ * @param topic the topic the message position belongs to.
+ * @param subName the name of the subscription whose pending ack state is checked.
+ * @param ledgerId the ledger id of the message position.
+ * @param entryId the entry id of the message position.
+ * @param batchIndex the batch index of the message position, {@code null} means not a batch message.
+ * @return a future of {@link PositionInPendingAckStats} describing the state of the position.
+ */
+ CompletableFuture<PositionInPendingAckStats> checkPositionInPendingAckStateAsync(String topic, String subName,
+ Long ledgerId, Long entryId,
+ Integer batchIndex);
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/PositionInPendingAckStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/PositionInPendingAckStats.java
new file mode 100644
index 00000000000..4772f8350bf
--- /dev/null
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/PositionInPendingAckStats.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.stats;
+
+import lombok.Data;
+
+/**
+ * Answer of the "is this position in pending ack" admin query: a single {@link State} value.
+ */
+@Data
+public class PositionInPendingAckStats {
+
+    /**
+     * Possible answers of the query.
+     *
+     * <ul>
+     *   <li>{@code PendingAck} - the position (or batch index) is held in pending ack.</li>
+     *   <li>{@code MarkDelete} - the position is at or before the subscription's mark-delete position.</li>
+     *   <li>{@code NotInPendingAck} - the position (or batch index) is not held in pending ack.</li>
+     *   <li>{@code PendingAckNotReady} - the pending-ack handle is not in the ready state.</li>
+     *   <li>{@code InvalidPosition} - the batch index is not smaller than the recorded batch size.</li>
+     * </ul>
+     */
+    public enum State {
+        PendingAck,
+        MarkDelete,
+        NotInPendingAck,
+        PendingAckNotReady,
+        InvalidPosition
+    }
+
+    /** The reported state; public so that callers can read it directly. */
+    public State state;
+
+    /** Creates an instance without a state (presumably needed for deserialization - TODO confirm). */
+    public PositionInPendingAckStats() {
+    }
+
+    /**
+     * Creates an instance carrying the given state.
+     *
+     * @param state the state to report
+     */
+    public PositionInPendingAckStats(State state) {
+        this.state = state;
+    }
+}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index 60b9982d300..edf9381123d 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
+import org.apache.pulsar.common.stats.PositionInPendingAckStats;
public class TransactionsImpl extends BaseResource implements Transactions {
private final WebTarget adminV3Transactions;
@@ -357,4 +358,40 @@ public class TransactionsImpl extends BaseResource implements Transactions {
return asyncPostRequest(path, Entity.entity(replicas, MediaType.APPLICATION_JSON));
}
+ @Override
+ public CompletableFuture<PositionInPendingAckStats> checkPositionInPendingAckStateAsync(String topic,
+ String subName,
+ Long ledgerId,
+ Long entryId,
+ Integer batchIndex) {
+ TopicName tn = TopicName.get(topic);
+ WebTarget path = adminV3Transactions.path("pendingAckStats");
+ path = path.path(tn.getRestPath(false));
+ path = path.path(subName);
+ path = path.path(ledgerId.toString());
+ path = path.path(entryId.toString());
+ path = path.queryParam("batchIndex", batchIndex);
+ final CompletableFuture<PositionInPendingAckStats> future = new CompletableFuture<>();
+ asyncGetRequest(path,
+ new InvocationCallback<PositionInPendingAckStats>() {
+ @Override
+ public void completed(PositionInPendingAckStats stats) {
+ future.complete(stats);
+ }
+
+ @Override
+ public void failed(Throwable throwable) {
+ future.completeExceptionally(getApiException(throwable.getCause()));
+ }
+ });
+ return future;
+ }
+
+
+    /**
+     * Blocking variant of {@link #checkPositionInPendingAckStateAsync}: runs the asynchronous call
+     * through {@code sync} and returns its result.
+     *
+     * @throws PulsarAdminException as raised by {@code sync} for the underlying request
+     */
+    @Override
+    public PositionInPendingAckStats checkPositionInPendingAckState(String topic,
+                                                                    String subName,
+                                                                    Long ledgerId,
+                                                                    Long entryId,
+                                                                    Integer batchIndex)
+            throws PulsarAdminException {
+        return sync(() -> checkPositionInPendingAckStateAsync(topic, subName, ledgerId, entryId, batchIndex));
+    }
}