You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/24 15:40:19 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #10690: [Transaction] Transaction admin api get transaction status

codelipenghui commented on a change in pull request #10690:
URL: https://github.com/apache/pulsar/pull/10690#discussion_r638061278



##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
##########
@@ -81,10 +81,25 @@ void run() throws Exception {
         }
     }
 
+    @Parameters(commandDescription = "Get transaction status")
+    private class GetTransactionStatus extends CliCommand {
+        @Parameter(names = {"-m", "--most-sig-bits"}, description = "the most sig bits", required = true)
+        private int mostSigBits;
+
+        @Parameter(names = {"-l", "--least-sig-bits"}, description = "the least sig bits", required = true)
+        private long leastSigBits;
+
+        @Override
+        void run() throws Exception {
+            print(getAdmin().transactions().getTransactionStatus(new TxnID(mostSigBits, leastSigBits)));
+        }
+    }
+
     public CmdTransactions(Supplier<PulsarAdmin> admin) {
         super("transactions", admin);
         jcommander.addCommand("coordinator-status", new GetCoordinatorStatus());
         jcommander.addCommand("transaction-in-buffer-stats", new GetTransactionInBufferStats());
         jcommander.addCommand("transaction-in-pending-ack-stats", new GetTransactionInPendingAckStats());
+        jcommander.addCommand("transaction-status", new GetTransactionStatus());

Review comment:
       It would be better to name this `transaction-metadata`, since the command takes a transaction ID and fetches all of that transaction's metadata.

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/transaction/TransactionImpl.java
##########
@@ -117,7 +117,7 @@ public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture)
         return checkIfOpen().thenCompose(value -> {
             synchronized (TransactionImpl.this) {
                 // we need to issue the request to TC to register the acked topic
-                return registerSubscriptionMap.compute(topic, (key, future) -> {
+                return registerSubscriptionMap.compute(topic + "-" + subscription, (key, future) -> {

Review comment:
       This change seems unrelated to this PR?

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
##########
@@ -172,4 +177,100 @@ protected void internalGetTransactionInBufferStats(AsyncResponse asyncResponse,
                     "This Broker is not configured with transactionCoordinatorEnabled=true."));
         }
     }
+
+    protected void internalGetTransactionStatus(AsyncResponse asyncResponse,
+                                                boolean authoritative, int coordinatorId, long sequenceID) {
+        try {
+            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
+                validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                        authoritative);
+                Transactions transactions = pulsar().getAdminClient().transactions();
+                TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
+                        .getTxnMeta(new TxnID(coordinatorId, sequenceID)).get();
+                TxnID txnID = txnMeta.id();

Review comment:
       This will introduce an NPE here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org