You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/07/07 08:45:45 UTC

[pulsar] branch master updated: [Improve] [txn] add txn admin cmd and swagger config for transaction admin (#16396)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 318432ed1f1 [Improve] [txn] add txn admin cmd and swagger config for transaction admin (#16396)
318432ed1f1 is described below

commit 318432ed1f16e60a088f3dca1b14891d582726a9
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Jul 7 16:45:37 2022 +0800

    [Improve] [txn] add txn admin cmd and swagger config for transaction admin (#16396)
    
    ### Motivation & Modifications
    1. Add the missing admin command.
    2. Add the swagger config for the transaction admin.
    3. Optimize the descriptions and the URL.
    Because the [PR](https://github.com/apache/pulsar/pull/15682) has not been cherry-picked, there are no breaking changes.
---
 pulsar-broker/pom.xml                              | 17 +++++++
 .../pulsar/broker/admin/v3/Transactions.java       |  2 +-
 .../broker/admin/v3/AdminApiTransactionTest.java   | 12 ++---
 .../apache/pulsar/client/admin/Transactions.java   | 15 +++---
 .../client/admin/internal/TransactionsImpl.java    | 18 +++----
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  4 ++
 .../apache/pulsar/admin/cli/CmdTransactions.java   | 55 ++++++++++++++++------
 7 files changed, 84 insertions(+), 39 deletions(-)

diff --git a/pulsar-broker/pom.xml b/pulsar-broker/pom.xml
index 952c10301f3..d2641319e85 100644
--- a/pulsar-broker/pom.xml
+++ b/pulsar-broker/pom.xml
@@ -708,6 +708,23 @@
                   <swaggerDirectory>${basedir}/target/docs</swaggerDirectory>
                   <swaggerFileName>swaggerfunctions</swaggerFileName>
                 </apiSource>
+                <apiSource>
+                  <springmvc>false</springmvc>
+                  <locations>org.apache.pulsar.broker.admin.v3.Transactions</locations>
+                  <schemes>http,https</schemes>
+                  <basePath>/admin/v3</basePath>
+                  <info>
+                    <title>Pulsar Transactions REST API</title>
+                    <version>v3</version>
+                    <description>This provides the REST API for Pulsar Transactions operations</description>
+                    <license>
+                      <url>http://www.apache.org/licenses/LICENSE-2.0.html</url>
+                      <name>Apache 2.0</name>
+                    </license>
+                  </info>
+                  <swaggerDirectory>${basedir}/target/docs</swaggerDirectory>
+                  <swaggerFileName>swaggertransactions</swaggerFileName>
+                </apiSource>
                 <apiSource>
                   <springmvc>false</springmvc>
                   <locations>org.apache.pulsar.broker.admin.v3.Sources</locations>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
index bffdf5a252c..5dcf57f80f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Transactions.java
@@ -345,7 +345,7 @@ public class Transactions extends TransactionsBase {
     }
 
     @GET
-    @Path("/pendingAckStats/{tenant}/{namespace}/{topic}/{subName}/{ledgerId}/{entryId}")
+    @Path("/positionStatsInPendingAck/{tenant}/{namespace}/{topic}/{subName}/{ledgerId}/{entryId}")
     @ApiOperation(value = "Get position stats in pending ack.")
     @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic "
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index ddb340109d7..0e9bbc10251 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -703,16 +703,16 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
          Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
          MessageIdImpl messageId = (MessageIdImpl) message.getMessageId();
 
-         PositionInPendingAckStats result = admin.transactions().checkPositionInPendingAckState(topic, subName,
+         PositionInPendingAckStats result = admin.transactions().getPositionStatsInPendingAck(topic, subName,
                  messageId.getLedgerId(), messageId.getEntryId(), null);
          assertEquals(result.state, PositionInPendingAckStats.State.PendingAckNotReady);
 
          consumer.acknowledgeAsync(messageId, transaction).get();
-         result = admin.transactions().checkPositionInPendingAckState(topic, subName,
+         result = admin.transactions().getPositionStatsInPendingAck(topic, subName,
                 messageId.getLedgerId(), messageId.getEntryId(), null);
          assertEquals(result.state, PositionInPendingAckStats.State.PendingAck);
          transaction.commit().get();
-         result = admin.transactions().checkPositionInPendingAckState(topic, subName,
+         result = admin.transactions().getPositionStatsInPendingAck(topic, subName,
                  messageId.getLedgerId(), messageId.getEntryId(), null);
          assertEquals(result.state, PositionInPendingAckStats.State.MarkDelete);
     }
@@ -771,16 +771,16 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
         consumer.acknowledgeAsync(messageId, transaction).get();
 
         PositionInPendingAckStats positionStatsInPendingAckStats =
-                admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
+                admin.transactions().getPositionStatsInPendingAck(topic, subscriptionName,
                 messageId.getLedgerId(), messageId.getEntryId(), 1);
         assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.PendingAck);
 
         positionStatsInPendingAckStats =
-                admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
+                admin.transactions().getPositionStatsInPendingAck(topic, subscriptionName,
                         messageId.getLedgerId(), messageId.getEntryId(), 2);
         assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.NotInPendingAck);
         positionStatsInPendingAckStats =
-                admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
+                admin.transactions().getPositionStatsInPendingAck(topic, subscriptionName,
                         messageId.getLedgerId(), messageId.getEntryId(), 10);
         assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.InvalidPosition);
 
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
index 2ad7ae0b10b..3bd5a188dce 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
@@ -308,8 +308,7 @@ public interface Transactions {
     CompletableFuture<Void> scaleTransactionCoordinatorsAsync(int replicas);
 
     /**
-     * Check whether the position is in pending ack stats.
-     *
+     * Get the position stats in transaction pending ack.
      * @param topic the topic of checking position in pending ack state
      * @param subName the subscription name of this pending ack
      * @param ledgerId the ledger id of the message position.
@@ -317,11 +316,11 @@ public interface Transactions {
      * @param batchIndex the batch index of the message position, `null` means not batch message.
      * @return {@link PositionInPendingAckStats} a state identified whether the position state.
      */
-    PositionInPendingAckStats checkPositionInPendingAckState(String topic, String subName, Long ledgerId, Long entryId,
-                                                             Integer batchIndex) throws PulsarAdminException;
+    PositionInPendingAckStats getPositionStatsInPendingAck(String topic, String subName, Long ledgerId, Long entryId,
+                                                           Integer batchIndex) throws PulsarAdminException;
 
     /**
-     * Check whether the position is in pending ack stats.
+     * Get the position stats in transaction pending ack.
      *
      * @param topic the topic of checking position in pending ack state
      * @param subName the subscription name of this pending ack
@@ -330,7 +329,7 @@ public interface Transactions {
      * @param batchIndex the batch index of the message position, `null` means not batch message.
      * @return {@link PositionInPendingAckStats} a state identified whether the position state.
      */
-    CompletableFuture<PositionInPendingAckStats> checkPositionInPendingAckStateAsync(String topic, String subName,
-                                                                                     Long ledgerId, Long entryId,
-                                                                                     Integer batchIndex);
+    CompletableFuture<PositionInPendingAckStats> getPositionStatsInPendingAckAsync(String topic, String subName,
+                                                                                   Long ledgerId, Long entryId,
+                                                                                   Integer batchIndex);
 }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
index edf9381123d..80f41c3e2d5 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TransactionsImpl.java
@@ -359,13 +359,13 @@ public class TransactionsImpl extends BaseResource implements Transactions {
     }
 
     @Override
-    public CompletableFuture<PositionInPendingAckStats> checkPositionInPendingAckStateAsync(String topic,
-                                                                                            String subName,
-                                                                                            Long ledgerId,
-                                                                                            Long entryId,
-                                                                                            Integer batchIndex) {
+    public CompletableFuture<PositionInPendingAckStats> getPositionStatsInPendingAckAsync(String topic,
+                                                                                          String subName,
+                                                                                          Long ledgerId,
+                                                                                          Long entryId,
+                                                                                          Integer batchIndex) {
         TopicName tn = TopicName.get(topic);
-        WebTarget path = adminV3Transactions.path("pendingAckStats");
+        WebTarget path = adminV3Transactions.path("positionStatsInPendingAck");
         path = path.path(tn.getRestPath(false));
         path = path.path(subName);
         path = path.path(ledgerId.toString());
@@ -389,9 +389,9 @@ public class TransactionsImpl extends BaseResource implements Transactions {
 
 
     @Override
-    public PositionInPendingAckStats checkPositionInPendingAckState(String topic, String subName, Long ledgerId,
-                                                                    Long entryId, Integer batchIndex)
+    public PositionInPendingAckStats getPositionStatsInPendingAck(String topic, String subName, Long ledgerId,
+                                                                  Long entryId, Integer batchIndex)
             throws PulsarAdminException {
-        return sync(() -> checkPositionInPendingAckStateAsync(topic, subName, ledgerId, entryId, batchIndex));
+        return sync(() -> getPositionStatsInPendingAckAsync(topic, subName, ledgerId, entryId, batchIndex));
     }
 }
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index e63509854e4..6baa2e7e5e7 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -2171,6 +2171,10 @@ public class PulsarAdminToolTest {
         cmdTransactions = new CmdTransactions(() -> admin);
         cmdTransactions.run(split("scale-transactionCoordinators -r 3"));
         verify(transactions).scaleTransactionCoordinators(3);
+
+        cmdTransactions = new CmdTransactions(() -> admin);
+        cmdTransactions.run(split("position-stats-in-pending-ack -t test -s test -l 1 -e 1 -b 1"));
+        verify(transactions).getPositionStatsInPendingAck("test", "test", 1L, 1L, 1);
     }
 
     @Test
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
index 9e06f1b2706..a79966035cd 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
@@ -32,7 +32,7 @@ public class CmdTransactions extends CmdBase {
 
     @Parameters(commandDescription = "Get transaction coordinator stats")
     private class GetCoordinatorStats extends CliCommand {
-        @Parameter(names = {"-c", "--coordinator-id"}, description = "the coordinator id", required = false)
+        @Parameter(names = {"-c", "--coordinator-id"}, description = "The coordinator id", required = false)
         private Integer coordinatorId;
 
         @Override
@@ -47,7 +47,7 @@ public class CmdTransactions extends CmdBase {
 
     @Parameters(commandDescription = "Get transaction buffer stats")
     private class GetTransactionBufferStats extends CliCommand {
-        @Parameter(names = {"-t", "--topic"}, description = "the topic", required = true)
+        @Parameter(names = {"-t", "--topic"}, description = "The topic", required = true)
         private String topic;
 
         @Parameter(names = {"-l", "--low-water-mark"},
@@ -62,10 +62,10 @@ public class CmdTransactions extends CmdBase {
 
     @Parameters(commandDescription = "Get transaction pending ack stats")
     private class GetPendingAckStats extends CliCommand {
-        @Parameter(names = {"-t", "--topic"}, description = "the topic", required = true)
+        @Parameter(names = {"-t", "--topic"}, description = "The topic name", required = true)
         private String topic;
 
-        @Parameter(names = {"-s", "--sub-name"}, description = "the subscription name", required = true)
+        @Parameter(names = {"-s", "--sub-name"}, description = "The subscription name", required = true)
         private String subName;
 
         @Parameter(names = {"-l", "--low-water-mark"},
@@ -80,16 +80,16 @@ public class CmdTransactions extends CmdBase {
 
     @Parameters(commandDescription = "Get transaction in pending ack stats")
     private class GetTransactionInPendingAckStats extends CliCommand {
-        @Parameter(names = {"-m", "--most-sig-bits"}, description = "the most sig bits", required = true)
+        @Parameter(names = {"-m", "--most-sig-bits"}, description = "The most sig bits", required = true)
         private int mostSigBits;
 
-        @Parameter(names = {"-l", "--least-sig-bits"}, description = "the least sig bits", required = true)
+        @Parameter(names = {"-l", "--least-sig-bits"}, description = "The least sig bits", required = true)
         private long leastSigBits;
 
-        @Parameter(names = {"-t", "--topic"}, description = "the topic name", required = true)
+        @Parameter(names = {"-t", "--topic"}, description = "The topic name", required = true)
         private String topic;
 
-        @Parameter(names = {"-s", "--sub-name"}, description = "the subscription name", required = true)
+        @Parameter(names = {"-s", "--sub-name"}, description = "The subscription name", required = true)
         private String subName;
 
         @Override
@@ -102,13 +102,13 @@ public class CmdTransactions extends CmdBase {
 
     @Parameters(commandDescription = "Get transaction in buffer stats")
     private class GetTransactionInBufferStats extends CliCommand {
-        @Parameter(names = {"-m", "--most-sig-bits"}, description = "the most sig bits", required = true)
+        @Parameter(names = {"-m", "--most-sig-bits"}, description = "The most sig bits", required = true)
         private int mostSigBits;
 
-        @Parameter(names = {"-l", "--least-sig-bits"}, description = "the least sig bits", required = true)
+        @Parameter(names = {"-l", "--least-sig-bits"}, description = "The least sig bits", required = true)
         private long leastSigBits;
 
-        @Parameter(names = {"-t", "--topic"}, description = "the topic", required = true)
+        @Parameter(names = {"-t", "--topic"}, description = "The topic name", required = true)
         private String topic;
 
         @Override
@@ -119,10 +119,10 @@ public class CmdTransactions extends CmdBase {
 
     @Parameters(commandDescription = "Get transaction metadata")
     private class GetTransactionMetadata extends CliCommand {
-        @Parameter(names = {"-m", "--most-sig-bits"}, description = "the most sig bits", required = true)
+        @Parameter(names = {"-m", "--most-sig-bits"}, description = "The most sig bits", required = true)
         private int mostSigBits;
 
-        @Parameter(names = {"-l", "--least-sig-bits"}, description = "the least sig bits", required = true)
+        @Parameter(names = {"-l", "--least-sig-bits"}, description = "The least sig bits", required = true)
         private long leastSigBits;
 
         @Override
@@ -172,10 +172,10 @@ public class CmdTransactions extends CmdBase {
 
     @Parameters(commandDescription = "Get pending ack internal stats")
     private class GetPendingAckInternalStats extends CliCommand {
-        @Parameter(names = {"-t", "--topic"}, description = "the topic name", required = true)
+        @Parameter(names = {"-t", "--topic"}, description = "Topic name", required = true)
         private String topic;
 
-        @Parameter(names = {"-s", "--sub-name"}, description = "the subscription name", required = true)
+        @Parameter(names = {"-s", "--subscription-name"}, description = "Subscription name", required = true)
         private String subName;
 
         @Parameter(names = { "-m", "--metadata" }, description = "Flag to include ledger metadata")
@@ -196,6 +196,29 @@ public class CmdTransactions extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Get the position stats in transaction pending ack")
+    private class GetPositionStatsInPendingAck extends CliCommand {
+        @Parameter(names = {"-t", "--topic"}, description = "The topic name", required = true)
+        private String topic;
+
+        @Parameter(names = {"-s", "--subscription-name"}, description = "Subscription name", required = true)
+        private String subName;
+
+        @Parameter(names = {"-l", "--ledger-id"}, description = "Ledger ID of the position", required = true)
+        private Long ledgerId;
+
+        @Parameter(names = {"-e", "--entry-id"}, description = "Entry ID of the position", required = true)
+        private Long entryId;
+
+        @Parameter(names = {"-b", "--batch-index"}, description = "Batch index of the position")
+        private Integer batchIndex;
+
+        @Override
+        void run() throws Exception {
+            getAdmin().transactions().getPositionStatsInPendingAck(topic, subName, ledgerId, entryId, batchIndex);
+        }
+    }
+
 
     public CmdTransactions(Supplier<PulsarAdmin> admin) {
         super("transactions", admin);
@@ -209,5 +232,7 @@ public class CmdTransactions extends CmdBase {
         jcommander.addCommand("transaction-metadata", new GetTransactionMetadata());
         jcommander.addCommand("slow-transactions", new GetSlowTransactions());
         jcommander.addCommand("scale-transactionCoordinators", new ScaleTransactionCoordinators());
+        jcommander.addCommand("position-stats-in-pending-ack", new GetPositionStatsInPendingAck());
+
     }
 }