You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/21 17:26:25 UTC

[GitHub] [pulsar] poorbarcode opened a new pull request, #16727: [improve][txn] PIP-160: Transaction buffered writer supports both ScheduledService and Timer

poorbarcode opened a new pull request, #16727:
URL: https://github.com/apache/pulsar/pull/16727

   Master Issue: #15370
   
   ### Motivation
   
   see #15370
   
   ### Modifications
   
   In the original design of the transaction buffered writer, we use a scheduled executor to execute timed tasks. There are no scheduled executors that match this scenario, and It's a little expensive to have a new scheduled executor with 1 millis tick time, we should reuse the `PulsarService.brokerClientSharedTimer` instead to create a new scheduled executor.
   
   So we should make transaction buffered writer supports `io.netty.Timer`, to ensure that buffered writer can still use Scheduled executors in the future, we designed to support both.
   
   
   ### Documentation
   
   
   - [ ] `doc-required` 
     
   - [x] `doc-not-needed` 
     
   - [ ] `doc` 
   
   - [ ] `doc-complete`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #16727: [improve][txn] PIP-160: Transaction buffered writer supports both ScheduledService and Timer

Posted by GitBox <gi...@apache.org>.
liangyepianzhou commented on code in PR #16727:
URL: https://github.com/apache/pulsar/pull/16727#discussion_r927240282


##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -382,37 +398,49 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
                 return null;
             }
         }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), Mockito.any(), Mockito.any());
-        // Test threshold: writeMaxDelayInMillis.
-        TxnLogBufferedWriter txnLogBufferedWriter1 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
-                scheduledExecutorService, dataSerializer, 32, 1024 * 4, 100, true);
         TxnLogBufferedWriter.AddDataCallback callback = Mockito.mock(TxnLogBufferedWriter.AddDataCallback.class);
-        txnLogBufferedWriter1.asyncAddData(100, callback, 100);
+
+        // Test threshold: writeMaxDelayInMillis (use scheduled service).
+        TxnLogBufferedWriter txnLogBufferedWriter0 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+                scheduledExecutorService, dataSerializer, 32, 1024 * 4, 100, true);
+
+        txnLogBufferedWriter0.asyncAddData(100, callback, 100);
         Thread.sleep(90);
         // Verify does not refresh ahead of time.
         Assert.assertEquals(dataArrayFlushedToBookie.size(), 0);
         Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> dataArrayFlushedToBookie.size() == 1);
         Assert.assertEquals(dataArrayFlushedToBookie.get(0).intValue(), 100);
+        txnLogBufferedWriter0.close();
+
+        // Test threshold: writeMaxDelayInMillis (use timer).
+        TxnLogBufferedWriter txnLogBufferedWriter1 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+                transactionTimer, dataSerializer, 32, 1024 * 4, 100, true);
+        txnLogBufferedWriter1.asyncAddData(100, callback, 100);
+        Thread.sleep(70);

Review Comment:
   Why is this not equal to writeMaxDelayInMillis?
   And if we can not use the Thread.sleep().



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -165,19 +167,30 @@ public void testMainProcess(int batchedWriteMaxRecords, int batchedWriteMaxSize,
         }
         OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
                 .numThreads(5).name("tx-threads").build();
-        ScheduledExecutorService scheduledExecutorService =
-                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("tx-scheduler-threads"));
         JsonDataSerializer dataSerializer = new JsonDataSerializer(eachDataBytesLen);
         /**
          * Execute test task.
          *   1. Write many times.
          *   2. Store the param-context and param-position of callback function for verify.
          */
         // Create TxLogBufferedWriter.
-        TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
-                        managedLedger, orderedExecutor, scheduledExecutorService,
-                        dataSerializer, batchedWriteMaxRecords, batchedWriteMaxSize,
-                        batchedWriteMaxDelayInMillis, batchEnabled);
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        ScheduledExecutorService transactionScheduledService =
+                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("tx-scheduler-threads"));

Review Comment:
   ```suggestion
                   Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("txn-scheduler-threads"));
   ```



##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -165,19 +167,30 @@ public void testMainProcess(int batchedWriteMaxRecords, int batchedWriteMaxSize,
         }
         OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
                 .numThreads(5).name("tx-threads").build();
-        ScheduledExecutorService scheduledExecutorService =
-                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("tx-scheduler-threads"));
         JsonDataSerializer dataSerializer = new JsonDataSerializer(eachDataBytesLen);
         /**
          * Execute test task.
          *   1. Write many times.
          *   2. Store the param-context and param-position of callback function for verify.
          */
         // Create TxLogBufferedWriter.
-        TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
-                        managedLedger, orderedExecutor, scheduledExecutorService,
-                        dataSerializer, batchedWriteMaxRecords, batchedWriteMaxSize,
-                        batchedWriteMaxDelayInMillis, batchEnabled);
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        ScheduledExecutorService transactionScheduledService =
+                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("tx-scheduler-threads"));

