You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/05/19 08:44:30 UTC

[GitHub] [pulsar] Technoboy- commented on a diff in pull request #15592: [optimize][txn] Optimize transaction lowWaterMark to clean useless data faster

Technoboy- commented on code in PR #15592:
URL: https://github.com/apache/pulsar/pull/15592#discussion_r876786333


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -384,30 +376,36 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
     }
 
     private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
-        if (!ongoingTxns.isEmpty()) {
-            TxnID firstTxn = ongoingTxns.firstKey();
-            if (firstTxn.getMostSigBits() == txnID.getMostSigBits() && lowWaterMark >= firstTxn.getLeastSigBits()) {
-                ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L,
-                        firstTxn.getMostSigBits(), firstTxn.getLeastSigBits());
-                try {
-                    topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() {
-                        @Override
-                        public void addComplete(Position position, ByteBuf entryData, Object ctx) {
-                            synchronized (TopicTransactionBuffer.this) {
-                                aborts.put(firstTxn, (PositionImpl) position);
-                                updateMaxReadPosition(firstTxn);
-                            }
-                        }
-
-                        @Override
-                        public void addFailed(ManagedLedgerException exception, Object ctx) {
-                            log.error("Failed to abort low water mark for txn {}", txnID, exception);
-                        }
-                    }, null);
-                } finally {
-                    abortMarker.release();
+        lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> {
+            if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) {
+                return lowWaterMark;
+            } else {
+                return oldLowWaterMark;
+            }
+        });
+        if (handleLowWaterMark.tryAcquire()) {
+            if (!ongoingTxns.isEmpty()) {
+                TxnID firstTxn = ongoingTxns.firstKey();
+                long tCId = firstTxn.getMostSigBits();
+                Long lowWaterMarkOfFirstTxnId = lowWaterMarks.get(tCId);
+                if (lowWaterMarkOfFirstTxnId != null && firstTxn.getLeastSigBits() <= lowWaterMarkOfFirstTxnId) {
+                    abortTxn(firstTxn, lowWaterMarkOfFirstTxnId)
+                            .thenRun(() -> {
+                                log.warn("Successes to abort low water mark for txn [{}], topic [{}],"
+                                        + " lowWaterMark [{}]", firstTxn, topic.getName(), lowWaterMarkOfFirstTxnId);
+                                handleLowWaterMark.release();
+                            })
+                            .exceptionally(ex -> {
+                                log.warn("Failed to abort low water mark for txn {}, topic [{}], "
+                                        + "lowWaterMark [{}], ", firstTxn, topic.getName(), lowWaterMarkOfFirstTxnId,
+                                        ex);
+                                handleLowWaterMark.release();
+                                return null;
+                            });
+                    return;
                 }
             }
+            handleLowWaterMark.release();

Review Comment:
   Other thread release the semphere ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org