Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/07 08:03:51 UTC

[GitHub] [pulsar] congbobo184 opened a new pull request #10162: [Transaction] Fix transactionMetadata store recover timeout problem.

congbobo184 opened a new pull request #10162:
URL: https://github.com/apache/pulsar/pull/10162


   ## Motivation
   1. When the transaction coordinator (TC) recovers, it currently computes each transaction's timeout as ```currentTime``` + ```transactionTimeout```, which is not right. It needs to use ```startTransactionTime``` + ```transactionTimeout```.
   2. Fix lost timeouts: the original logic pops transactions that are still needed from the ```priorityQueue```, so their timeouts are lost.
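
A minimal sketch of the two points above, not the actual Pulsar code. All names here (`RecoveredTimeoutSketch`, `deadlineFromRecoveryTime`, `deadlineFromStartTime`, `expire`) are hypothetical, and a plain `java.util.PriorityQueue` stands in for the broker's `TripleLongPriorityQueue`. It shows how a deadline derived from the recovery time drifts later than one derived from the transaction's start time, and how peeking before polling avoids removing entries that have not yet expired.

```java
import java.util.PriorityQueue;

public class RecoveredTimeoutSketch {

    // Deadline computed from the time of recovery. This silently extends the
    // transaction's lifetime by however long it was open before recovery.
    static long deadlineFromRecoveryTime(long recoveryTimeMs, long timeoutMs) {
        return recoveryTimeMs + timeoutMs;
    }

    // Deadline computed from the original start time, so recovery does not
    // change when the transaction expires.
    static long deadlineFromStartTime(long startTimeMs, long timeoutMs) {
        return startTimeMs + timeoutMs;
    }

    // Removes only the entries whose deadline has passed. Each entry is
    // {deadlineMs, txnId}; the queue is ordered by deadline. Peeking first
    // means an entry that is still pending is never taken off the queue.
    static int expire(PriorityQueue<long[]> queue, long nowMs) {
        int expired = 0;
        while (!queue.isEmpty() && queue.peek()[0] <= nowMs) {
            queue.poll();
            expired++;
        }
        return expired;
    }

    public static void main(String[] args) {
        long start = 1_000;
        long timeout = 4_000;
        long recovery = 3_500;

        System.out.println(deadlineFromRecoveryTime(recovery, timeout)); // 7500
        System.out.println(deadlineFromStartTime(start, timeout));       // 5000

        PriorityQueue<long[]> queue =
                new PriorityQueue<>((a, b) -> Long.compare(a[0], b[0]));
        queue.add(new long[] {5_000, 1});
        queue.add(new long[] {9_000, 2});

        System.out.println(expire(queue, 6_000)); // 1 (only the 5000 ms entry)
        System.out.println(queue.size());         // 1 (the 9000 ms entry stays)
    }
}
```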
   
   ## Implementation
   Fix the transaction timeout logic.
   ### Verifying this change
   Added tests for it.
   
   Does this pull request potentially affect one of the following parts:
   If yes was chosen, please highlight the changes
   
   Dependencies (does it add or upgrade a dependency): (no)
   The public API: (no)
   The schema: (no)
   The default values of configurations: (no)
   The wire protocol: (no)
   The rest endpoints: (no)
   The admin cli options: (no)
   Anything that affects deployment: (no)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #10162: [Transaction] Fix transactionMetadata store recover timeout problem.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #10162:
URL: https://github.com/apache/pulsar/pull/10162#discussion_r615484278



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/timeout/TransactionTimeoutTrackerImpl.java
##########
@@ -40,11 +40,10 @@
     private final TripleLongPriorityQueue priorityQueue = new TripleLongPriorityQueue();
     private final long tickTimeMillis;
     private final Clock clock;
-    private Timeout currentTimeout;
+    private volatile Timeout currentTimeout;

Review comment:
       Do we need `volatile` here? It seems to be protected by `synchronized` already.
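
For readers following the question, here is a small sketch of the general Java rule behind it. It is not the `TransactionTimeoutTrackerImpl` code, and the class and method names are hypothetical. When every read and write of a field happens inside `synchronized` methods on the same object, the monitor already guarantees visibility between threads, so `volatile` adds nothing. `volatile` would only matter if some access to the field happened outside the lock, which this thread does not show either way.

```java
public class GuardedTimeoutSketch {

    // Deliberately not volatile: all access goes through synchronized methods.
    private long currentDeadlineMs = -1;

    // Writes happen while holding this object's monitor.
    public synchronized void reschedule(long deadlineMs) {
        currentDeadlineMs = deadlineMs;
    }

    // Reads take the same monitor, so they observe the latest write
    // made by any thread that released it earlier.
    public synchronized long currentDeadlineMs() {
        return currentDeadlineMs;
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedTimeoutSketch tracker = new GuardedTimeoutSketch();
        Thread writer = new Thread(() -> tracker.reschedule(5_000));
        writer.start();

        // Poll through the synchronized getter until the write is visible.
        while (tracker.currentDeadlineMs() < 0) {
            Thread.yield();
        }
        writer.join();
        System.out.println(tracker.currentDeadlineMs()); // 5000
    }
}
```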







[GitHub] [pulsar] lhotari commented on a change in pull request #10162: [Transaction] Fix transactionMetadata store recover timeout problem.

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #10162:
URL: https://github.com/apache/pulsar/pull/10162#discussion_r617760793



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
##########
@@ -222,14 +233,58 @@ public void testTimeoutTrackerMultiThreading() throws Exception {
             int i = -1;
             while (++i < 100) {
                 try {
-                    transactionMetadataStore.newTransaction(10000);
+                    transactionMetadataStore.newTransaction(4000);
                 } catch (Exception e) {
                     //no operation
                 }
             }
         }).start();
-        Awaitility.await().atLeast(3000, TimeUnit.MICROSECONDS).atMost(10000, TimeUnit.MILLISECONDS)
-                .until(() -> txnMap.size() == 100);
+
+        checkoutTimeout(txnMap, 300);
+        checkoutTimeout(txnMap, 200);
+        checkoutTimeout(txnMap, 100);
+        checkoutTimeout(txnMap, 0);
+    }
+
+    private void checkoutTimeout(ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap, int time) {
+        Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).atMost(2000, TimeUnit.MILLISECONDS)
+                .until(() -> txnMap.size() == time);
+    }
+
+    @Test
+    public void transactionTimeoutRecoverTest() throws Exception {
+        int timeout = 2000;
+        pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS)
+                .until(() -> pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0)) != null);
+        MLTransactionMetadataStore transactionMetadataStore =
+                (MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0));
+        checkTransactionMetadataStoreReady(transactionMetadataStore);
+
+        transactionMetadataStore.newTransaction(timeout);
+
+        pulsar.getTransactionMetadataStoreService()
+                .removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
+
+        pulsar.getTransactionMetadataStoreService().addTransactionMetadataStore(TransactionCoordinatorID.get(0));
+        Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS)
+                .until(() -> pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0)) != null);
+        transactionMetadataStore =
+                (MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
+                        .getStores().get(TransactionCoordinatorID.get(0));
+
+        checkTransactionMetadataStoreReady(transactionMetadataStore);
+
+        Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
+        field.setAccessible(true);
+        ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
+                (ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
+        assertEquals(txnMap.size(), 1);

Review comment:
       @eolivelli Done, #10313







[GitHub] [pulsar] codelipenghui merged pull request #10162: [Transaction] Fix transactionMetadata store recover timeout problem.

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #10162:
URL: https://github.com/apache/pulsar/pull/10162


   





[GitHub] [pulsar] lhotari commented on a change in pull request #10162: [Transaction] Fix transactionMetadata store recover timeout problem.

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #10162:
URL: https://github.com/apache/pulsar/pull/10162#discussion_r617664762



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
##########
+        Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
+        field.setAccessible(true);
+        ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
+                (ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
+        assertEquals(txnMap.size(), 1);

Review comment:
       @congbobo184  This is flaky, could it be removed?







[GitHub] [pulsar] eolivelli commented on a change in pull request #10162: [Transaction] Fix transactionMetadata store recover timeout problem.

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10162:
URL: https://github.com/apache/pulsar/pull/10162#discussion_r617674935



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
##########
+        Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
+        field.setAccessible(true);
+        ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
+                (ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
+        assertEquals(txnMap.size(), 1);

Review comment:
       +1 for removing it.
   
   @lhotari can you please send a PR?







[GitHub] [pulsar] lhotari commented on pull request #10162: [Transaction] Fix transactionMetadata store recover timeout problem.

Posted by GitBox <gi...@apache.org>.
lhotari commented on pull request #10162:
URL: https://github.com/apache/pulsar/pull/10162#issuecomment-824159452


   This PR introduced a flaky test (#10310). Please fix.





[GitHub] [pulsar] lhotari commented on a change in pull request #10162: [Transaction] Fix transactionMetadata store recover timeout problem.

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #10162:
URL: https://github.com/apache/pulsar/pull/10162#discussion_r617664762



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
##########
+        Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
+        field.setAccessible(true);
+        ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
+                (ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
+        assertEquals(txnMap.size(), 1);

Review comment:
       @congbobo184  This assertion `assertEquals(txnMap.size(), 1);` is flaky (#10310), could it be removed?







[GitHub] [pulsar] lhotari commented on a change in pull request #10162: [Transaction] Fix transactionMetadata store recover timeout problem.

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #10162:
URL: https://github.com/apache/pulsar/pull/10162#discussion_r617664762



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java
##########
+        Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
+        field.setAccessible(true);
+        ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>> txnMap =
+                (ConcurrentMap<TxnID, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
+        assertEquals(txnMap.size(), 1);

Review comment:
       @congbobo184  This is flaky (#10310), could this line be removed?







[GitHub] [pulsar] codelipenghui commented on pull request #10162: [Transaction] Fix transactionMetadata store recover timeout problem.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #10162:
URL: https://github.com/apache/pulsar/pull/10162#issuecomment-815388225


   /pulsarbot run-failure-checks

