You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/07/07 08:45:45 UTC
[pulsar] branch master updated: [Improve] [txn] add txn admin cmd and swagger config for transaction admin (#16396)
This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 318432ed1f1 [Improve] [txn] add txn admin cmd and swagger config for transaction admin (#16396)
318432ed1f1 is described below
commit 318432ed1f16e60a088f3dca1b14891d582726a9
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Jul 7 16:45:37 2022 +0800
[Improve] [txn] add txn admin cmd and swagger config for transaction admin (#16396)
### Motivation & Modifications
1. Added missing admin cmd
2. Add swagger config for transaction admin
3. optimize description and URL
Due to the [PR](https://github.com/apache/pulsar/pull/15682) not being cherry-picked, there are no break changes.
---
pulsar-broker/pom.xml | 17 +++++++
.../pulsar/broker/admin/v3/Transactions.java | 2 +-
.../broker/admin/v3/AdminApiTransactionTest.java | 12 ++---
.../apache/pulsar/client/admin/Transactions.java | 15 +++---
.../client/admin/internal/TransactionsImpl.java | 18 +++----
.../pulsar/admin/cli/PulsarAdminToolTest.java | 4 ++
.../apache/pulsar/admin/cli/CmdTransactions.java | 55 ++++++++++++++++------
7 files changed, 84 insertions(+), 39 deletions(-)
diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 952c10301f3..d2641319e85 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -708,6 +708,23 @@
<swaggerDirectory>${basedir}/target/docs</swaggerDirectory>
<swaggerFileName>swaggerfunctions</swaggerFileName>
</apiSource>
+ <apiSource>
+ <springmvc>false</springmvc>
+ <locations>org.apache.pulsar.broker.admin.v3.Transactions</locations>
+ <schemes>http,https</schemes>
+ <basePath>/admin/v3</basePath>
+ <info>
+ <title>Pulsar Transactions REST API</title>
+ <version>v3</version>
+ <description>This provides the REST API for Pulsar Transactions operations</description>
+ <license>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
+ <name>Apache 2.0</name>
+ </license>
+ </info>
+ <swaggerDirectory>${basedir}/target/docs</swaggerDirectory>
+ <swaggerFileName>swaggertransactions</swaggerFileName>
+ </apiSource>
<apiSource>
<springmvc>false</springmvc>
<locations>org.apache.pulsar.broker.admin.v3.Sources</locations>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index bffdf5a252c..5dcf57f80f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -345,7 +345,7 @@ public class Transactions extends TransactionsBase {
}
@GET
- @Path("/pendingAckStats/{tenant}/{namespace}/{topic}/{subName}/{ledgerId}/{entryId}")
+ @Path("/positionStatsInPendingAck/{tenant}/{namespace}/{topic}/{subName}/{ledgerId}/{entryId}")
@ApiOperation(value = "Get position stats in pending ack.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic "
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index ddb340109d7..0e9bbc10251 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -703,16 +703,16 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
MessageIdImpl messageId = (MessageIdImpl) message.getMessageId();
- PositionInPendingAckStats result = admin.transactions().checkPositionInPendingAckState(topic, subName,
+ PositionInPendingAckStats result = admin.transactions().getPositionStatsInPendingAck(topic, subName,
messageId.getLedgerId(), messageId.getEntryId(), null);
assertEquals(result.state, PositionInPendingAckStats.State.PendingAckNotReady);
consumer.acknowledgeAsync(messageId, transaction).get();
- result = admin.transactions().checkPositionInPendingAckState(topic, subName,
+ result = admin.transactions().getPositionStatsInPendingAck(topic, subName,
messageId.getLedgerId(), messageId.getEntryId(), null);
assertEquals(result.state, PositionInPendingAckStats.State.PendingAck);
transaction.commit().get();
- result = admin.transactions().checkPositionInPendingAckState(topic, subName,
+ result = admin.transactions().getPositionStatsInPendingAck(topic, subName,
messageId.getLedgerId(), messageId.getEntryId(), null);
assertEquals(result.state, PositionInPendingAckStats.State.MarkDelete);
}
@@ -771,16 +771,16 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
consumer.acknowledgeAsync(messageId, transaction).get();
PositionInPendingAckStats positionStatsInPendingAckStats =
- admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
+ admin.transactions().getPositionStatsInPendingAck(topic, subscriptionName,
messageId.getLedgerId(), messageId.getEntryId(), 1);
assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.PendingAck);
positionStatsInPendingAckStats =
- admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
+ admin.transactions().getPositionStatsInPendingAck(topic, subscriptionName,
messageId.getLedgerId(), messageId.getEntryId(), 2);
assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.NotInPendingAck);
positionStatsInPendingAckStats =
- admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
+ admin.transactions().getPositionStatsInPendingAck(topic, subscriptionName,
messageId.getLedgerId(), messageId.getEntryId(), 10);
assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.InvalidPosition);
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
index 2ad7ae0b10b..3bd5a188dce 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
@@ -308,8 +308,7 @@ public interface Transactions {
CompletableFuture<Void> scaleTransactionCoordinatorsAsync(int replicas);
/**
- * Check whether the position is in pending ack stats.
- *
+ * Get the position stats in transaction pending ack.
* @param topic the topic of checking position in pending ack state
* @param subName the subscription name of this pending ack
* @param ledgerId the ledger id of the message position.
@@ -317,11 +316,11 @@ public interface Transactions {
* @param batchIndex the batch index of the message position, `null` means not batch message.
* @return {@link PositionInPendingAckStats} a state identified whether the position state.
*/
- PositionInPendingAckStats checkPositionInPendingAckState(String topic, String subName, Long ledgerId, Long entryId,
- Integer batchIndex) throws PulsarAdminException;
+ PositionInPendingAckStats getPositionStatsInPendingAck(String topic, String subName, Long ledgerId, Long entryId,
+ Integer batchIndex) throws PulsarAdminException;
/**
- * Check whether the position is in pending ack stats.
+ * Get the position stats in transaction pending ack.
*
* @param topic the topic of checking position in pending ack state
* @param subName the subscription name of this pending ack
@@ -330,7 +329,7 @@ public interface Transactions {
* @param batchIndex the batch index of the message position, `null` means not batch message.
* @return {@link PositionInPendingAckStats} a state identified whether the position state.
*/
- CompletableFuture<PositionInPendingAckStats> checkPositionInPendingAckStateAsync(String topic, String subName,
- Long ledgerId, Long entryId,
- Integer batchIndex);
+ CompletableFuture<PositionInPendingAckStats> getPositionStatsInPendingAckAsync(String topic, String subName,
+ Long ledgerId, Long entryId,
+ Integer batchIndex);
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index edf9381123d..80f41c3e2d5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -359,13 +359,13 @@ public class TransactionsImpl extends BaseResource implements Transactions {
}
@Override
- public CompletableFuture<PositionInPendingAckStats> checkPositionInPendingAckStateAsync(String topic,
- String subName,
- Long ledgerId,
- Long entryId,
- Integer batchIndex) {
+ public CompletableFuture<PositionInPendingAckStats> getPositionStatsInPendingAckAsync(String topic,
+ String subName,
+ Long ledgerId,
+ Long entryId,
+ Integer batchIndex) {
TopicName tn = TopicName.get(topic);
- WebTarget path = adminV3Transactions.path("pendingAckStats");
+ WebTarget path = adminV3Transactions.path("positionStatsInPendingAck");
path = path.path(tn.getRestPath(false));
path = path.path(subName);
path = path.path(ledgerId.toString());
@@ -389,9 +389,9 @@ public class TransactionsImpl extends BaseResource implements Transactions {
@Override
- public PositionInPendingAckStats checkPositionInPendingAckState(String topic, String subName, Long ledgerId,
- Long entryId, Integer batchIndex)
+ public PositionInPendingAckStats getPositionStatsInPendingAck(String topic, String subName, Long ledgerId,
+ Long entryId, Integer batchIndex)
throws PulsarAdminException {
- return sync(() -> checkPositionInPendingAckStateAsync(topic, subName, ledgerId, entryId, batchIndex));
+ return sync(() -> getPositionStatsInPendingAckAsync(topic, subName, ledgerId, entryId, batchIndex));
}
}
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index e63509854e4..6baa2e7e5e7 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -2171,6 +2171,10 @@ public class PulsarAdminToolTest {
cmdTransactions = new CmdTransactions(() -> admin);
cmdTransactions.run(split("scale-transactionCoordinators -r 3"));
verify(transactions).scaleTransactionCoordinators(3);
+
+ cmdTransactions = new CmdTransactions(() -> admin);
+ cmdTransactions.run(split("position-stats-in-pending-ack -t test -s test -l 1 -e 1 -b 1"));
+ verify(transactions).getPositionStatsInPendingAck("test", "test", 1L, 1L, 1);
}
@Test
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
index 9e06f1b2706..a79966035cd 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
@@ -32,7 +32,7 @@ public class CmdTransactions extends CmdBase {
@Parameters(commandDescription = "Get transaction coordinator stats")
private class GetCoordinatorStats extends CliCommand {
- @Parameter(names = {"-c", "--coordinator-id"}, description = "the coordinator id", required = false)
+ @Parameter(names = {"-c", "--coordinator-id"}, description = "The coordinator id", required = false)
private Integer coordinatorId;
@Override
@@ -47,7 +47,7 @@ public class CmdTransactions extends CmdBase {
@Parameters(commandDescription = "Get transaction buffer stats")
private class GetTransactionBufferStats extends CliCommand {
- @Parameter(names = {"-t", "--topic"}, description = "the topic", required = true)
+ @Parameter(names = {"-t", "--topic"}, description = "The topic", required = true)
private String topic;
@Parameter(names = {"-l", "--low-water-mark"},
@@ -62,10 +62,10 @@ public class CmdTransactions extends CmdBase {
@Parameters(commandDescription = "Get transaction pending ack stats")
private class GetPendingAckStats extends CliCommand {
- @Parameter(names = {"-t", "--topic"}, description = "the topic", required = true)
+ @Parameter(names = {"-t", "--topic"}, description = "The topic name", required = true)
private String topic;
- @Parameter(names = {"-s", "--sub-name"}, description = "the subscription name", required = true)
+ @Parameter(names = {"-s", "--sub-name"}, description = "The subscription name", required = true)
private String subName;
@Parameter(names = {"-l", "--low-water-mark"},
@@ -80,16 +80,16 @@ public class CmdTransactions extends CmdBase {
@Parameters(commandDescription = "Get transaction in pending ack stats")
private class GetTransactionInPendingAckStats extends CliCommand {
- @Parameter(names = {"-m", "--most-sig-bits"}, description = "the most sig bits", required = true)
+ @Parameter(names = {"-m", "--most-sig-bits"}, description = "The most sig bits", required = true)
private int mostSigBits;
- @Parameter(names = {"-l", "--least-sig-bits"}, description = "the least sig bits", required = true)
+ @Parameter(names = {"-l", "--least-sig-bits"}, description = "The least sig bits", required = true)
private long leastSigBits;
- @Parameter(names = {"-t", "--topic"}, description = "the topic name", required = true)
+ @Parameter(names = {"-t", "--topic"}, description = "The topic name", required = true)
private String topic;
- @Parameter(names = {"-s", "--sub-name"}, description = "the subscription name", required = true)
+ @Parameter(names = {"-s", "--sub-name"}, description = "The subscription name", required = true)
private String subName;
@Override
@@ -102,13 +102,13 @@ public class CmdTransactions extends CmdBase {
@Parameters(commandDescription = "Get transaction in buffer stats")
private class GetTransactionInBufferStats extends CliCommand {
- @Parameter(names = {"-m", "--most-sig-bits"}, description = "the most sig bits", required = true)
+ @Parameter(names = {"-m", "--most-sig-bits"}, description = "The most sig bits", required = true)
private int mostSigBits;
- @Parameter(names = {"-l", "--least-sig-bits"}, description = "the least sig bits", required = true)
+ @Parameter(names = {"-l", "--least-sig-bits"}, description = "The least sig bits", required = true)
private long leastSigBits;
- @Parameter(names = {"-t", "--topic"}, description = "the topic", required = true)
+ @Parameter(names = {"-t", "--topic"}, description = "The topic name", required = true)
private String topic;
@Override
@@ -119,10 +119,10 @@ public class CmdTransactions extends CmdBase {
@Parameters(commandDescription = "Get transaction metadata")
private class GetTransactionMetadata extends CliCommand {
- @Parameter(names = {"-m", "--most-sig-bits"}, description = "the most sig bits", required = true)
+ @Parameter(names = {"-m", "--most-sig-bits"}, description = "The most sig bits", required = true)
private int mostSigBits;
- @Parameter(names = {"-l", "--least-sig-bits"}, description = "the least sig bits", required = true)
+ @Parameter(names = {"-l", "--least-sig-bits"}, description = "The least sig bits", required = true)
private long leastSigBits;
@Override
@@ -172,10 +172,10 @@ public class CmdTransactions extends CmdBase {
@Parameters(commandDescription = "Get pending ack internal stats")
private class GetPendingAckInternalStats extends CliCommand {
- @Parameter(names = {"-t", "--topic"}, description = "the topic name", required = true)
+ @Parameter(names = {"-t", "--topic"}, description = "Topic name", required = true)
private String topic;
- @Parameter(names = {"-s", "--sub-name"}, description = "the subscription name", required = true)
+ @Parameter(names = {"-s", "--subscription-name"}, description = "Subscription name", required = true)
private String subName;
@Parameter(names = { "-m", "--metadata" }, description = "Flag to include ledger metadata")
@@ -196,6 +196,29 @@ public class CmdTransactions extends CmdBase {
}
}
+ @Parameters(commandDescription = "Get the position stats in transaction pending ack")
+ private class GetPositionStatsInPendingAck extends CliCommand {
+ @Parameter(names = {"-t", "--topic"}, description = "The topic name", required = true)
+ private String topic;
+
+ @Parameter(names = {"-s", "--subscription-name"}, description = "Subscription name", required = true)
+ private String subName;
+
+ @Parameter(names = {"-l", "--ledger-id"}, description = "Ledger ID of the position", required = true)
+ private Long ledgerId;
+
+ @Parameter(names = {"-e", "--entry-id"}, description = "Entry ID of the position", required = true)
+ private Long entryId;
+
+ @Parameter(names = {"-b", "--batch-index"}, description = "Batch index of the position")
+ private Integer batchIndex;
+
+ @Override
+ void run() throws Exception {
+ getAdmin().transactions().getPositionStatsInPendingAck(topic, subName, ledgerId, entryId, batchIndex);
+ }
+ }
+
public CmdTransactions(Supplier<PulsarAdmin> admin) {
super("transactions", admin);
@@ -209,5 +232,7 @@ public class CmdTransactions extends CmdBase {
jcommander.addCommand("transaction-metadata", new GetTransactionMetadata());
jcommander.addCommand("slow-transactions", new GetSlowTransactions());
jcommander.addCommand("scale-transactionCoordinators", new ScaleTransactionCoordinators());
+ jcommander.addCommand("position-stats-in-pending-ack", new GetPositionStatsInPendingAck());
+
}
}