Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/07/21 08:27:46 UTC

[GitHub] [pulsar] congbobo184 commented on a diff in pull request #16707: [improve][txn] PIP-160: Pending ack log store enables the batch feature

congbobo184 commented on code in PR #16707:
URL: https://github.com/apache/pulsar/pull/16707#discussion_r926310974


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java:
##########
@@ -343,6 +346,8 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
         this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
         this.backlogQuotaChecker = Executors
                 .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
+        this.transactionLogBufferedWriteAsyncFlushTrigger = Executors
+                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));

Review Comment:
   Change the thread name, or reuse an existing thread.
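
   A minimal sketch of the first option, using only the JDK: each executor gets its own thread name, so a thread dump can tell the flush trigger apart from the backlog quota checker. The `named` helper and the name "pulsar-txn-log-flush-trigger" are hypothetical and not taken from the PR, which uses Netty's DefaultThreadFactory.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class NamedExecutors {
    // Hypothetical helper: a ThreadFactory whose threads all carry the given name.
    static ThreadFactory named(String name) {
        return runnable -> {
            Thread t = new Thread(runnable, name);
            t.setDaemon(true);
            return t;
        };
    }

    // Runs a small task on the executor and returns the name of the thread that ran it.
    static String threadNameOf(ScheduledExecutorService executor) {
        Callable<String> task = () -> Thread.currentThread().getName();
        try {
            return executor.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService quotaChecker =
                Executors.newSingleThreadScheduledExecutor(named("pulsar-backlog-quota-checker"));
        // Assumed name for illustration only.
        ScheduledExecutorService flushTrigger =
                Executors.newSingleThreadScheduledExecutor(named("pulsar-txn-log-flush-trigger"));

        System.out.println(threadNameOf(quotaChecker));
        System.out.println(threadNameOf(flushTrigger));

        quotaChecker.shutdownNow();
        flushTrigger.shutdownNow();
    }
}
```

   The second option, reusing an existing scheduled executor, avoids the extra thread entirely, at the cost of sharing it with unrelated periodic work.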



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -240,13 +280,48 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
                 if (exception instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
                     managedLedger.readyToCreateNewLedger();
                 }
-                buf.release();
                 completableFuture.completeExceptionally(new PersistenceException(exception));
             }
         }, null);
         return completableFuture;
     }
 
+    /**
+     * Build the index mapping of Transaction pending ack log (aka t-log) and Topic message log (aka m-log).
+     * When m-log has been ack, t-log which holds m-log is no longer useful, this method builder the mapping of them.
+     *
+     * If a Ledger Entry has many t-log, we only need to care about the record that carries the largest acknowledgement
+     * info. Because all Commit/Abort log after this record describes behavior acknowledgement, if the behavior
+     * acknowledgement has been handle correct, these Commit/Abort log is no longer useful.
+     * @param logPosition The position of batch log Entry.
+     * @param logList Pending ack log records in a batch log Entry.
+     */
+    private void handleMetadataEntry(PositionImpl logPosition, List<PendingAckMetadataEntry> logList) {
+        // Find the record that carries the largest ack info, and call "handleMetadataEntry(position, pendingAckLog)"
+        PendingAckMetadataEntry pendingAckLogHasMaxAckPosition = null;
+        PositionImpl maxAcknowledgementPosition = this.maxAckPosition;
+        for (int i = logList.size() - 1; i >= 0; i--){
+            PendingAckMetadataEntry pendingAckLog = logList.get(i);
+            if (pendingAckLog.getPendingAckOp() == PendingAckOp.ABORT
+                    && pendingAckLog.getPendingAckOp() == PendingAckOp.COMMIT) {
+                continue;
+            }
+            if (pendingAckLog.getPendingAckMetadatasList().isEmpty()){
+                continue;
+            }
+            for (PendingAckMetadata ack : pendingAckLog.getPendingAckMetadatasList()){
+                if (maxAcknowledgementPosition.compareTo(ack.getLedgerId(), ack.getEntryId()) < 0){
+                    maxAcknowledgementPosition = PositionImpl.get(ack.getLedgerId(), ack.getEntryId());
+                    pendingAckLogHasMaxAckPosition = pendingAckLog;
+                }
+            }
+        }
+
+        if (pendingAckLogHasMaxAckPosition != null) {
+            handleMetadataEntry(logPosition, pendingAckLogHasMaxAckPosition);

Review Comment:
   Commit and abort records both need to go through handleMetadataEntry, right?
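
   For context on the check quoted in the hunk above: a single value cannot equal both ABORT and COMMIT, so a condition joined with `&&` never fires and skips nothing. Whether commit and abort records should be skipped at all is the question this comment raises; the sketch below only illustrates how the two operators behave. The `Op` enum and both helper methods are hypothetical stand-ins, not Pulsar's PendingAckOp.

```java
public class SkipCondition {
    // Hypothetical stand-in for the operation type carried by a pending ack log record.
    enum Op { ACK, COMMIT, ABORT }

    // Mirrors the shape of the check in the diff: never true for any single value.
    static boolean skipsWithAnd(Op op) {
        return op == Op.ABORT && op == Op.COMMIT;
    }

    // The same check joined with ||: true for both ABORT and COMMIT.
    static boolean skipsWithOr(Op op) {
        return op == Op.ABORT || op == Op.COMMIT;
    }

    public static void main(String[] args) {
        for (Op op : Op.values()) {
            System.out.println(op + " and=" + skipsWithAnd(op) + " or=" + skipsWithOr(op));
        }
        // Prints:
        // ACK and=false or=false
        // COMMIT and=false or=true
        // ABORT and=false or=true
    }
}
```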



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java:
##########
@@ -240,13 +280,48 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
                 if (exception instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
                     managedLedger.readyToCreateNewLedger();
                 }
-                buf.release();
                 completableFuture.completeExceptionally(new PersistenceException(exception));
             }
         }, null);
         return completableFuture;
     }
 
+    /**
+     * Build the index mapping of Transaction pending ack log (aka t-log) and Topic message log (aka m-log).
+     * When m-log has been ack, t-log which holds m-log is no longer useful, this method builder the mapping of them.
+     *
+     * If a Ledger Entry has many t-log, we only need to care about the record that carries the largest acknowledgement
+     * info. Because all Commit/Abort log after this record describes behavior acknowledgement, if the behavior
+     * acknowledgement has been handle correct, these Commit/Abort log is no longer useful.
+     * @param logPosition The position of batch log Entry.
+     * @param logList Pending ack log records in a batch log Entry.
+     */
+    private void handleMetadataEntry(PositionImpl logPosition, List<PendingAckMetadataEntry> logList) {
+        // Find the record that carries the largest ack info, and call "handleMetadataEntry(position, pendingAckLog)"
+        PendingAckMetadataEntry pendingAckLogHasMaxAckPosition = null;
+        PositionImpl maxAcknowledgementPosition = this.maxAckPosition;
+        for (int i = logList.size() - 1; i >= 0; i--){
+            PendingAckMetadataEntry pendingAckLog = logList.get(i);
+            if (pendingAckLog.getPendingAckOp() == PendingAckOp.ABORT
+                    && pendingAckLog.getPendingAckOp() == PendingAckOp.COMMIT) {
+                continue;
+            }
+            if (pendingAckLog.getPendingAckMetadatasList().isEmpty()){
+                continue;
+            }
+            for (PendingAckMetadata ack : pendingAckLog.getPendingAckMetadatasList()){
+                if (maxAcknowledgementPosition.compareTo(ack.getLedgerId(), ack.getEntryId()) < 0){
+                    maxAcknowledgementPosition = PositionImpl.get(ack.getLedgerId(), ack.getEntryId());
+                    pendingAckLogHasMaxAckPosition = pendingAckLog;
+                }
+            }
+        }

Review Comment:
   We only need to handle this when logList.size() reaches batchSize, and handling it once is enough. Don't run the handling on every add op.


