You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/26 07:52:40 UTC

[GitHub] [pulsar] codelipenghui commented on a change in pull request #10701: [Transaction] Transaction admin api get slow transaction metadata.

codelipenghui commented on a change in pull request #10701:
URL: https://github.com/apache/pulsar/pull/10701#discussion_r639473305



##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
##########
@@ -95,11 +97,34 @@ void run() throws Exception {
         }
     }
 
+    @Parameters(commandDescription = "Get slow transaction metadata")
+    private class GetSlowTransactionMetadata extends CliCommand {

Review comment:
       ```suggestion
       private class GetSlowTransactions extends CliCommand {
   ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
##########
@@ -286,4 +288,90 @@ protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
             }
         }
     }
+
+    protected void internalGetSlowTransactionsMetadata(AsyncResponse asyncResponse,
+                                                       boolean authoritative, long timeout, Integer coordinatorId) {
+        try {
+            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
+                if (coordinatorId != null) {
+                    validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                            authoritative);
+                    List<Long> transactions = pulsar().getTransactionMetadataStoreService().getStores()

Review comment:
       The metadata store might be null.

##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTransactions.java
##########
@@ -95,11 +97,34 @@ void run() throws Exception {
         }
     }
 
+    @Parameters(commandDescription = "Get slow transaction metadata")
+    private class GetSlowTransactionMetadata extends CliCommand {
+        @Parameter(names = {"-c", "--coordinator-id"}, description = "The coordinator id", required = false)
+        private Integer coordinatorId;
+
+        @Parameter(names = { "-t", "--time" }, description = "The transaction timeout time. "
+                + "(eg: 1s, 10s, 1m, 5h, 3d)", required = true)
+        private String timeoutStr = "1s";
+
+        @Override
+        void run() throws Exception {
+            long timeout =
+                    TimeUnit.SECONDS.toMillis(RelativeTimeUtil.parseRelativeTimeInSeconds(timeoutStr));
+            if (coordinatorId != null) {
+                print(getAdmin().transactions().getSlowTransactionMetadataByCoordinatorId(coordinatorId,
+                        timeout, TimeUnit.MILLISECONDS));
+            } else {
+                print(getAdmin().transactions().getSlowTransactionMetadata(timeout, TimeUnit.MILLISECONDS));
+            }
+        }
+    }
+
     public CmdTransactions(Supplier<PulsarAdmin> admin) {
         super("transactions", admin);
         jcommander.addCommand("coordinator-status", new GetCoordinatorStatus());
         jcommander.addCommand("transaction-in-buffer-stats", new GetTransactionInBufferStats());
         jcommander.addCommand("transaction-in-pending-ack-stats", new GetTransactionInPendingAckStats());
         jcommander.addCommand("transaction-metadata", new GetTransactionMetadata());
+        jcommander.addCommand("slow-transaction-metadata", new GetSlowTransactionMetadata());

Review comment:
       ```suggestion
           jcommander.addCommand("slow-transactions", new GetSlowTransactionMetadata());
   ```

##########
File path: pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
##########
@@ -419,6 +419,17 @@ public TransactionMetadataStoreStats getStats() {
         return transactionMetadataStoreStats;
     }
 
+    @Override
+    public List<Long> getSlowTransactions(long timeout) {

Review comment:
       Why not return a List<TxnMeta> directly?

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
##########
@@ -71,4 +72,26 @@
      */
     CompletableFuture<TransactionMetadata> getTransactionMetadata(TxnID txnID);
 
+    /**
+     * Get slow transaction metadata by coordinatorId.
+     *
+     * @param coordinatorId the coordinator id of getting slow transaction status.
+     * @param timeout the timeout
+     * @param timeUnit the timeout timeUnit
+     * @return the future metadata of slow transactions.
+     */
+    CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactionMetadataByCoordinatorId(Integer coordinatorId,

Review comment:
       ```suggestion
       CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactions(Integer coordinatorId,
   ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
##########
@@ -286,4 +288,90 @@ protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
             }
         }
     }
+
+    protected void internalGetSlowTransactionsMetadata(AsyncResponse asyncResponse,
+                                                       boolean authoritative, long timeout, Integer coordinatorId) {
+        try {
+            if (pulsar().getConfig().isTransactionCoordinatorEnabled()) {
+                if (coordinatorId != null) {
+                    validateTopicOwnership(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
+                            authoritative);
+                    List<Long> transactions = pulsar().getTransactionMetadataStoreService().getStores()
+                            .get(TransactionCoordinatorID.get(coordinatorId)).getSlowTransactions(timeout);
+                    List<CompletableFuture<TransactionMetadata>> completableFutures = new ArrayList<>();
+                    for (Long transaction : transactions) {
+                        completableFutures.add(pulsar().getAdminClient().transactions()
+                                .getTransactionMetadata(new TxnID(coordinatorId, transaction)));

Review comment:
       Why do we need to use the admin client to get the transaction metadata?

##########
File path: pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Transactions.java
##########
@@ -71,4 +72,26 @@
      */
     CompletableFuture<TransactionMetadata> getTransactionMetadata(TxnID txnID);
 
+    /**
+     * Get slow transaction metadata by coordinatorId.
+     *
+     * @param coordinatorId the coordinator id of getting slow transaction status.
+     * @param timeout the timeout
+     * @param timeUnit the timeout timeUnit
+     * @return the future metadata of slow transactions.
+     */
+    CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactionMetadataByCoordinatorId(Integer coordinatorId,
+                                                                                                  long timeout,
+                                                                                                  TimeUnit timeUnit);
+
+    /**
+     * Get slow transaction metadata.
+     *
+     * @param timeout the timeout
+     * @param timeUnit the timeout timeUnit
+     *
+     * @return the future metadata of slow transactions.
+     */
+    CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactionMetadata(long timeout, TimeUnit timeUnit);

Review comment:
       ```suggestion
       CompletableFuture<Map<String, TransactionMetadata>> getSlowTransactions(long timeout, TimeUnit timeUnit);
   ```

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/TransactionsBase.java
##########
@@ -286,4 +288,90 @@ protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
             }
         }
     }
+
+    protected void internalGetSlowTransactionsMetadata(AsyncResponse asyncResponse,

Review comment:
       ```suggestion
       protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org