Review Comment:
   Please check other places.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16727: [improve][txn] PIP-160: Transaction buffered writer supports both ScheduledService and Timer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16727:
URL: https://github.com/apache/pulsar/pull/16727#discussion_r927243138


##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -165,19 +167,30 @@ public void testMainProcess(int batchedWriteMaxRecords, int batchedWriteMaxSize,
         }
         OrderedExecutor orderedExecutor =  OrderedExecutor.newBuilder()
                 .numThreads(5).name("tx-threads").build();
-        ScheduledExecutorService scheduledExecutorService =
-                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("tx-scheduler-threads"));
         JsonDataSerializer dataSerializer = new JsonDataSerializer(eachDataBytesLen);
         /**
          * Execute test task.
          *   1. Write many times.
          *   2. Store the param-context and param-position of callback function for verify.
          */
         // Create TxLogBufferedWriter.
-        TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter<Integer>(
-                        managedLedger, orderedExecutor, scheduledExecutorService,
-                        dataSerializer, batchedWriteMaxRecords, batchedWriteMaxSize,
-                        batchedWriteMaxDelayInMillis, batchEnabled);
+        HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
+                1, TimeUnit.MILLISECONDS);
+        ScheduledExecutorService transactionScheduledService =
+                Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("tx-scheduler-threads"));

Review Comment:
   Already fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui merged pull request #16727: [improve][txn] PIP-160: Transaction buffered writer supports Timer

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #16727:
URL: https://github.com/apache/pulsar/pull/16727


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16727: [improve][txn] PIP-160: Transaction buffered writer supports both ScheduledService and Timer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16727:
URL: https://github.com/apache/pulsar/pull/16727#discussion_r927246552


##########
pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java:
##########
@@ -382,37 +398,49 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
                 return null;
             }
         }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), Mockito.any(), Mockito.any());
-        // Test threshold: writeMaxDelayInMillis.
-        TxnLogBufferedWriter txnLogBufferedWriter1 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
-                scheduledExecutorService, dataSerializer, 32, 1024 * 4, 100, true);
         TxnLogBufferedWriter.AddDataCallback callback = Mockito.mock(TxnLogBufferedWriter.AddDataCallback.class);
-        txnLogBufferedWriter1.asyncAddData(100, callback, 100);
+
+        // Test threshold: writeMaxDelayInMillis (use scheduled service).
+        TxnLogBufferedWriter txnLogBufferedWriter0 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+                scheduledExecutorService, dataSerializer, 32, 1024 * 4, 100, true);
+
+        txnLogBufferedWriter0.asyncAddData(100, callback, 100);
         Thread.sleep(90);
         // Verify does not refresh ahead of time.
         Assert.assertEquals(dataArrayFlushedToBookie.size(), 0);
         Awaitility.await().atMost(2, TimeUnit.SECONDS).until(() -> dataArrayFlushedToBookie.size() == 1);
         Assert.assertEquals(dataArrayFlushedToBookie.get(0).intValue(), 100);
+        txnLogBufferedWriter0.close();
+
+        // Test threshold: writeMaxDelayInMillis (use timer).
+        TxnLogBufferedWriter txnLogBufferedWriter1 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor,
+                transactionTimer, dataSerializer, 32, 1024 * 4, 100, true);
+        txnLogBufferedWriter1.asyncAddData(100, callback, 100);
+        Thread.sleep(70);

Review Comment:
   > Why is this not equal to writeMaxDelayInMillis?
   
   This is to verify that the task will not be executed prematurely, so less than `writeMaxDelayInMillis`.
   
   > And if we can not use the `Thread.sleep()`.
   
   Is there a recommended implementation ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16727: [improve][txn] PIP-160: Transaction buffered writer supports both ScheduledService and Timer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16727:
URL: https://github.com/apache/pulsar/pull/16727#issuecomment-1192043814

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on pull request #16727: [improve][txn] PIP-160: Transaction buffered writer supports both ScheduledService and Timer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on PR #16727:
URL: https://github.com/apache/pulsar/pull/16727#issuecomment-1192111160

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16727: [improve][txn] PIP-160: Transaction buffered writer supports Timer

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on code in PR #16727:
URL: https://github.com/apache/pulsar/pull/16727#discussion_r927380922


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -73,15 +74,15 @@
 
     private final ManagedLedger managedLedger;
 
-    private final ScheduledExecutorService scheduledExecutorService;
+    private Timer timer;

Review Comment:
   final



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #16727: [improve][txn] PIP-160: Transaction buffered writer supports Timer

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #16727:
URL: https://github.com/apache/pulsar/pull/16727#discussion_r927404468


##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java:
##########
@@ -73,15 +74,15 @@
 
     private final ManagedLedger managedLedger;
 
-    private final ScheduledExecutorService scheduledExecutorService;
+    private Timer timer;

Review Comment:
   Already fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org