Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/20 03:37:18 UTC

[GitHub] [pulsar] liangyepianzhou commented on a diff in pull request #16685: [improve][txn] PIP-160: Transaction log store enables the batch feature

liangyepianzhou commented on code in PR #16685:
URL: https://github.com/apache/pulsar/pull/16685#discussion_r925127257


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java:
##########
@@ -94,17 +97,23 @@ public void testManagedLedgerMetrics() throws Exception {
 
     @Test
     public void testTransactionTopic() throws Exception {
+        TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
+        txnLogBufferedWriterConfig.setBatchEnabled(true);
+        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
         admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
                 new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
         admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
         createTransactionCoordinatorAssign();
         ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
         managedLedgerConfig.setMaxEntriesPerLedger(2);
-        new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
-                pulsar.getManagedLedgerFactory(), managedLedgerConfig)
-                .initialize().join();
+        MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
+                pulsar.getManagedLedgerFactory(), managedLedgerConfig, txnLogBufferedWriterConfig,
+                scheduledExecutorService);
+        mlTransactionLog.initialize().join();
         ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar);
         metrics.generate();
+        // cleanup.
+        mlTransactionLog.closeAsync();

Review Comment:
   ```suggestion
           mlTransactionLog.closeAsync();
           scheduledExecutorService.shutdown();
   ```
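
A minimal, self-contained sketch of the cleanup order suggested above: close the log first, then shut the executor down. `FakeLog` and `runAndCleanUp` are hypothetical names used only for illustration and are not part of Pulsar; the `try`/`finally` wrapper and the 10-second timeouts are assumptions, not something this PR contains.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class ExecutorCleanupSketch {

    // Hypothetical stand-in for a resource with an asynchronous close,
    // such as the transaction log created in the test.
    static class FakeLog {
        CompletableFuture<Void> closeAsync() {
            return CompletableFuture.completedFuture(null);
        }
    }

    // Runs a (placeholder) test body, then closes the log and stops the
    // executor even if the body throws. Returns whether the executor terminated.
    static boolean runAndCleanUp() throws Exception {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        FakeLog log = new FakeLog();
        try {
            // ... test body would go here ...
        } finally {
            // Wait for the close to finish before stopping the threads it may use.
            log.closeAsync().get(10, TimeUnit.SECONDS);
            executor.shutdown();
            executor.awaitTermination(10, TimeUnit.SECONDS);
        }
        return executor.isTerminated();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndCleanUp());
    }
}
```

Waiting on the close future before `shutdown()` avoids stopping the pool while the close may still need it; whether that matters for `MLTransactionLogImpl` depends on how its `closeAsync()` uses the executor, which this sketch does not model.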



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java:
##########
@@ -247,8 +247,11 @@ public CompletableFuture<TxnID> newTransaction(long timeOut) {
                         .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
                         .setLastModificationTime(currentTimeMillis)
                         .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+                String id = UUID.randomUUID().toString();

Review Comment:
   ```suggestion
   
   ```



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java:
##########
@@ -247,8 +247,11 @@ public CompletableFuture<TxnID> newTransaction(long timeOut) {
                         .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
                         .setLastModificationTime(currentTimeMillis)
                         .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+                String id = UUID.randomUUID().toString();
+                System.out.println(id + " start");
                 transactionLog.append(transactionMetadataEntry)
                         .whenComplete((position, throwable) -> {
+                            System.out.println(id + " end");

Review Comment:
   ```suggestion
                               if (log.isDebugEnabled()) {
                                   log.debug("Transaction coordinator [{}] complete to open transaction [{}]",
                                           txnID.getMostSigBits(), txnID.getLeastSigBits());
                               }
   ```



##########
pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java:
##########
@@ -247,8 +247,11 @@ public CompletableFuture<TxnID> newTransaction(long timeOut) {
                         .setMetadataOp(TransactionMetadataEntry.TransactionMetadataOp.NEW)
                         .setLastModificationTime(currentTimeMillis)
                         .setMaxLocalTxnId(sequenceIdGenerator.getCurrentSequenceId());
+                String id = UUID.randomUUID().toString();
+                System.out.println(id + " start");

Review Comment:
   ```suggestion
                   if (log.isDebugEnabled()) {
                       log.debug("Transaction coordinator [{}] start to open transaction [{}]",
                               txnID.getMostSigBits(), txnID.getLeastSigBits());
                   }
   ```


