You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xi...@apache.org on 2022/07/05 08:39:46 UTC
[pulsar] branch master updated: [admin][txn] Add transaction admin to get recover time in stats. (#15654)
This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new da325fa6439 [admin][txn] Add transaction admin to get recover time in stats. (#15654)
da325fa6439 is described below
commit da325fa64399642700d35b7d538a92b65c604bf5
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Tue Jul 5 16:39:39 2022 +0800
[admin][txn] Add transaction admin to get recover time in stats. (#15654)
* [admin][txn] Add transaction admin to get recover time in stats.
### Motivation & Modification
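Extend the admin tools so that the transaction buffer (TB), transaction pending ack (TP), and transaction coordinator (TC) stats report their recovery start and end timestamps.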
---
.../buffer/impl/TopicTransactionBuffer.java | 9 ++++
.../pendingack/impl/PendingAckHandleImpl.java | 12 +++++
.../broker/admin/v3/AdminApiTransactionTest.java | 58 +++++++++++++++++++++-
.../policies/data/TransactionBufferStats.java | 5 ++
.../policies/data/TransactionCoordinatorStats.java | 4 ++
.../policies/data/TransactionPendingAckStats.java | 4 ++
.../pulsar/common/util/RecoverTimeRecord.java | 21 +++-----
.../impl/MLTransactionMetadataStore.java | 7 ++-
8 files changed, 105 insertions(+), 15 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
index 4da82922a22..48aad16340c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java
@@ -59,6 +59,7 @@ import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.Codec;
+import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
@@ -103,6 +104,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
*/
private final ConcurrentHashMap<Long, Long> lowWaterMarks = new ConcurrentHashMap<>();
+ public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
+
private final Semaphore handleLowWaterMark = new Semaphore(1);
public TopicTransactionBuffer(PersistentTopic topic) {
@@ -120,6 +123,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
}
private void recover() {
+ recoverTime.setRecoverStartTime(System.currentTimeMillis());
this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this)
.execute(new TopicTransactionBufferRecover(new TopicTransactionBufferRecoverCallBack() {
@Override
@@ -142,6 +146,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
timer.newTimeout(TopicTransactionBuffer.this,
takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
transactionBufferFuture.complete(null);
+ recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}
}
@@ -157,6 +162,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
log.error("[{}]Transaction buffer recover fail", topic.getName());
} else {
transactionBufferFuture.complete(null);
+ recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}
}
@@ -210,6 +216,7 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
} else {
transactionBufferFuture.completeExceptionally(e);
}
+ recoverTime.setRecoverEndTime(System.currentTimeMillis());
topic.close(true);
}
}, this.topic, this, takeSnapshotWriter));
@@ -565,6 +572,8 @@ public class TopicTransactionBuffer extends TopicTransactionBufferState implemen
}
transactionBufferStats.ongoingTxnSize = ongoingTxns.size();
+ transactionBufferStats.recoverStartTime = recoverTime.getRecoverStartTime();
+ transactionBufferStats.recoverEndTime = recoverTime.getRecoverEndTime();
return transactionBufferStats;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
index ab070d8a942..1a159974700 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java
@@ -57,6 +57,7 @@ import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
@@ -127,6 +128,8 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
@Getter
private final ExecutorService internalPinnedExecutor;
+ public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
+
public PendingAckHandleImpl(PersistentSubscription persistentSubscription) {
super(State.None);
@@ -157,6 +160,7 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
this.pendingAckStoreFuture =
pendingAckStoreProvider.newPendingAckStore(persistentSubscription);
this.pendingAckStoreFuture.thenAccept(pendingAckStore -> {
+ recoverTime.setRecoverStartTime(System.currentTimeMillis());
pendingAckStore.replayAsync(this, internalPinnedExecutor);
}).exceptionally(e -> {
acceptQueue.clear();
@@ -905,6 +909,8 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
} else {
transactionPendingAckStats.ongoingTxnSize = 0;
}
+ transactionPendingAckStats.recoverStartTime = recoverTime.getRecoverStartTime();
+ transactionPendingAckStats.recoverEndTime = recoverTime.getRecoverEndTime();
return transactionPendingAckStats;
}
@@ -912,11 +918,17 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi
if (!this.pendingAckHandleCompletableFuture.isDone()) {
this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this);
}
+ if (recoverTime.getRecoverStartTime() == 0L) {
+ return;
+ } else {
+ recoverTime.setRecoverEndTime(System.currentTimeMillis());
+ }
}
public synchronized void exceptionHandleFuture(Throwable t) {
if (!this.pendingAckHandleCompletableFuture.isDone()) {
this.pendingAckHandleCompletableFuture.completeExceptionally(t);
+ recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
index 620c4607836..ddb340109d7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java
@@ -71,6 +71,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertFalse;
@@ -619,6 +620,61 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
}
}
+ @Test
+ public void testGetRecoveryTime() throws Exception {
+ initTransaction(1);
+ final String topic = "persistent://public/default/testGetRecoveryTime";
+ final String subName = "test";
+
+ @Cleanup
+ Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .subscriptionName(subName)
+ .topic(topic)
+ .subscribe();
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .sendTimeout(0, TimeUnit.SECONDS)
+ .topic(topic)
+ .create();
+
+ Awaitility.await().untilAsserted(() -> {
+ Map<Integer, TransactionCoordinatorStats> transactionCoordinatorStatsMap =
+ admin.transactions().getCoordinatorStats();
+ assertNotEquals(transactionCoordinatorStatsMap.get(0).recoverStartTime, 0L);
+ assertNotEquals(transactionCoordinatorStatsMap.get(0).recoverEndTime, 0L);
+ assertNotEquals(transactionCoordinatorStatsMap.get(0).recoverEndTime, -1L);
+ });
+ Awaitility.await().untilAsserted(() -> {
+ TransactionBufferStats transactionBufferStats = admin.transactions().getTransactionBufferStats(topic);
+ assertNotEquals(transactionBufferStats.recoverStartTime, 0L);
+ assertNotEquals(transactionBufferStats.recoverEndTime, 0L);
+ assertNotEquals(transactionBufferStats.recoverEndTime, -1L);
+ });
+
+ TransactionPendingAckStats transactionPendingAckStats =
+ admin.transactions().getPendingAckStats(topic, subName);
+ assertEquals(transactionPendingAckStats.recoverStartTime, 0L);
+ assertEquals(transactionPendingAckStats.recoverEndTime, 0L);
+
+ Transaction transaction1 = pulsarClient.newTransaction()
+ .withTransactionTimeout(5, TimeUnit.MINUTES)
+ .build()
+ .get();
+
+ producer.newMessage().send();
+ Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+
+ consumer.acknowledgeAsync(message.getMessageId(), transaction1);
+ transaction1.commit().get();
+
+ transactionPendingAckStats =
+ admin.transactions().getPendingAckStats(topic, subName);
+ assertNotEquals(transactionPendingAckStats.recoverStartTime, 0L);
+ assertNotEquals(transactionPendingAckStats.recoverEndTime, 0L);
+ assertNotEquals(transactionPendingAckStats.recoverEndTime, -1L);
+ }
+
@Test
public void testCheckPositionInPendingAckState() throws Exception {
@@ -718,7 +774,7 @@ public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
messageId.getLedgerId(), messageId.getEntryId(), 1);
assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.PendingAck);
-
+
positionStatsInPendingAckStats =
admin.transactions().checkPositionInPendingAckState(topic, subscriptionName,
messageId.getLedgerId(), messageId.getEntryId(), 2);
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
index 7c31294105a..d1ad6400207 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionBufferStats.java
@@ -39,4 +39,9 @@ public class TransactionBufferStats {
* The total number of ongoing transactions in this transaction buffer.
*/
public long ongoingTxnSize;
+
+ //Start timestamp of transaction buffer recovery. 0L means no startup.
+ public long recoverStartTime;
+ //End timestamp of transaction buffer recovery. 0L means no startup.
+ public long recoverEndTime;
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStats.java
index 9b5f76f7d1e..1764c1c33a8 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionCoordinatorStats.java
@@ -39,4 +39,8 @@ public class TransactionCoordinatorStats {
* The total number of ongoing transactions in this transaction coordinator.
*/
public long ongoingTxnSize;
+ //Start timestamp of transaction coordinator recovery. 0L means no startup.
+ public long recoverStartTime;
+ //End timestamp of transaction coordinator recovery. 0L means no startup.
+ public long recoverEndTime;
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckStats.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckStats.java
index 8613ec13bd9..d4480d9e4c1 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckStats.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckStats.java
@@ -34,4 +34,8 @@ public class TransactionPendingAckStats {
* The total number of ongoing transactions in this transaction pending ack.
*/
public long ongoingTxnSize;
+ //Start timestamp of transaction pendingAck recovery. 0L means no startup.
+ public long recoverStartTime;
+ //End timestamp of transaction pendingAck recovery. 0L means no startup.
+ public long recoverEndTime;
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RecoverTimeRecord.java
similarity index 64%
copy from pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckStats.java
copy to pulsar-common/src/main/java/org/apache/pulsar/common/util/RecoverTimeRecord.java
index 8613ec13bd9..160e3fdf37b 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/policies/data/TransactionPendingAckStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/RecoverTimeRecord.java
@@ -16,22 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.common.policies.data;
+package org.apache.pulsar.common.util;
-import java.util.Map;
-public class TransactionPendingAckStats {
+import lombok.Getter;
+import lombok.Setter;
- /** The state of this pending ack. */
- public String state;
+@Getter
+@Setter
+public class RecoverTimeRecord {
- /**
- * (Optional) The lowWaterMark details of the transaction pending ack.
- */
- public Map<Long, Long> lowWaterMarks;
+ private long recoverStartTime;
- /**
- * The total number of ongoing transactions in this transaction pending ack.
- */
- public long ongoingTxnSize;
+ private long recoverEndTime;
}
diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
index fa0a6b1102e..a35ba975851 100644
--- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
+++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.Subscription;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.RecoverTimeRecord;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionLogReplayCallback;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
@@ -76,6 +77,7 @@ public class MLTransactionMetadataStore
private final LongAdder appendLogCount;
private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
private final ExecutorService internalPinnedExecutor;
+ public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
private final long maxActiveTransactionsPerCoordinator;
public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
@@ -110,8 +112,8 @@ public class MLTransactionMetadataStore
.CoordinatorNotFoundException("transaction metadata store with tcId "
+ tcID.toString() + " change state to Initializing error when init it"));
} else {
+ recoverTime.setRecoverStartTime(System.currentTimeMillis());
internalPinnedExecutor.execute(() -> transactionLog.replayAsync(new TransactionLogReplayCallback() {
-
@Override
public void replayComplete() {
recoverTracker.appendOpenTransactionToTimeoutTracker();
@@ -126,6 +128,7 @@ public class MLTransactionMetadataStore
recoverTracker.handleCommittingAndAbortingTransaction();
timeoutTracker.start();
completableFuture.complete(MLTransactionMetadataStore.this);
+ recoverTime.setRecoverEndTime(System.currentTimeMillis());
}
}
@@ -446,6 +449,8 @@ public class MLTransactionMetadataStore
transactionCoordinatorstats.setState(getState().name());
transactionCoordinatorstats.setLeastSigBits(sequenceIdGenerator.getCurrentSequenceId());
transactionCoordinatorstats.ongoingTxnSize = txnMetaMap.size();
+ transactionCoordinatorstats.recoverStartTime = recoverTime.getRecoverStartTime();
+ transactionCoordinatorstats.recoverEndTime = recoverTime.getRecoverEndTime();
return transactionCoordinatorstats;
}