You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/07/05 08:39:46 UTC

[pulsar] branch master updated: [admin][txn] Add transaction admin to get recover time in stats. (#15654)

This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new da325fa6439 [admin][txn] Add transaction admin to get recover time in stats. (#15654)
da325fa6439 is described below

commit da325fa64399642700d35b7d538a92b65c604bf5
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Tue Jul 5 16:39:39 2022 +0800

    [admin][txn] Add transaction admin to get recover time in stats. (#15654)
    
    * [admin][txn] Add transaction admin to get recover time in stats.
    ### Motivation & Modification
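    Extend the admin tools to expose the recovery start and end timestamps in the transaction buffer (TB), transaction pending ack (TP), and transaction coordinator (TC) stats.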
---
 .../buffer/impl/TopicTransactionBuffer.java        |  9 ++++
 .../pendingack/impl/PendingAckHandleImpl.java      | 12 +++++
 .../broker/admin/v3/AdminApiTransactionTest.java   | 58 +++++++++++++++++++++-
 .../policies/data/TransactionBufferStats.java      |  5 ++
 .../policies/data/TransactionCoordinatorStats.java |  4 ++
 .../policies/data/TransactionPendingAckStats.java  |  4 ++
 .../pulsar/common/util/RecoverTimeRecord.java      | 21 +++-----
 .../impl/MLTransactionMetadataStore.java           |  7 ++-
 8 files changed, 105 insertions(+), 15 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 4da82922a22..48aad16340c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -59,6 +59,7 @@ import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.RecoverTimeRecord;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.SpscArrayQueue;
 
@@ -103,6 +104,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
      */
     private final ConcurrentHashMap<Long, Long> lowWaterMarks = new ConcurrentHashMap<>();
 
+    public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
+
     private final Semaphore handleLowWaterMark = new Semaphore(1);
 
     public TopicTransactionBuffer(PersistentTopic topic) {
@@ -120,6 +123,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
     }
 
     private void recover() {
+        recoverTime.setRecoverStartTime(System.currentTimeMillis());
         this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this)
                 .execute(new TopicTransactionBufferRecover(new TopicTransactionBufferRecoverCallBack() {
                     @Override
@@ -142,6 +146,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                                 timer.newTimeout(TopicTransactionBuffer.this,
                                         takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
                                 transactionBufferFuture.complete(null);
+                                recoverTime.setRecoverEndTime(System.currentTimeMillis());
                             }
                         }
                     }
@@ -157,6 +162,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                                 log.error("[{}]Transaction buffer recover fail", topic.getName());
                             } else {
                                 transactionBufferFuture.complete(null);
+                                recoverTime.setRecoverEndTime(System.currentTimeMillis());
                             }
                         }
                     }
@@ -210,6 +216,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
                         } else {
                             transactionBufferFuture.completeExceptionally(e);
                         }
+                        recoverTime.setRecoverEndTime(System.currentTimeMillis());
                         topic.close(true);
                     }
                 }, this.topic, this, takeSnapshotWriter));
@@ -565,6 +572,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
         }
         transactionBufferStats.ongoingTxnSize = ongoingTxns.size();
 
+        transactionBufferStats.recoverStartTime = recoverTime.getRecoverStartTime();
+        transactionBufferStats.recoverEndTime = recoverTime.getRecoverEndTime();
         return transactionBufferStats;
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index ab070d8a942..1a159974700 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
 import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
 import org.apache.pulsar.common.stats.PositionInPendingAckStats;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RecoverTimeRecord;
 import org.apache.pulsar.common.util.collections.BitSetRecyclable;
 import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
 
@@ -127,6 +128,8 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
     @Getter
     private final ExecutorService internalPinnedExecutor;
 
+    public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
+
 
     public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
         super(State.None);
@@ -157,6 +160,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
                 this.pendingAckStoreFuture =
                         pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
                 this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+                    recoverTime.setRecoverStartTime(System.currentTimeMillis());
                     pendingAckStore.replayAsync(this, internalPinnedExecutor);
                 }).exceptionally(e -> {
                     acceptQueue.clear();
@@ -905,6 +909,8 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
         } else {
             transactionPendingAckStats.ongoingTxnSize = 0;
         }
+        transactionPendingAckStats.recoverStartTime = recoverTime.getRecoverStartTime();
+        transactionPendingAckStats.recoverEndTime = recoverTime.getRecoverEndTime();
         return transactionPendingAckStats;
     }
 
@@ -912,11 +918,17 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
         if (!this.pendingAckHandleCompletableFuture.isDone()) {
             this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
         }
+        if (recoverTime.getRecoverStartTime() == 0L) {
+            return;
+        } else {
+            recoverTime.setRecoverEndTime(System.currentTimeMillis());
+        }
     }
 
     public synchronized void exceptionHandleFuture(Throwable t) {
         if (!this.pendingAckHandleCompletableFuture.isDone()) {
             this.pendingAckHandleCompletableFuture.completeExceptionally(t);
+            recoverTime.setRecoverEndTime(System.currentTimeMillis());
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 620c4607836..ddb340109d7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -71,6 +71,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.spy;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertFalse;
@@ -619,6 +620,61 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testGetRecoveryTime() throws Exception {
+        initTransaction(1);
+        final String topic = "persistent://public/default/testGetRecoveryTime";
+        final String subName = "test";
+
+        @Cleanup
+        Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                .subscriptionName(subName)
+                .topic(topic)
+                .subscribe();
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .sendTimeout(0, TimeUnit.SECONDS)
+                .topic(topic)
+                .create();
+
+        Awaitility.await().untilAsserted(() -> {
+            Map<Integer, TransactionCoordinatorStats> transactionCoordinatorStatsMap =
+                    admin.transactions().getCoordinatorStats();
+            assertNotEquals(transactionCoordinatorStatsMap.get(0).recoverStartTime, 0L);
+            assertNotEquals(transactionCoordinatorStatsMap.get(0).recoverEndTime, 0L);
+            assertNotEquals(transactionCoordinatorStatsMap.get(0).recoverEndTime, -1L);
+        });
+        Awaitility.await().untilAsserted(() -> {
+            TransactionBufferStats transactionBufferStats = admin.transactions().getTransactionBufferStats(topic);
+            assertNotEquals(transactionBufferStats.recoverStartTime, 0L);
+            assertNotEquals(transactionBufferStats.recoverEndTime, 0L);
+            assertNotEquals(transactionBufferStats.recoverEndTime, -1L);
+        });
+
+        TransactionPendingAckStats transactionPendingAckStats =
+                admin.transactions().getPendingAckStats(topic, subName);
+        assertEquals(transactionPendingAckStats.recoverStartTime, 0L);
+        assertEquals(transactionPendingAckStats.recoverEndTime, 0L);
+
+        Transaction transaction1 = pulsarClient.newTransaction()
+                .withTransactionTimeout(5, TimeUnit.MINUTES)
+                .build()
+                .get();
+
+        producer.newMessage().send();
+        Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+
+        consumer.acknowledgeAsync(message.getMessageId(), transaction1);
+        transaction1.commit().get();
+
+        transactionPendingAckStats =
+                admin.transactions().getPendingAckStats(topic, subName);
+        assertNotEquals(transactionPendingAckStats.recoverStartTime, 0L);
+        assertNotEquals(transactionPendingAckStats.recoverEndTime, 0L);
+        assertNotEquals(transactionPendingAckStats.recoverEndTime, -1L);
+    }
+
 
     @Test
     public void testCheckPositionInPendingAckState() throws Exception {
@@ -718,7 +774,7 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
                 admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
                 messageId.getLedgerId(), messageId.getEntryId(), 1);
         assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.PendingAck);
-        
+
         positionStatsInPendingAckStats =
                 admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
                         messageId.getLedgerId(), messageId.getEntryId(), 2);
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
index 7c31294105a..d1ad6400207 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
@@ -39,4 +39,9 @@ public class TransactionBufferStats {
      * The total number of ongoing transactions in this transaction buffer.
      */
     public long ongoingTxnSize;
+
+    // Start timestamp of transaction buffer recovery. 0L means recovery has not started.
+    public long recoverStartTime;
+    // End timestamp of transaction buffer recovery. 0L means recovery has not finished.
+    public long recoverEndTime;
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStats.java
index 9b5f76f7d1e..1764c1c33a8 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStats.java
@@ -39,4 +39,8 @@ public class TransactionCoordinatorStats {
      *  The total number of ongoing transactions in this transaction coordinator.
      */
     public long ongoingTxnSize;
+    // Start timestamp of transaction coordinator recovery. 0L means recovery has not started.
+    public long recoverStartTime;
+    // End timestamp of transaction coordinator recovery. 0L means recovery has not finished.
+    public long recoverEndTime;
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckStats.java
index 8613ec13bd9..d4480d9e4c1 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckStats.java
@@ -34,4 +34,8 @@ public class TransactionPendingAckStats {
      * The total number of ongoing transactions in this transaction pending ack.
      */
     public long ongoingTxnSize;
+    // Start timestamp of transaction pendingAck recovery. 0L means recovery has not started.
+    public long recoverStartTime;
+    // End timestamp of transaction pendingAck recovery. 0L means recovery has not finished.
+    public long recoverEndTime;
 }
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RecoverTimeRecord.java
similarity index 64%
copy from pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckStats.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/util/RecoverTimeRecord.java
index 8613ec13bd9..160e3fdf37b 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RecoverTimeRecord.java
@@ -16,22 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.common.policies.data;
+package org.apache.pulsar.common.util;
 
-import java.util.Map;
 
-public class TransactionPendingAckStats {
+import lombok.Getter;
+import lombok.Setter;
 
-    /** The state of this pending ack. */
-    public String state;
+@Getter
+@Setter
+public class RecoverTimeRecord {
 
-    /**
-     * (Optional) The lowWaterMark details of the transaction pending ack.
-     */
-    public Map<Long, Long> lowWaterMarks;
+    private long recoverStartTime;
 
-    /**
-     * The total number of ongoing transactions in this transaction pending ack.
-     */
-    public long ongoingTxnSize;
+    private long recoverEndTime;
 }
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index fa0a6b1102e..a35ba975851 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
 import org.apache.pulsar.common.api.proto.Subscription;
 import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RecoverTimeRecord;
 import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
 import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
 import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -76,6 +77,7 @@ public class MLTransactionMetadataStore
     private final LongAdder appendLogCount;
     private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
     private final ExecutorService internalPinnedExecutor;
+    public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
     private final long maxActiveTransactionsPerCoordinator;
 
     public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
@@ -110,8 +112,8 @@ public class MLTransactionMetadataStore
                     .CoordinatorNotFoundException("transaction metadata store with tcId "
                             + tcID.toString() + " change state to Initializing error when init it"));
         } else {
+            recoverTime.setRecoverStartTime(System.currentTimeMillis());
             internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
-
                 @Override
                 public void replayComplete() {
                     recoverTracker.appendOpenTransactionToTimeoutTracker();
@@ -126,6 +128,7 @@ public class MLTransactionMetadataStore
                         recoverTracker.handleCommittingAndAbortingTransaction();
                         timeoutTracker.start();
                         completableFuture.complete(MLTransactionMetadataStore.this);
+                        recoverTime.setRecoverEndTime(System.currentTimeMillis());
                     }
                 }
 
@@ -446,6 +449,8 @@ public class MLTransactionMetadataStore
         transactionCoordinatorstats.setState(getState().name());
         transactionCoordinatorstats.setLeastSigBits(sequenceIdGenerator.getCurrentSequenceId());
         transactionCoordinatorstats.ongoingTxnSize = txnMetaMap.size();
+        transactionCoordinatorstats.recoverStartTime = recoverTime.getRecoverStartTime();
+        transactionCoordinatorstats.recoverEndTime = recoverTime.getRecoverEndTime();
         return transactionCoordinatorstats;
     }