Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/25 16:56:17 UTC

[GitHub] [pulsar] congbobo184 opened a new pull request #10366: [Transaction] Fix transaction timeout tracker expired problem.

congbobo184 opened a new pull request #10366:
URL: https://github.com/apache/pulsar/pull/10366


   ## Motivation
   When the transaction timeout tracker's current timeout is already in the expired state, it cannot be cancelled successfully, so a check for the expired state should be added.
   
   ## Implementation
   Add the timeout expired check.
   ### Verifying this change
   Add tests for it.
   
   Does this pull request potentially affect one of the following parts:
   If yes was chosen, please highlight the changes
   
   Dependencies (does it add or upgrade a dependency): (no)
   The public API: (no)
   The schema: (no)
   The default values of configurations: (no)
   The wire protocol: (no)
   The rest endpoints: (no)
   The admin cli options: (no)
   Anything that affects deployment: (no)
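
The motivation above says that a timeout which has already expired cannot be cancelled, so the tracker needs an explicit expired check. The thread does not show the fix itself, so what follows is only a minimal sketch of that idea under stated assumptions: `Handle` and `Tracker` are hypothetical names made up for illustration, not Pulsar or Netty classes, and the timer is simulated by calling `fire()` by hand.

```java
// Hypothetical sketch; Handle and Tracker are illustrative names, not Pulsar classes.
class ExpiredCheckSketch {

    /** Minimal stand-in for a timer handle that can be pending, cancelled, or expired. */
    static final class Handle {
        enum State { PENDING, CANCELLED, EXPIRED }

        final long deadline;
        State state = State.PENDING;

        Handle(long deadline) {
            this.deadline = deadline;
        }

        /** Cancelling succeeds only while the handle is still pending. */
        boolean cancel() {
            if (state != State.PENDING) {
                return false;
            }
            state = State.CANCELLED;
            return true;
        }

        /** Simulates the timer firing. */
        void fire() {
            if (state == State.PENDING) {
                state = State.EXPIRED;
            }
        }

        boolean isExpired() {
            return state == State.EXPIRED;
        }
    }

    /** Keeps one handle armed for the earliest known deadline. */
    static final class Tracker {
        Handle current;

        void track(long deadline) {
            if (current == null || current.isExpired()) {
                // Nothing is pending: an expired handle cannot be cancelled, so just re-arm.
                current = new Handle(deadline);
            } else if (deadline < current.deadline && current.cancel()) {
                // An earlier deadline arrived: replace the pending handle.
                current = new Handle(deadline);
            }
        }
    }

    public static void main(String[] args) {
        Tracker tracker = new Tracker();
        tracker.track(100);
        tracker.current.fire();   // the first timeout expires
        tracker.track(200);       // must arm a new handle despite the later deadline
        System.out.println(tracker.current.deadline + " " + tracker.current.state);
    }
}
```

Without the `isExpired()` branch, the second `track` call would compare deadlines, find 200 later than 100, and leave the tracker holding a handle that will never fire again.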
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #10366: [Transaction] Fix transaction timeout tracker expired problem.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10366:
URL: https://github.com/apache/pulsar/pull/10366#discussion_r619865952



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
##########
@@ -181,6 +181,36 @@ public void testTimeoutTracker() throws Exception {
                 .until(() -> txnMap.size() == 0);
     }
 
+    @Test
+    public void testTimeoutTrackerExpired() throws Exception {
+        pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS)

Review comment:
       Please remove 'atMost' as @lhotari recently suggested

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
##########
@@ -181,6 +181,36 @@ public void testTimeoutTracker() throws Exception {
                 .until(() -> txnMap.size() == 0);
     }
 
+    @Test
+    public void testTimeoutTrackerExpired() throws Exception {
+        pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS)
+                .until(() -> pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0)) != null);
+        MLTransactionMetadataStore transactionMetadataStore =
+                (MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0));
+        checkTransactionMetadataStoreReady(transactionMetadataStore);
+        Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
+        field.setAccessible(true);
+        ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
+                (ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
+

Review comment:
       Don't we use Powermock Whitebox.getInternalState?

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
##########
@@ -181,6 +181,36 @@ public void testTimeoutTracker() throws Exception {
                 .until(() -> txnMap.size() == 0);
     }
 
+    @Test
+    public void testTimeoutTrackerExpired() throws Exception {
+        pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS)
+                .until(() -> pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0)) != null);
+        MLTransactionMetadataStore transactionMetadataStore =
+                (MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0));
+        checkTransactionMetadataStoreReady(transactionMetadataStore);
+        Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
+        field.setAccessible(true);
+        ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
+                (ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
+
+        transactionMetadataStore.newTransaction(2000).get();
+
+        txnMap.forEach((txnID, txnMetaListPair) ->
+                Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN));
+        Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).atMost(3000, TimeUnit.MILLISECONDS)
+                .until(() -> txnMap.size() == 0);
+
+        transactionMetadataStore.newTransaction(2000).get();
+
+        txnMap.forEach((txnID, txnMetaListPair) ->
+                Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN));
+        Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).atMost(3000, TimeUnit.MILLISECONDS)
+                .until(() -> txnMap.size() == 0);

Review comment:
       This kind of check is likely to become a new flaky test: it requires something to happen within an exact time range, so a long GC pause (or any other unpredictable slowdown) will make the test fail.
   
   Is there another way to implement this check?
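
One commonly used alternative to wall-clock windows, shown here only as a sketch and not as what this PR did, is to inject the time source so that a test advances time explicitly. `ManualClock` and `ExpiringTable` below are hypothetical names; the real `MLTransactionMetadataStore` is not known from this thread to accept a clock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical sketch; these classes do not exist in Pulsar.
class ManualClockSketch {

    /** Test-controlled clock: time moves only when advance() is called. */
    static final class ManualClock implements LongSupplier {
        private long nowMillis;

        @Override
        public long getAsLong() {
            return nowMillis;
        }

        void advance(long millis) {
            nowMillis += millis;
        }
    }

    /** Stand-in for a transaction table whose entries carry a deadline. */
    static final class ExpiringTable {
        private final LongSupplier clock;
        private final Map<String, Long> deadlines = new HashMap<>();

        ExpiringTable(LongSupplier clock) {
            this.clock = clock;
        }

        void open(String txnId, long timeoutMillis) {
            deadlines.put(txnId, clock.getAsLong() + timeoutMillis);
        }

        /** Drops every entry whose deadline has been reached; a timer would normally call this. */
        void expireDue() {
            long now = clock.getAsLong();
            deadlines.values().removeIf(deadline -> deadline <= now);
        }

        int size() {
            return deadlines.size();
        }
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        ExpiringTable table = new ExpiringTable(clock);
        table.open("txn-1", 2000);
        clock.advance(1999);
        table.expireDue();
        System.out.println("before deadline: " + table.size());
        clock.advance(1);
        table.expireDue();
        System.out.println("at deadline: " + table.size());
    }
}
```

Because the clock never moves on its own, a GC pause cannot push the assertion outside its window. The trade-off is that the production code has to accept a clock (or timer) as a dependency.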







[GitHub] [pulsar] congbobo184 commented on a change in pull request #10366: [Transaction] Fix transaction timeout tracker expired problem.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #10366:
URL: https://github.com/apache/pulsar/pull/10366#discussion_r619918617



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
##########
@@ -181,6 +181,36 @@ public void testTimeoutTracker() throws Exception {
                 .until(() -> txnMap.size() == 0);
     }
 
+    @Test
+    public void testTimeoutTrackerExpired() throws Exception {
+        pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS)
+                .until(() -> pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0)) != null);
+        MLTransactionMetadataStore transactionMetadataStore =
+                (MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0));
+        checkTransactionMetadataStoreReady(transactionMetadataStore);
+        Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
+        field.setAccessible(true);
+        ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
+                (ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
+
+        transactionMetadataStore.newTransaction(2000).get();
+
+        txnMap.forEach((txnID, txnMetaListPair) ->
+                Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN));
+        Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).atMost(3000, TimeUnit.MILLISECONDS)
+                .until(() -> txnMap.size() == 0);
+
+        transactionMetadataStore.newTransaction(2000).get();
+
+        txnMap.forEach((txnID, txnMetaListPair) ->
+                Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN));
+        Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).atMost(3000, TimeUnit.MILLISECONDS)
+                .until(() -> txnMap.size() == 0);

Review comment:
       Regarding the timeout check, I think it can only be achieved by waiting







[GitHub] [pulsar] eolivelli merged pull request #10366: [Transaction] Fix transaction timeout tracker expired problem.

Posted by GitBox <gi...@apache.org>.
eolivelli merged pull request #10366:
URL: https://github.com/apache/pulsar/pull/10366


   





[GitHub] [pulsar] congbobo184 commented on a change in pull request #10366: [Transaction] Fix transaction timeout tracker expired problem.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #10366:
URL: https://github.com/apache/pulsar/pull/10366#discussion_r619919068



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
##########
@@ -181,6 +181,36 @@ public void testTimeoutTracker() throws Exception {
                 .until(() -> txnMap.size() == 0);
     }
 
+    @Test
+    public void testTimeoutTrackerExpired() throws Exception {
+        pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS)
+                .until(() -> pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0)) != null);
+        MLTransactionMetadataStore transactionMetadataStore =
+                (MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0));
+        checkTransactionMetadataStoreReady(transactionMetadataStore);
+        Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
+        field.setAccessible(true);
+        ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
+                (ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
+

Review comment:
       We have not exposed an interface for retrieving all transaction metadata.







[GitHub] [pulsar] eolivelli commented on a change in pull request #10366: [Transaction] Fix transaction timeout tracker expired problem.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10366:
URL: https://github.com/apache/pulsar/pull/10366#discussion_r620937689



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
##########
@@ -181,6 +181,36 @@ public void testTimeoutTracker() throws Exception {
                 .until(() -> txnMap.size() == 0);
     }
 
+    @Test
+    public void testTimeoutTrackerExpired() throws Exception {
+        pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS)
+                .until(() -> pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0)) != null);
+        MLTransactionMetadataStore transactionMetadataStore =
+                (MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0));
+        checkTransactionMetadataStoreReady(transactionMetadataStore);
+        Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
+        field.setAccessible(true);
+        ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
+                (ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
+

Review comment:
       I meant using Whitebox.getInternalState, a useful utility that avoids using reflection directly, which is something that can break from one JDK version to the next.
   
   not a problem for me, just a suggestion
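
For readers unfamiliar with the utility mentioned above: PowerMock's `Whitebox.getInternalState(object, fieldName)` hides the `getDeclaredField` / `setAccessible` / cast sequence used in the diff behind a single call. The sketch below is a plain-JDK approximation of that kind of helper, not PowerMock's actual implementation; `readField` and `Store` are hypothetical names.

```java
import java.lang.reflect.Field;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of a field-reading test helper.
class FieldReaderSketch {

    /** Reads a (possibly private) field by name, searching superclasses as well. */
    @SuppressWarnings("unchecked")
    static <T> T readField(Object target, String fieldName) {
        for (Class<?> c = target.getClass(); c != null; c = c.getSuperclass()) {
            try {
                Field field = c.getDeclaredField(fieldName);
                field.setAccessible(true);
                return (T) field.get(target);
            } catch (NoSuchFieldException e) {
                // Not declared on this class; continue with the superclass.
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + fieldName, e);
            }
        }
        throw new IllegalArgumentException(
                "No field '" + fieldName + "' on " + target.getClass().getName());
    }

    /** Stand-in for a class under test that keeps its state private. */
    static class Store {
        private final ConcurrentMap<String, String> txnMetaMap = new ConcurrentHashMap<>();

        void open(String txnId) {
            txnMetaMap.put(txnId, "OPEN");
        }
    }

    public static void main(String[] args) {
        Store store = new Store();
        store.open("txn-1");
        ConcurrentMap<String, String> map = readField(store, "txnMetaMap");
        System.out.println(map);
    }
}
```

Keeping the reflection in one helper means that if a JDK release tightens access rules, only the helper has to change rather than every test.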







[GitHub] [pulsar] congbobo184 commented on a change in pull request #10366: [Transaction] Fix transaction timeout tracker expired problem.

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on a change in pull request #10366:
URL: https://github.com/apache/pulsar/pull/10366#discussion_r619918617



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
##########
@@ -181,6 +181,36 @@ public void testTimeoutTracker() throws Exception {
                 .until(() -> txnMap.size() == 0);
     }
 
+    @Test
+    public void testTimeoutTrackerExpired() throws Exception {
+        pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS)
+                .until(() -> pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0)) != null);
+        MLTransactionMetadataStore transactionMetadataStore =
+                (MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0));
+        checkTransactionMetadataStoreReady(transactionMetadataStore);
+        Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
+        field.setAccessible(true);
+        ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
+                (ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
+
+        transactionMetadataStore.newTransaction(2000).get();
+
+        txnMap.forEach((txnID, txnMetaListPair) ->
+                Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN));
+        Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).atMost(3000, TimeUnit.MILLISECONDS)
+                .until(() -> txnMap.size() == 0);
+
+        transactionMetadataStore.newTransaction(2000).get();
+
+        txnMap.forEach((txnID, txnMetaListPair) ->
+                Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN));
+        Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).atMost(3000, TimeUnit.MILLISECONDS)
+                .until(() -> txnMap.size() == 0);

Review comment:
       Regarding the timeout check, I think it can only be achieved by waiting. Do you have a better idea?









