Posted to commits@pulsar.apache.org by "thetumbled (via GitHub)" <gi...@apache.org> on 2024/04/07 06:31:07 UTC

[PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

thetumbled opened a new pull request, #22452:
URL: https://github.com/apache/pulsar/pull/22452

   
   ### Motivation
   
   In non-transactional production, we update the `LastDataMessagePublishedTimestamp` when the message is persisted successfully. But in transactional production, we do not update `LastDataMessagePublishedTimestamp`, which will impact the feature `ReplicatedSubscription`.
   
   ### Modifications
   
   Update `LastDataMessagePublishedTimestamp` when the transaction is committed, which guarantees that new messages are available for reading.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   ### Does this pull request potentially affect one of the following parts:
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   *If the box was checked, please highlight the changes*
   
   - [ ] Dependencies (add or upgrade a dependency)
   - [ ] The public API
   - [ ] The schema
   - [ ] The default values of configurations
   - [ ] The threading model
   - [ ] The binary protocol
   - [ ] The REST endpoints
   - [ ] The admin CLI options
   - [ ] The metrics
   - [ ] Anything that affects deployment
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1556727473


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   title changed.





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1555571360


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   Yes, it does. But I think it is a good tradeoff to **allow a wrong update**, because `LastDataMessagePublishedTimestamp` is only used to skip creating unnecessary subscription replication snapshots. 
   An incorrect update merely creates one unnecessary snapshot. 
   Conversely, if we insist on strict correctness, we have to put a lot of effort into determining whether the max read position has moved forward. 
   But we **can't allow a missing update**, which would skip creating a necessary subscription replication snapshot. 
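
The asymmetry argued above can be illustrated with a minimal sketch. It assumes the snapshot decision reduces to comparing the last-published timestamp with the start time of the last completed snapshot; `SnapshotGate` and its methods are hypothetical names for illustration, not the actual broker classes.

```java
// Hypothetical model; names are illustrative, not the actual Pulsar classes.
final class SnapshotGate {
    // 0 means no data message has been published yet.
    private long lastDataMessagePublishedTimestamp = 0L;
    private long lastCompletedSnapshotStartTime = 0L;

    void onDataPublished(long nowMillis) {
        lastDataMessagePublishedTimestamp = nowMillis;
    }

    void onSnapshotCompleted(long snapshotStartMillis) {
        lastCompletedSnapshotStartTime = snapshotStartMillis;
    }

    // Assumed rule: skip the snapshot when nothing was published since the
    // last completed snapshot started.
    boolean shouldSnapshot() {
        return lastDataMessagePublishedTimestamp != 0L
                && lastDataMessagePublishedTimestamp >= lastCompletedSnapshotStartTime;
    }

    public static void main(String[] args) {
        SnapshotGate gate = new SnapshotGate();
        gate.onDataPublished(100L);
        gate.onSnapshotCompleted(150L);
        // A transaction commits at t=200 but the timestamp is never updated:
        // the gate still sees t=100 and skips a snapshot that was needed.
        System.out.println("missed update   -> snapshot? " + gate.shouldSnapshot());
        // With the timestamp updated on commit, the snapshot is taken. A
        // spurious update would cost at most one extra snapshot.
        gate.onDataPublished(200L);
        System.out.println("recorded update -> snapshot? " + gate.shouldSnapshot());
    }
}
```

Under this model a spurious update only makes `shouldSnapshot()` return true once more than necessary, while a missed update suppresses snapshots until the next recorded publish.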
   





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1560357164


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -674,6 +700,10 @@ private void closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> rea
         }
     }
 
+    public interface MaxReadPositionCallBack {
+        void moveForward(PositionImpl oldPosition, PositionImpl newPosition);

Review Comment:
   I have changed the method name and completed the documentation, PTAL, thanks.





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1559168142


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,35 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * remove the specified transaction from ongoing transaction list and update the max read position.
+     * @param txnID
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+    }
+
+    /**
+     * update the max read position.
+     * @param newPosition new max read position
+     * @param disableCallback whether disable the callback.

Review Comment:
   Why is `disableCallback` needed? A comment about this in the Javadoc would be useful.





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2095294753

   /pulsarbot rerun-failure-checks




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590809858


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -479,17 +504,22 @@ public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition)
         return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
     }
 
+    /**
+     * Sync max read position for normal publish.
+     * @param position {@link PositionImpl} the position to sync.
+     * @param isMarkerMessage whether the message is marker message, in such case, we
+     *                       don't need to trigger the callback to update lastMaxReadPositionMovedForwardTimestamp.
+     */
     @Override
-    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
+    public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
         // when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback
         // thread is the same tread, in this time the lastAddConfirm don't content transaction message.
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
-                this.maxReadPosition = position;
+                updateMaxReadPosition(position, isMarkerMessage);
             } else if (checkIfReady()) {
                 if (ongoingTxns.isEmpty()) {
-                    maxReadPosition = position;
-                    changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
+                    updateMaxReadPosition(position, isMarkerMessage);

Review Comment:
   `syncMaxReadPositionForNormalPublish` is used to update the max read position for non-transactional message production. If there is any ongoing txn, we can't move the max read position. We should call `MaxReadPositionCallBack` when the max read position actually changes.
   I don't get your point; could you provide a more detailed explanation?
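
As a rough illustration of the behaviour discussed in this hunk, the sketch below models a tracker that notifies a listener only when the max read position moves forward, and suppresses the notification for marker messages. Positions are simplified to `long` values, and `MaxReadPositionTracker` and `MovedForwardListener` are hypothetical names, not the types in the PR.

```java
// Hypothetical model of the callback discussed above; positions are plain longs.
interface MovedForwardListener {
    void onMovedForward(long oldPosition, long newPosition);
}

final class MaxReadPositionTracker {
    private long maxReadPosition = -1L;
    private final MovedForwardListener listener;

    MaxReadPositionTracker(MovedForwardListener listener) {
        this.listener = listener;
    }

    // Always records the new position; notifies the listener only when the
    // position moved forward and the caller did not disable the callback
    // (for example, because the entry is a marker message).
    void update(long newPosition, boolean disableCallback) {
        long previous = maxReadPosition;
        maxReadPosition = newPosition;
        if (newPosition > previous && !disableCallback) {
            listener.onMovedForward(previous, newPosition);
        }
    }

    long maxReadPosition() {
        return maxReadPosition;
    }

    public static void main(String[] args) {
        MaxReadPositionTracker tracker = new MaxReadPositionTracker(
                (oldPos, newPos) -> System.out.println("moved " + oldPos + " -> " + newPos));
        tracker.update(0L, false); // data message: listener is notified
        tracker.update(1L, true);  // marker message: position advances silently
        tracker.update(1L, false); // no forward movement: listener is not notified
    }
}
```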





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2103694782

   /pulsarbot rerun-failure-checks




Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1555491010


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   The correct fix should be to update `LastDataMessagePublishedTimestamp` only when the max read position is updated. Commit and abort operations can happen without the max read position changing.
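
To see why a commit does not always move the max read position, here is a simplified model. It assumes the max read position is the entry just before the first message of the oldest ongoing transaction, or the last confirmed entry when no transaction is ongoing. Positions are plain `long` values, commit/abort markers are ignored, and `MaxReadPositionModel` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified, hypothetical model: positions are longs and markers are ignored.
final class MaxReadPositionModel {
    // Insertion-ordered: the first entry belongs to the oldest ongoing transaction.
    private final Map<String, Long> ongoingTxns = new LinkedHashMap<>();
    private long lastConfirmedEntry = -1L;
    private long maxReadPosition = -1L;

    // Appends one transactional message and remembers the first position per txn.
    void publishTxnMessage(String txnId) {
        lastConfirmedEntry++;
        ongoingTxns.putIfAbsent(txnId, lastConfirmedEntry);
    }

    // Ends (commits or aborts) a transaction. Returns true only if the max
    // read position moved forward as a result.
    boolean endTxn(String txnId) {
        ongoingTxns.remove(txnId);
        long previous = maxReadPosition;
        if (ongoingTxns.isEmpty()) {
            maxReadPosition = lastConfirmedEntry;
        } else {
            long firstOngoing = ongoingTxns.values().iterator().next();
            maxReadPosition = firstOngoing - 1;
        }
        return maxReadPosition > previous;
    }

    long maxReadPosition() {
        return maxReadPosition;
    }

    public static void main(String[] args) {
        MaxReadPositionModel model = new MaxReadPositionModel();
        model.publishTxnMessage("txnA"); // position 0
        model.publishTxnMessage("txnB"); // position 1
        // txnB commits first, but txnA is still ongoing and pins the position.
        System.out.println("commit txnB moved position: " + model.endTxn("txnB"));
        // Once txnA commits, everything up to the last confirmed entry is readable.
        System.out.println("commit txnA moved position: " + model.endTxn("txnA"));
    }
}
```

In this model, updating the timestamp on every commit would fire for `txnB` even though no new message became readable.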





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2046921479

   > > I just don't understand, it seems ReplicatorSubscriptionController only snapshots `ledger.lastPosition`, why do we update `lastDataMessagePublishedTimestamp` after `maxReadPosition` moved?
   > 
   > We may need to discuss at which point we should update `lastDataMessagePublishedTimestamp`. There are two options:
   > 
   > * update `lastDataMessagePublishedTimestamp` whenever a transactional/non-transactional message is persisted.
   > * update `lastDataMessagePublishedTimestamp` when the max read position moves forward.
   > 
   > The difference between the two options is that the former fires more frequently than the latter. With greater frequency, we have a more precise position for ReplicatorSubscription. But because the consumer can't consume messages after the max read position, and the number of snapshots is limited (default 10), there is a high risk that when a consumer acks a position belonging to a committed transaction, the corresponding snapshots have already been evicted (snapshots are taken very frequently; with the default configuration, a transaction lasting more than 10s will hit this case), so the `ReplicatorSubscription` feature can't work at all. Looking forward to your reply. @lhotari @codelipenghui @liangyepianzhou @dao-jun @Technoboy-
   
   +1 with option 1
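
The eviction risk raised in the quoted comment can be sketched with a bounded cache. This is a minimal model, assuming snapshots are keyed by position, the oldest is evicted once the cache is full, and an acknowledgement looks up the newest snapshot at or before the acked position. `BoundedSnapshotCache` and its methods are hypothetical names, not the broker's implementation.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical bounded snapshot cache; positions are plain longs.
final class BoundedSnapshotCache {
    private final TreeMap<Long, String> snapshots = new TreeMap<>();
    private final int maxSnapshots;

    BoundedSnapshotCache(int maxSnapshots) {
        this.maxSnapshots = maxSnapshots;
    }

    // Adds a snapshot and evicts the oldest one when the cache overflows.
    void add(long position, String snapshotId) {
        snapshots.put(position, snapshotId);
        if (snapshots.size() > maxSnapshots) {
            snapshots.pollFirstEntry();
        }
    }

    // Returns the newest snapshot at or before the acked position, or null
    // if every such snapshot has already been evicted.
    String findForAck(long ackedPosition) {
        Map.Entry<Long, String> entry = snapshots.floorEntry(ackedPosition);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        BoundedSnapshotCache cache = new BoundedSnapshotCache(10);
        // A long-running transaction keeps consumers behind position 5 while
        // snapshots keep being taken at positions 1..20.
        for (long position = 1; position <= 20; position++) {
            cache.add(position, "snapshot-" + position);
        }
        // After the commit the consumer acks position 5, but only snapshots
        // 11..20 remain, so no usable snapshot is found.
        System.out.println("snapshot for ack at 5: " + cache.findForAck(5L));
    }
}
```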




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1557509007


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.pulsar.broker.service;

Review Comment:
   done, thanks.





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2106570312

   @poorbarcode any more change requests?
   




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590951943


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * remove the specified transaction from ongoing transaction list and update the max read position.
+     * @param txnID
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+    }
+
+    /**
+     * update the max read position. if the new position is greater than the current max read position,
+     * we will trigger the callback, unless the disableCallback is true.
+     * Currently, we only use the callback to update the lastMaxReadPositionMovedForwardTimestamp.
+     * For non-transactional production, some marker messages will be sent to the topic, in which case we don't need
+     * to trigger the callback.
+     * @param newPosition new max read position to update.
+     * @param disableCallback whether disable the callback.
+     */
+    void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
+        PositionImpl preMaxReadPosition = this.maxReadPosition;
+        this.maxReadPosition = newPosition;
+        if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) {
+            if (!checkIfNoSnapshot()) {

Review Comment:
   This preserves the original logic in:
   https://github.com/apache/pulsar/blob/eee3694f00e269eef0f75d791521d0d35d8ff411/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java#L487-L488
   Could you help to verify this? @congbobo184  @liangyepianzhou 
   Should we update `changeMaxReadPositionAndAddAbortTimes` when there is no snapshot?





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2041336174

   PTAL, thanks. @BewareMyPower @poorbarcode @codelipenghui @liangyepianzhou 




Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1554842472


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -321,6 +323,8 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                         }
                         txnCommittedCounter.increment();
                         completableFuture.complete(null);
+                        // need to update the last data message published timestamp
+                        topic.setLastDataMessagePublishedTimestamp(Clock.systemUTC().millis());

Review Comment:
   I've checked the source code: `ReplicatedSubscriptionsController` uses `LastDataMessagePublishedTimestamp` to determine whether we need to take a snapshot. Ideally, we would update `LastDataMessagePublishedTimestamp` after the txn is committed. But the topic transaction buffer only maintains the first message position of the txn. Maybe we can update `LastDataMessagePublishedTimestamp` after the txn is committed.





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590590340


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * remove the specified transaction from ongoing transaction list and update the max read position.
+     * @param txnID
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+    }
+
+    /**
+     * update the max read position. if the new position is greater than the current max read position,
+     * we will trigger the callback, unless the disableCallback is true.
+     * Currently, we only use the callback to update the lastMaxReadPositionMovedForwardTimestamp.
+     * For non-transactional production, some marker messages will be sent to the topic, in which case we don't need
+     * to trigger the callback.
+     * @param newPosition new max read position to update.
+     * @param disableCallback whether disable the callback.
+     */
+    void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
+        PositionImpl preMaxReadPosition = this.maxReadPosition;
+        this.maxReadPosition = newPosition;

Review Comment:
   Why do you set `maxReadPosition` on this line? It will make all messages readable.





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2094604682

   > [image] The test keeps failing, could you please resolve it?
   
   I can't reproduce this error.
   <img width="757" alt="image" src="https://github.com/apache/pulsar/assets/52550727/a0cc44ed-88fb-433b-ade2-7ecafd7209e6">
   




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590951943


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * remove the specified transaction from ongoing transaction list and update the max read position.
+     * @param txnID
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+    }
+
+    /**
+     * update the max read position. if the new position is greater than the current max read position,
+     * we will trigger the callback, unless the disableCallback is true.
+     * Currently, we only use the callback to update the lastMaxReadPositionMovedForwardTimestamp.
+     * For non-transactional production, some marker messages will be sent to the topic, in which case we don't need
+     * to trigger the callback.
+     * @param newPosition new max read position to update.
+     * @param disableCallback whether disable the callback.
+     */
+    void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
+        PositionImpl preMaxReadPosition = this.maxReadPosition;
+        this.maxReadPosition = newPosition;
+        if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) {
+            if (!checkIfNoSnapshot()) {

Review Comment:
   This check preserves the original logic in:
   https://github.com/apache/pulsar/blob/eee3694f00e269eef0f75d791521d0d35d8ff411/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java#L487-L488
   Could you help verify this, @congbobo184?
   Should we update `changeMaxReadPositionAndAddAbortTimes` when there is no snapshot?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2102608000

   > ```
   > Error: org.apache.pulsar.broker.service.ReplicatorSubscriptionWithTransactionTest.testReplicatedSubscriptionWhenReplicatorProducerIsClosed Time elapsed: 12.567 s <<< FAILURE!
   > org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a org.apache.pulsar.broker.service.ReplicatorSubscriptionTest expected object to not be null within 10 seconds.
   > ```
   > 
   > Pulsar CI / CI - Unit - Brokers - Broker Group 1 (pull_request) test is not stable, please check the reason
   
   I have fixed the test code. Please help trigger the CI again, thanks.




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1591722666


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -479,17 +504,22 @@ public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition)
         return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
     }
 
+    /**
+     * Sync max read position for normal publish.
+     * @param position {@link PositionImpl} the position to sync.
+     * @param isMarkerMessage whether the message is marker message, in such case, we
+     *                       don't need to trigger the callback to update lastMaxReadPositionMovedForwardTimestamp.
+     */
     @Override
-    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
+    public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
         // when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback
         // thread is the same tread, in this time the lastAddConfirm don't content transaction message.
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
-                this.maxReadPosition = position;
+                updateMaxReadPosition(position, isMarkerMessage);
             } else if (checkIfReady()) {
                 if (ongoingTxns.isEmpty()) {
-                    maxReadPosition = position;
-                    changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
+                    updateMaxReadPosition(position, isMarkerMessage);

Review Comment:
   At step 3, `lastMaxReadPositionMovedForwardTimestamp` is updated through the following call chain:
   `org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer#commitTxn` -> `org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer#removeTxnAndUpdateMaxReadPosition` -> `org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer#updateMaxReadPosition`
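
   A minimal sketch of that chain, assuming a simplified model in which positions and transaction IDs are plain `long` values. `TxnBufferSketch` and its members are hypothetical stand-ins, not the real `TopicTransactionBuffer`:

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   // Hypothetical, simplified model of the transaction buffer (assumption, not Pulsar code).
   class TxnBufferSketch {
       // txnId -> position of the transaction's first message, kept in insertion order
       private final Map<Long, Long> ongoingTxns = new LinkedHashMap<>();
       private long lastConfirmedEntry = -1L;
       private long maxReadPosition = -1L;
       long lastMaxReadPositionMovedForwardTimestamp = 0L;

       // A transactional message is appended; it stays unreadable until its transaction ends.
       synchronized void appendTxnMessage(long txnId, long position) {
           ongoingTxns.putIfAbsent(txnId, position);
           lastConfirmedEntry = position;
       }

       // Entry point of the chain: the commit marker is persisted, then the txn is removed.
       synchronized void commitTxn(long txnId, long commitMarkerPosition) {
           lastConfirmedEntry = commitMarkerPosition;
           removeTxnAndUpdateMaxReadPosition(txnId);
       }

       private void removeTxnAndUpdateMaxReadPosition(long txnId) {
           ongoingTxns.remove(txnId);
           if (!ongoingTxns.isEmpty()) {
               // Readers may go up to the entry just before the oldest ongoing transaction.
               long firstOngoing = ongoingTxns.values().iterator().next();
               updateMaxReadPosition(firstOngoing - 1, false);
           } else {
               updateMaxReadPosition(lastConfirmedEntry, false);
           }
       }

       private void updateMaxReadPosition(long newPosition, boolean disableCallback) {
           long previous = maxReadPosition;
           maxReadPosition = newPosition;
           // Only a forward move with the callback enabled refreshes the timestamp.
           if (previous < newPosition && !disableCallback) {
               lastMaxReadPositionMovedForwardTimestamp = System.currentTimeMillis();
           }
       }

       synchronized long getMaxReadPosition() {
           return maxReadPosition;
       }
   }
   ```

   In this model, committing the only ongoing transaction moves the max read position up to the last confirmed entry, and that forward move is what refreshes the timestamp.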





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1554835891


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -321,6 +323,8 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                         }
                         txnCommittedCounter.increment();
                         completableFuture.complete(null);
+                        // need to update the last data message published timestamp
+                        topic.setLastDataMessagePublishedTimestamp(Clock.systemUTC().millis());

Review Comment:
   It would be better to move this to https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L3907





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1555575940


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   > Oh I see. So, the committed transaction messages will not be replicated to the remote cluster without this fix?
   > 
   > If yes,
   > 
   > I think we should change the PR title to "[fix][broker] fix the transaction messages cannot be replicated" And, a test for replicating transaction messages should be added to make sure we can fix the issue end to end
   
   Yes, we will always skip taking snapshot.





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1555619440


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   > Yes, we will always skip taking snapshot. But `ReplicatedSubscription` feature is not used to replicate messages, but replicate subscription state.
   
   @thetumbled I think this could be reflected in the title of the PR. "[fix][broker] fix replicated subscriptions for transactional messages" etc.





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1555616943


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
+            });
         } else if (TxnAction.ABORT_VALUE == txnAction) {
-            return transactionBuffer.abortTxn(txnID, lowWaterMark);
+            return transactionBuffer.abortTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   Why does the value need to be updated when the transaction is aborted?





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1558800780


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java:
##########
@@ -151,8 +151,9 @@ public interface TransactionBuffer {
     /**
      * Sync max read position for normal publish.
      * @param position {@link PositionImpl} the position to sync.
+     * @param isMakerMessage whether the message is maker message.
      */
-    void syncMaxReadPositionForNormalPublish(PositionImpl position);
+    void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMakerMessage);

Review Comment:
   typo in the parameter name, it's "marker" message



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java:
##########
@@ -151,8 +151,9 @@ public interface TransactionBuffer {
     /**
      * Sync max read position for normal publish.
      * @param position {@link PositionImpl} the position to sync.
+     * @param isMakerMessage whether the message is maker message.

Review Comment:
   typo, maker -> marker





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2093677033

   ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22452?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: Patch coverage is `55.55556%`, with `16 lines` in your changes missing coverage. Please review.
   > Project coverage is 38.30%. Comparing base [(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`3cff761`)](https://app.codecov.io/gh/apache/pulsar/pull/22452?dropdown=coverage&src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 220 commits behind head on master.
   
   <details><summary>Additional details and impacted files</summary>
   
   
   [![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/22452/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22452?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #22452       +/-   ##
   =============================================
   - Coverage     73.57%   38.30%   -35.27%     
   + Complexity    32624    14345    -18279     
   =============================================
     Files          1877     1672      -205     
     Lines        139502   133188     -6314     
     Branches      15299    14893      -406     
   =============================================
   - Hits         102638    51023    -51615     
   - Misses        28908    75763    +46855     
   + Partials       7956     6402     -1554     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pulsar/pull/22452/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [inttests](https://app.codecov.io/gh/apache/pulsar/pull/22452/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `27.74% <55.55%> (+3.16%)` | :arrow_up: |
   | [systests](https://app.codecov.io/gh/apache/pulsar/pull/22452/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [unittests](https://app.codecov.io/gh/apache/pulsar/pull/22452/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `36.69% <50.00%> (-36.16%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Files](https://app.codecov.io/gh/apache/pulsar/pull/22452?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...sar/broker/service/persistent/PersistentTopic.java](https://app.codecov.io/gh/apache/pulsar/pull/22452?src=pr&el=tree&filepath=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2Fpersistent%2FPersistentTopic.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUGVyc2lzdGVudFRvcGljLmphdmE=) | `51.39% <71.42%> (-27.07%)` | :arrow_down: |
   | [.../persistent/ReplicatedSubscriptionsController.java](https://app.codecov.io/gh/apache/pulsar/pull/22452?src=pr&el=tree&filepath=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2Fpersistent%2FReplicatedSubscriptionsController.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUmVwbGljYXRlZFN1YnNjcmlwdGlvbnNDb250cm9sbGVyLmphdmE=) | `0.00% <0.00%> (-72.60%)` | :arrow_down: |
   | [...nsaction/buffer/impl/TransactionBufferDisable.java](https://app.codecov.io/gh/apache/pulsar/pull/22452?src=pr&el=tree&filepath=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Ftransaction%2Fbuffer%2Fimpl%2FTransactionBufferDisable.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9idWZmZXIvaW1wbC9UcmFuc2FjdGlvbkJ1ZmZlckRpc2FibGUuamF2YQ==) | `50.00% <40.00%> (-6.53%)` | :arrow_down: |
   | [...ransaction/buffer/impl/TopicTransactionBuffer.java](https://app.codecov.io/gh/apache/pulsar/pull/22452?src=pr&el=tree&filepath=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Ftransaction%2Fbuffer%2Fimpl%2FTopicTransactionBuffer.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9idWZmZXIvaW1wbC9Ub3BpY1RyYW5zYWN0aW9uQnVmZmVyLmphdmE=) | `36.38% <76.47%> (-51.37%)` | :arrow_down: |
   | [...ransaction/buffer/impl/InMemTransactionBuffer.java](https://app.codecov.io/gh/apache/pulsar/pull/22452?src=pr&el=tree&filepath=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Ftransaction%2Fbuffer%2Fimpl%2FInMemTransactionBuffer.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci90cmFuc2FjdGlvbi9idWZmZXIvaW1wbC9Jbk1lbVRyYW5zYWN0aW9uQnVmZmVyLmphdmE=) | `0.00% <0.00%> (-57.58%)` | :arrow_down: |
   
   ... and [1511 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/22452/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   </details>




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "congbobo184 (via GitHub)" <gi...@apache.org>.
congbobo184 commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1595330968


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -479,17 +502,22 @@ public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition)
         return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
     }
 
+    /**
+     * Sync max read position for normal publish.
+     * @param position {@link PositionImpl} the position to sync.
+     * @param isMarkerMessage whether the message is marker message, in such case, we
+     *                       don't need to trigger the callback to update lastMaxReadPositionMovedForwardTimestamp.
+     */
     @Override
-    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
+    public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
         // when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback
         // thread is the same tread, in this time the lastAddConfirm don't content transaction message.
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
-                this.maxReadPosition = position;
+                updateMaxReadPosition(position, isMarkerMessage);

Review Comment:
   In the no-snapshot state, we shouldn't increment `changeMaxReadPositionCount`.





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "congbobo184 (via GitHub)" <gi...@apache.org>.
congbobo184 commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2102497144

   ```
   Error: org.apache.pulsar.broker.service.ReplicatorSubscriptionWithTransactionTest.testReplicatedSubscriptionWhenReplicatorProducerIsClosed Time elapsed: 12.567 s <<< FAILURE!
   org.awaitility.core.ConditionTimeoutException: Assertion condition defined as a org.apache.pulsar.broker.service.ReplicatorSubscriptionTest expected object to not be null within 10 seconds.
   ```
   Pulsar CI / CI - Unit - Brokers - Broker Group 1 (pull_request) test is not stable, please check the reason




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1595408387


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -479,17 +502,22 @@ public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition)
         return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
     }
 
+    /**
+     * Sync max read position for normal publish.
+     * @param position {@link PositionImpl} the position to sync.
+     * @param isMarkerMessage whether the message is marker message, in such case, we
+     *                       don't need to trigger the callback to update lastMaxReadPositionMovedForwardTimestamp.
+     */
     @Override
-    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
+    public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
         // when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback
         // thread is the same tread, in this time the lastAddConfirm don't content transaction message.
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
-                this.maxReadPosition = position;
+                updateMaxReadPosition(position, isMarkerMessage);

Review Comment:
   Agreed. For non-transactional production, we should not increment `changeMaxReadPositionCount` or take the snapshot.





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1556727969


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
+            });
         } else if (TxnAction.ABORT_VALUE == txnAction) {
-            return transactionBuffer.abortTxn(txnID, lowWaterMark);
+            return transactionBuffer.abortTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   Aborting a transaction may also move the max read position forward.





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1554842746


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -321,6 +323,8 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                         }
                         txnCommittedCounter.increment();
                         completableFuture.complete(null);
+                        // need to update the last data message published timestamp
+                        topic.setLastDataMessagePublishedTimestamp(Clock.systemUTC().millis());

Review Comment:
   We may also need to consider race conditions when updating it.
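
   One possible way to keep the update safe under concurrent callbacks, sketched here with `AtomicLong`. This is an illustration only, not what the PR does, and `MonotonicTimestampSketch` is a hypothetical name:

   ```java
   import java.util.concurrent.atomic.AtomicLong;

   // Hypothetical holder for a "last published" timestamp (assumption, not Pulsar code).
   class MonotonicTimestampSketch {
       private final AtomicLong lastTimestamp = new AtomicLong(0L);

       // Atomically keeps the larger of the stored and candidate values, so a late
       // callback carrying an older timestamp cannot move the value backwards.
       void advanceTo(long candidateMillis) {
           lastTimestamp.accumulateAndGet(candidateMillis, Math::max);
       }

       long get() {
           return lastTimestamp.get();
       }
   }
   ```

   A plain `volatile long` write may be enough when a slightly stale value is acceptable; the atomic max only matters if the timestamp must never go backwards.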





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1555421137


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   `lastDataMessagePublishedTimestamp` is used **only** by the `ReplicatedSubscription` feature, to determine whether there are new messages available for reading. So we should update it when the max read position moves forward.
   It is normal to produce only transactional messages to a specific topic; in that case `LastDataMessagePublishedTimestamp` is currently never updated, and that is what we need to fix.
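
   A rough sketch of why the timestamp matters, assuming the snapshot scheduler skips a new snapshot when no data has become readable since the last one started. `SnapshotGateSketch` and its method names are hypothetical and do not mirror the real controller's API:

   ```java
   // Hypothetical model of the "is a new snapshot worth taking?" check (assumption).
   class SnapshotGateSketch {
       private long lastDataMessagePublishedTimestamp = 0L;
       private long lastSnapshotStartTime = 0L;

       // Called whenever new data becomes readable, for example after a commit.
       void onDataVisible(long nowMillis) {
           lastDataMessagePublishedTimestamp = nowMillis;
       }

       // Returns true and records the start time only if data arrived since the last snapshot.
       boolean shouldStartSnapshot(long nowMillis) {
           if (lastDataMessagePublishedTimestamp == 0L
                   || lastDataMessagePublishedTimestamp < lastSnapshotStartTime) {
               return false; // nothing new to cover, skip this round
           }
           lastSnapshotStartTime = nowMillis;
           return true;
       }
   }
   ```

   If a topic only receives transactional messages and nothing ever calls `onDataVisible`, `shouldStartSnapshot` keeps returning false in this model, so no snapshot is ever taken.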
   





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1555575940


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   > Oh I see. So, the committed transaction messages will not be replicated to the remote cluster without this fix?
   > 
   > If yes,
   > 
   > I think we should change the PR title to "[fix][broker] fix the transaction messages cannot be replicated" And, a test for replicating transaction messages should be added to make sure we can fix the issue end to end
   
   Yes, we will always skip taking the snapshot. However, the `ReplicatedSubscription` feature is not used to replicate messages; it replicates subscription state.





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1554947516


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
+            });
         } else if (TxnAction.ABORT_VALUE == txnAction) {
-            return transactionBuffer.abortTxn(txnID, lowWaterMark);
+            return transactionBuffer.abortTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   We need to consider race conditions here.





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1560356047


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,35 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * remove the specified transaction from ongoing transaction list and update the max read position.
+     * @param txnID
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+    }
+
+    /**
+     * update the max read position.
+     * @param newPosition new max read position
+     * @param disableCallback whether disable the callback.

Review Comment:
   Added some documentation.





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1555489032


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   Oh I see. So, the committed transaction messages will not be replicated to the remote cluster without this fix?
   
   If yes,
   
   I think we should change the PR title to "[fix][broker] fix the transaction messages cannot be replicated"
   And, a test for replicating transaction messages should be added to make sure we can fix the issue end to end





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1555526796


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   No, replication can still be triggered because of another bug. Refer to another PR:
   https://github.com/apache/pulsar/pull/22461
   
   Maybe the current test code is enough; this PR only corrects the field `LastDataMessagePublishedTimestamp`.
   





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2101877822

   /pulsarbot rerun-failure-checks




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2095062723

   /pulsarbot rerun-failure-checks




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590612538


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -479,17 +504,22 @@ public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition)
         return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
     }
 
+    /**
+     * Sync max read position for normal publish.
+     * @param position {@link PositionImpl} the position to sync.
+     * @param isMarkerMessage whether the message is marker message, in such case, we
+     *                       don't need to trigger the callback to update lastMaxReadPositionMovedForwardTimestamp.
+     */
     @Override
-    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
+    public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
         // when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback
         // thread is the same tread, in this time the lastAddConfirm don't content transaction message.
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
-                this.maxReadPosition = position;
+                updateMaxReadPosition(position, isMarkerMessage);
             } else if (checkIfReady()) {
                 if (ongoingTxns.isEmpty()) {
-                    maxReadPosition = position;
-                    changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
+                    updateMaxReadPosition(position, isMarkerMessage);

Review Comment:
   This change means that the variable `lastMaxReadPositionMovedForwardTimestamp` is updated only when there are no ongoing TXNs, which is wrong. We should only call `MaxReadPositionCallBack` after committing/aborting transactions, right? Could you also add a test for this case?





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1593288742


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * remove the specified transaction from ongoing transaction list and update the max read position.
+     * @param txnID
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+    }
+
+    /**
+     * update the max read position. if the new position is greater than the current max read position,
+     * we will trigger the callback, unless the disableCallback is true.
+     * Currently, we only use the callback to update the lastMaxReadPositionMovedForwardTimestamp.
+     * For non-transactional production, some marker messages will be sent to the topic, in which case we don't need
+     * to trigger the callback.
+     * @param newPosition new max read position to update.
+     * @param disableCallback whether disable the callback.
+     */
+    void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
+        PositionImpl preMaxReadPosition = this.maxReadPosition;
+        this.maxReadPosition = newPosition;
+        if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) {
+            if (!checkIfNoSnapshot()) {

Review Comment:
   It looks a little strange; `checkIfNoSnapshot()` just means there are no snapshots to recover.
   
   ```java
   if (checkIfNoSnapshot()) {
       this.maxReadPosition = position;
   ```
   means that if there are no transactional messages, all messages can be dispatched.
   
   I agree with @poorbarcode that we don't need this check. Once `maxReadPosition` moves forward, we need to update `changeMaxReadPositionAndAddAbortTimes`.





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2047049489

   > * update `lastDataMessagePublishedTimestamp` when the max position move forward.
   
   This seems to make sense, assuming that "when the max position moves forward" means there are more messages available to be read.
   It might make sense to rename `lastDataMessagePublishedTimestamp` to cover both the non-transactional and transactional cases. The field name should also reflect its purpose, which is replicated subscriptions. Alternatively, the meaning could be explained in a code comment on the field.




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1558662937


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   > Yes, it does. But I think it is a good tradeoff to allow a spurious update, because `LastDataMessagePublishedTimestamp` is only used to skip creating unnecessary subscription replication snapshots.
   > A spurious update just creates an unnecessary snapshot.
   > By contrast, if we aim for strict correctness, we have to put a lot of effort into determining whether the max read position moved forward or not.
   > But we can't allow a missed update, which would skip creating a necessary subscription replication snapshot.
   
   We already have the `updateMaxReadPosition(TxnID txnID)` method in TopicTransactionBuffer.java, and it already checks whether `maxReadPosition` changed. So we can just let the topic register a callback with the TopicTransactionBuffer.
   
   ```java
       void updateMaxReadPosition(TxnID txnID) {
           PositionImpl preMaxReadPosition = this.maxReadPosition;
           ongoingTxns.remove(txnID);
           if (!ongoingTxns.isEmpty()) {
               PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
               maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
           } else {
               maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
           }
           if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
               this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
              // new added
               callback.onMaxReadPostionUpdated(this.maxReadPosition);
           }
       }
   ```
   
   Keeping the code easy to understand is important. Someone new to this part would struggle to understand why we update `LastDataMessagePublishedTimestamp` when adding commit and abort markers to the topic, while the description of `LastDataMessagePublishedTimestamp` says that internal markers shouldn't update it.
   
   @thetumbled ^^^
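   
   For illustration, here is a minimal, self-contained sketch of the callback idea described above. It is not Pulsar code: `MaxReadPositionListener`, `SimpleTxnBuffer`, and the use of a plain `long` in place of `PositionImpl` are all assumptions made for this example. The sketch notifies the listener only when the position moves forward, which is one possible choice for when to fire the callback.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   /** Demo entry point: records every forward move reported by the buffer. */
   class CallbackDemo {
       public static void main(String[] args) {
           List<Long> forwardMoves = new ArrayList<>();
           SimpleTxnBuffer buffer = new SimpleTxnBuffer(5L, (previous, current) -> forwardMoves.add(current));
           buffer.updateMaxReadPosition(5L); // unchanged, listener is not called
           buffer.updateMaxReadPosition(7L); // moved forward, listener is called
           System.out.println(forwardMoves);
       }
   }
   
   /** Hypothetical callback interface; the name is illustrative, not taken from Pulsar. */
   interface MaxReadPositionListener {
       void onMaxReadPositionMovedForward(long previous, long current);
   }
   
   /** Simplified buffer that tracks the max read position as a plain long (an assumption). */
   class SimpleTxnBuffer {
       private long maxReadPosition;
       private final MaxReadPositionListener listener;
   
       SimpleTxnBuffer(long initialPosition, MaxReadPositionListener listener) {
           this.maxReadPosition = initialPosition;
           this.listener = listener;
       }
   
       /** Stores the new position and notifies the listener only if it is greater than the old one. */
       synchronized void updateMaxReadPosition(long newPosition) {
           long previous = this.maxReadPosition;
           this.maxReadPosition = newPosition;
           if (newPosition > previous) {
               listener.onMaxReadPositionMovedForward(previous, newPosition);
           }
       }
   }
   ```
   
   With this shape, the topic could pass a listener that refreshes its timestamp, so the buffer itself would not need to know about replicated subscriptions.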
   





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1558804788


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java:
##########
@@ -151,8 +151,9 @@ public interface TransactionBuffer {
     /**
      * Sync max read position for normal publish.
      * @param position {@link PositionImpl} the position to sync.
+     * @param isMakerMessage whether the message is maker message.

Review Comment:
   done.





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1555421137


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   `lastDataMessagePublishedTimestamp` is used **only** by the `ReplicatedSubscription` feature, to determine whether new messages are available for reading. So we should update `lastDataMessagePublishedTimestamp` when the max read position moves forward.
   It is normal to produce only transactional messages to a specific topic; in that case we currently don't update `LastDataMessagePublishedTimestamp` at all, and we need to fix that.
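   
   To make the effect concrete, here is a simplified sketch of how a stale timestamp can suppress snapshots. It is not the actual Pulsar implementation: `TopicState`, `SnapshotScheduler`, and `maybeStartSnapshot` are hypothetical names, and the comparison is only an assumed model of the "skip if nothing new was published" check.
   
   ```java
   /** Demo entry point: shows a snapshot being skipped until the timestamp is refreshed. */
   class SnapshotGateDemo {
       public static void main(String[] args) {
           TopicState topic = new TopicState();
           topic.lastDataMessagePublishedTimestamp = 100L;
           SnapshotScheduler scheduler = new SnapshotScheduler(topic);
   
           System.out.println(scheduler.maybeStartSnapshot(200L)); // data is newer than the last snapshot
           System.out.println(scheduler.maybeStartSnapshot(300L)); // timestamp is stale, so this is skipped
   
           // Models a transaction commit that refreshes the timestamp.
           topic.lastDataMessagePublishedTimestamp = 350L;
           System.out.println(scheduler.maybeStartSnapshot(400L));
       }
   }
   
   /** Hypothetical stand-in for the topic-side state; not the real PersistentTopic. */
   class TopicState {
       volatile long lastDataMessagePublishedTimestamp;
   }
   
   /** Hypothetical stand-in for the component that takes replicated-subscription snapshots. */
   class SnapshotScheduler {
       private final TopicState topic;
       private long lastSnapshotStartTime;
   
       SnapshotScheduler(TopicState topic) {
           this.topic = topic;
       }
   
       /** Returns true if a snapshot was started, false if it was skipped because no new data was seen. */
       boolean maybeStartSnapshot(long now) {
           if (topic.lastDataMessagePublishedTimestamp < lastSnapshotStartTime) {
               return false;
           }
           lastSnapshotStartTime = now;
           return true;
       }
   }
   ```
   
   In this model, a topic that only receives transactional messages never refreshes the timestamp, so every snapshot after the first is skipped.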
   





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1554849280


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -321,6 +323,8 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                         }
                         txnCommittedCounter.increment();
                         completableFuture.complete(null);
+                        // need to update the last data message published timestamp
+                        topic.setLastDataMessagePublishedTimestamp(Clock.systemUTC().millis());

Review Comment:
   I think we need to update it after the txn is aborted or committed, since both can move the max read position forward.
   It is normal to produce only transactional messages to a specific topic; in that case we currently don't update `LastDataMessagePublishedTimestamp` at all.





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1554987948


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
+            });
         } else if (TxnAction.ABORT_VALUE == txnAction) {
-            return transactionBuffer.abortTxn(txnID, lowWaterMark);
+            return transactionBuffer.abortTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   I'm afraid so, to prevent potential issues.
   Just
   ```java
   transactionBuffer.abortTxn(txnID, lowWaterMark).thenRunAsync(() -> {
       // ...
   }, ((ManagedLedgerImpl) ledger).getExecutor());
   ```
   is OK; it makes the update of `lastDataMessagePublishedTimestamp` run in the same thread.
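   
   As a standalone illustration of that pattern, the sketch below uses only the JDK to show that `thenRunAsync(action, executor)` runs the action on the supplied executor. The class name, the helper method, and the thread name `ledger-executor` are made up for the example; an already completed future stands in for the result of `abortTxn`.
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.atomic.AtomicReference;
   
   class ThenRunAsyncDemo {
       /** Runs an action on the executor after the future completes and returns the name of the thread used. */
       static String runOnExecutor(CompletableFuture<Void> future, ExecutorService executor) {
           AtomicReference<String> threadName = new AtomicReference<>();
           future.thenRunAsync(() -> threadName.set(Thread.currentThread().getName()), executor).join();
           return threadName.get();
       }
   
       public static void main(String[] args) {
           // Single-threaded executor, so every action submitted to it runs on the same thread.
           ExecutorService executor = Executors.newSingleThreadExecutor(r -> new Thread(r, "ledger-executor"));
           try {
               CompletableFuture<Void> abortFuture = CompletableFuture.completedFuture(null);
               System.out.println(runOnExecutor(abortFuture, executor));
           } finally {
               executor.shutdown();
           }
       }
   }
   ```
   
   By contrast, plain `thenRun` may run the action on whichever thread completes the future, or on the calling thread if the future is already complete.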





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1554994078


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
+            });
         } else if (TxnAction.ABORT_VALUE == txnAction) {
-            return transactionBuffer.abortTxn(txnID, lowWaterMark);
+            return transactionBuffer.abortTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   > why here have a race conditions? @dao-jun
   
   @Technoboy- Oh, sorry. I didn't check the code. I thought the Future was completed by another thread, but actually they are in the same thread.





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "Technoboy- (via GitHub)" <gi...@apache.org>.
Technoboy- commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1554991877


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
+            });
         } else if (TxnAction.ABORT_VALUE == txnAction) {
-            return transactionBuffer.abortTxn(txnID, lowWaterMark);
+            return transactionBuffer.abortTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   Why is there a race condition here? @dao-jun





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari closed pull request #22452: [fix][broker] fix replicated subscriptions for transactional messages
URL: https://github.com/apache/pulsar/pull/22452




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590590340


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * remove the specified transaction from ongoing transaction list and update the max read position.
+     * @param txnID
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+    }
+
+    /**
+     * update the max read position. if the new position is greater than the current max read position,
+     * we will trigger the callback, unless the disableCallback is true.
+     * Currently, we only use the callback to update the lastMaxReadPositionMovedForwardTimestamp.
+     * For non-transactional production, some marker messages will be sent to the topic, in which case we don't need
+     * to trigger the callback.
+     * @param newPosition new max read position to update.
+     * @param disableCallback whether disable the callback.
+     */
+    void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
+        PositionImpl preMaxReadPosition = this.maxReadPosition;
+        this.maxReadPosition = newPosition;

Review Comment:
   <del>Why do you set `maxReadPosition` at this line? It will make all the messages can be read.</del>
   
   Please ignore this comment, it is my mistake





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2042068671

   > @thetumbled You need to add a test related to transaction, right?
   
   `testUpdateLastDataMessagePublishedTimestampForTransactionalPublish()`




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2046684576

   > It looks like `lastDataMessagePublishedTimestamp` will not be updated if `transactionCoordinatorEnabled=false` and only normal msgs are published to the topic.
   
   We update `lastDataMessagePublishedTimestamp` by `syncMaxReadPositionForNormalPublish` in normal cases.
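
A minimal sketch of how a normal (non-transactional) publish could move the max read position and refresh a timestamp, assuming the `isMarkerMessage` flag from this PR's diff suppresses the timestamp update for marker messages. This is a simplified model, not the Pulsar code: positions are plain `long` values instead of `PositionImpl`, and the class name `UpdateMaxReadPositionSketch`, the `lastMovedForwardTimestamp` field, and the explicit `nowMillis` parameter are hypothetical.

```java
// Simplified, hypothetical model of the max-read-position update path.
class UpdateMaxReadPositionSketch {
    private long maxReadPosition = -1;
    private long lastMovedForwardTimestamp = 0;

    // Always stores the new position; refreshes the timestamp only when the
    // position moved forward and the callback is not disabled.
    void updateMaxReadPosition(long newPosition, boolean disableCallback, long nowMillis) {
        long previous = maxReadPosition;
        maxReadPosition = newPosition;
        if (previous < newPosition && !disableCallback) {
            lastMovedForwardTimestamp = nowMillis;
        }
    }

    // Normal publish: marker messages move the position but skip the timestamp.
    void syncMaxReadPositionForNormalPublish(long position, boolean isMarkerMessage, long nowMillis) {
        updateMaxReadPosition(position, isMarkerMessage, nowMillis);
    }

    long getMaxReadPosition() {
        return maxReadPosition;
    }

    long getLastMovedForwardTimestamp() {
        return lastMovedForwardTimestamp;
    }

    public static void main(String[] args) {
        UpdateMaxReadPositionSketch buffer = new UpdateMaxReadPositionSketch();
        buffer.syncMaxReadPositionForNormalPublish(0, false, 100L); // data message
        buffer.syncMaxReadPositionForNormalPublish(1, true, 200L);  // marker message
        System.out.println("maxReadPosition=" + buffer.getMaxReadPosition());
        System.out.println("timestamp=" + buffer.getLastMovedForwardTimestamp());
    }
}
```

In this model the marker message at position 1 still advances the max read position, but the timestamp stays at the value set by the earlier data message.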




Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2042060475

   > Could you add a test for it?
   
   Added, PTAL, thanks.




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1557164352


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   Could you help write an end-to-end test? I wrote one, but it is not right. @lhotari 
   https://github.com/apache/pulsar/commit/e27fd14a50960604628a1c3434e6ec9657805d6a





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1555526796


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   No, the replication can still be triggered because of another bug. Refer to another PR:
   https://github.com/apache/pulsar/pull/22461
   
   Maybe the current test code is enough; this PR is meant to correct the field `LastDataMessagePublishedTimestamp`.
   





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2046490186

   Hi everyone, I have added end-to-end test code that reproduces the bug `replicated subscriptions not work for transactional messages`; you can apply it alone to the master branch to reproduce the bug.
   The implementation has also been refactored to update the timestamp only when the max read position really changes, without any false or missing updates.
   Looking forward to your reply, thanks. @codelipenghui @lhotari @liangyepianzhou @dao-jun @BewareMyPower @Technoboy- 




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2050810216

   PTAL, thanks. @lhotari @codelipenghui @liangyepianzhou @dao-jun @Technoboy-




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2085588942

   /pulsarbot rerun-failure-checks




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2094869547

   Closing and reopening to get recent changes from master branch to the PR build.




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590612538


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -479,17 +504,22 @@ public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition)
         return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
     }
 
+    /**
+     * Sync max read position for normal publish.
+     * @param position {@link PositionImpl} the position to sync.
+     * @param isMarkerMessage whether the message is marker message, in such case, we
+     *                       don't need to trigger the callback to update lastMaxReadPositionMovedForwardTimestamp.
+     */
     @Override
-    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
+    public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
         // when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback
         // thread is the same tread, in this time the lastAddConfirm don't content transaction message.
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
-                this.maxReadPosition = position;
+                updateMaxReadPosition(position, isMarkerMessage);
             } else if (checkIfReady()) {
                 if (ongoingTxns.isEmpty()) {
-                    maxReadPosition = position;
-                    changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
+                    updateMaxReadPosition(position, isMarkerMessage);

Review Comment:
   This change means that the variable `lastMaxReadPositionMovedForwardTimestamp` will be updated only when there are no ongoing TXNs, which is wrong. We should only call `MaxReadPositionCallBack` after committing/aborting transactions. And you should add a test for this case.





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590612538


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -479,17 +504,22 @@ public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition)
         return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
     }
 
+    /**
+     * Sync max read position for normal publish.
+     * @param position {@link PositionImpl} the position to sync.
+     * @param isMarkerMessage whether the message is marker message, in such case, we
+     *                       don't need to trigger the callback to update lastMaxReadPositionMovedForwardTimestamp.
+     */
     @Override
-    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
+    public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
         // when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback
         // thread is the same tread, in this time the lastAddConfirm don't content transaction message.
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
-                this.maxReadPosition = position;
+                updateMaxReadPosition(position, isMarkerMessage);
             } else if (checkIfReady()) {
                 if (ongoingTxns.isEmpty()) {
-                    maxReadPosition = position;
-                    changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
+                    updateMaxReadPosition(position, isMarkerMessage);

Review Comment:
   This change means only there are no ongoing TXNs, the variable `lastMaxReadPositionMovedForwardTimestamp` will be changed, which is wrong. We should only call `MaxReadPositionCallBack` after committing/aborting transactions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590600568


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * remove the specified transaction from ongoing transaction list and update the max read position.
+     * @param txnID
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+    }
+
+    /**
+     * update the max read position. if the new position is greater than the current max read position,
+     * we will trigger the callback, unless the disableCallback is true.
+     * Currently, we only use the callback to update the lastMaxReadPositionMovedForwardTimestamp.
+     * For non-transactional production, some marker messages will be sent to the topic, in which case we don't need
+     * to trigger the callback.
+     * @param newPosition new max read position to update.
+     * @param disableCallback whether disable the callback.
+     */
+    void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
+        PositionImpl preMaxReadPosition = this.maxReadPosition;
+        this.maxReadPosition = newPosition;
+        if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) {
+            if (!checkIfNoSnapshot()) {

Review Comment:
   The check `!checkIfNoSnapshot()` is wrong; we need to remove it, right? Could you also add a test for this case?





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2103726776

   /pulsarbot rerun-failure-checks




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590951943


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * remove the specified transaction from ongoing transaction list and update the max read position.
+     * @param txnID
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+    }
+
+    /**
+     * update the max read position. if the new position is greater than the current max read position,
+     * we will trigger the callback, unless the disableCallback is true.
+     * Currently, we only use the callback to update the lastMaxReadPositionMovedForwardTimestamp.
+     * For non-transactional production, some marker messages will be sent to the topic, in which case we don't need
+     * to trigger the callback.
+     * @param newPosition new max read position to update.
+     * @param disableCallback whether disable the callback.
+     */
+    void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
+        PositionImpl preMaxReadPosition = this.maxReadPosition;
+        this.maxReadPosition = newPosition;
+        if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) {
+            if (!checkIfNoSnapshot()) {

Review Comment:
   This check preserves the original logic in:
   https://github.com/apache/pulsar/blob/eee3694f00e269eef0f75d791521d0d35d8ff411/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java#L487-L488
   Could you help verify this? Should we update `changeMaxReadPositionAndAddAbortTimes` when there is no snapshot?





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1591720697


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -479,17 +504,22 @@ public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition)
         return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
     }
 
+    /**
+     * Sync max read position for normal publish.
+     * @param position {@link PositionImpl} the position to sync.
+     * @param isMarkerMessage whether the message is marker message, in such case, we
+     *                       don't need to trigger the callback to update lastMaxReadPositionMovedForwardTimestamp.
+     */
     @Override
-    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
+    public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
         // when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback
         // thread is the same tread, in this time the lastAddConfirm don't content transaction message.
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
-                this.maxReadPosition = position;
+                updateMaxReadPosition(position, isMarkerMessage);
             } else if (checkIfReady()) {
                 if (ongoingTxns.isEmpty()) {
-                    maxReadPosition = position;
-                    changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
+                    updateMaxReadPosition(position, isMarkerMessage);

Review Comment:
   1. Start TXN `1`
   2. Start TXN `2`
   3. Commit TXN `1`
   4. TXN `2` is stuck.
   
   We need to update `lastMaxReadPositionMovedForwardTimestamp` at step `3`, right? But in your implementation, it will not because there is one ongoing TXN(`2`).
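
The scenario in the steps above can be modelled with a small sketch. It assumes, as in the quoted `removeTxnAndUpdateMaxReadPosition`, that the max read position is the entry just before the first message of the oldest ongoing TXN, or the last confirmed entry when no TXN is ongoing. Everything else is hypothetical and simplified: positions are plain `long` values, TXN ids are strings, and the class name `OngoingTxnSketch` and the counter `movedForwardCount` (standing in for the callback) do not exist in Pulsar.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of ongoing transactions and the max read position.
class OngoingTxnSketch {
    // Insertion-ordered: the first entry is the oldest ongoing TXN.
    private final Map<String, Long> ongoingTxns = new LinkedHashMap<>();
    private long lastConfirmedEntry = -1;
    private long maxReadPosition = -1;
    private int movedForwardCount = 0;

    // A transactional message is appended; remember the TXN's first position.
    void publishTxnMessage(String txnId) {
        lastConfirmedEntry++;
        ongoingTxns.putIfAbsent(txnId, lastConfirmedEntry);
    }

    // Commit or abort: append a marker, drop the TXN, recompute the position.
    void endTxn(String txnId) {
        lastConfirmedEntry++;
        ongoingTxns.remove(txnId);
        long newPosition = ongoingTxns.isEmpty()
                ? lastConfirmedEntry
                : ongoingTxns.values().iterator().next() - 1;
        long previous = maxReadPosition;
        maxReadPosition = newPosition;
        if (previous < newPosition) {
            movedForwardCount++; // stands in for the "moved forward" callback
        }
    }

    long getMaxReadPosition() {
        return maxReadPosition;
    }

    int getMovedForwardCount() {
        return movedForwardCount;
    }

    public static void main(String[] args) {
        OngoingTxnSketch buffer = new OngoingTxnSketch();
        buffer.publishTxnMessage("txn1"); // step 1
        buffer.publishTxnMessage("txn2"); // step 2
        buffer.endTxn("txn1");            // step 3, txn2 is still ongoing
        System.out.println("maxReadPosition=" + buffer.getMaxReadPosition());
        System.out.println("movedForwardCount=" + buffer.getMovedForwardCount());
    }
}
```

In this model, committing `txn1` moves the max read position forward even though `txn2` is still ongoing, which is the situation in which the reviewer expects the timestamp to be refreshed.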





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari closed pull request #22452: [fix][broker] fix replicated subscriptions for transactional messages
URL: https://github.com/apache/pulsar/pull/22452




Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1554838333


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -321,6 +323,8 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                         }
                         txnCommittedCounter.increment();
                         completableFuture.complete(null);
+                        // need to update the last data message published timestamp
+                        topic.setLastDataMessagePublishedTimestamp(Clock.systemUTC().millis());

Review Comment:
   `LastDataMessagePublishedTimestamp` is used to determine whether there are new messages available for reading. A transactional publish should not update it, as the uncommitted messages are not visible.





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1554842472


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -321,6 +323,8 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                         }
                         txnCommittedCounter.increment();
                         completableFuture.complete(null);
+                        // need to update the last data message published timestamp
+                        topic.setLastDataMessagePublishedTimestamp(Clock.systemUTC().millis());

Review Comment:
   I've checked the source code: the field is used by `ReplicatedSubscriptionsController`, which uses `LastDataMessagePublishedTimestamp` to determine whether a snapshot is needed. Ideally, we would update `LastDataMessagePublishedTimestamp` after the txn is committed, but the topic transaction buffer only maintains the first message position of each txn. Maybe we can update `LastDataMessagePublishedTimestamp` after the txn is committed.
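
A rough sketch of the kind of check discussed here, under the assumption that the controller skips a new snapshot when no data message has been published since the last completed snapshot started. The class `SnapshotDecisionSketch`, the method `shouldStartSnapshot`, and both parameter names are hypothetical; the real `ReplicatedSubscriptionsController` logic may differ.

```java
// Hypothetical illustration of a timestamp-based snapshot decision.
class SnapshotDecisionSketch {
    // Returns true when a data message was published at or after the start
    // of the last completed snapshot; a timestamp of 0 means "never published".
    static boolean shouldStartSnapshot(long lastDataMessagePublishedTimestamp,
                                       long lastCompletedSnapshotStartTime) {
        if (lastDataMessagePublishedTimestamp == 0) {
            return false;
        }
        return lastDataMessagePublishedTimestamp >= lastCompletedSnapshotStartTime;
    }

    public static void main(String[] args) {
        long lastSnapshotStart = 1000L;
        // Timestamp left stale (900) although a TXN committed later.
        System.out.println(shouldStartSnapshot(900L, lastSnapshotStart));
        // Timestamp refreshed (1500) when the TXN committed.
        System.out.println(shouldStartSnapshot(1500L, lastSnapshotStart));
    }
}
```

Under this assumption, a timestamp that is never refreshed for transactional messages keeps the check false, so no new snapshot would be taken for them.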





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2085459244

   /pulsarbot rerun-failure-checks




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2099778828

   @poorbarcode any more change requests?




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode merged PR #22452:
URL: https://github.com/apache/pulsar/pull/22452




Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1555413683


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   ```
       // Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic
       @Getter
       private volatile long lastDataMessagePublishedTimestamp = 0;
   ```
   
   For ending transactions, we only put a marker message in the topic. From the description of this field, it shouldn't be updated for internal markers.
   
   Could you please provide more information about which behavior is unexpected from the user's perspective?





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1555421137


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   `lastDataMessagePublishedTimestamp` is used **only** by the `ReplicatedSubscription` feature, to determine whether there are new messages available for reading. So we should update `lastDataMessagePublishedTimestamp` when the max read position moves forward.
   





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1557501095


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionalReplicateSubscriptionTest.java:
##########
@@ -0,0 +1,174 @@
+package org.apache.pulsar.broker.service;

Review Comment:
   add license header





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1557188141


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   @thetumbled The high-level idea in the test looks fine, but it would be better to have it in a completely separate test class, since there could be a need to enable transactions, etc. We tend to add too many unrelated tests to a single test class, and that causes problems. There's no need to have a 1-to-1 mapping between a production code class and a test class; that's leading to some problems in our test design. The test class name could be "TransactionalReplicateSubscriptionTest". (By the way, the existing test class name "ReplicatorSubscriptionTest" has a typo.)





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2046504894

   > I just don't understand, it seems ReplicatorSubscriptionController only snapshots `ledger.lastPosition`, why do we update `lastDataMessagePublishedTimestamp` after `maxReadPosition` moved?
   
   First of all, this PR fixes the bug that `lastDataMessagePublishedTimestamp` never moves for transactional messages, which results in the bug `replicated subscriptions not work for transactional messages`. So we need to move it for transactional messages.
   Secondly, it is meaningless to snapshot an unreachable position. `ledger.lastPosition` can be greater than the max read position. So we need to move it for transactional messages only when the max read position really changes.
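
The gap between `ledger.lastPosition` and the max read position described above can be illustrated with a minimal sketch. It is not the broker's implementation: the class `MaxReadPositionSketch`, its method names, and the use of plain `long` values for positions and transaction ids are all hypothetical simplifications, and it assumes transaction ids increase in the order the transactions start.

```java
import java.util.TreeMap;

// Hypothetical, simplified model: positions and transaction ids are plain longs.
final class MaxReadPositionSketch {
    // First message position of each ongoing transaction, keyed by txn id
    // (assumed here to increase in the order the transactions start).
    private final TreeMap<Long, Long> ongoingTxns = new TreeMap<>();
    private long lastConfirmed = -1;   // plays the role of ledger.lastPosition
    private long maxReadPosition = -1; // highest position consumers may read

    void publishTxnMessage(long txnId, long position) {
        ongoingTxns.putIfAbsent(txnId, position);
        lastConfirmed = position; // the entry is persisted but not yet readable
    }

    void endTxn(long txnId, long markerPosition) {
        lastConfirmed = markerPosition; // the commit/abort marker is an entry too
        ongoingTxns.remove(txnId);
        // Readers may advance up to just before the oldest ongoing transaction,
        // or to the last confirmed entry when no transaction is ongoing.
        maxReadPosition = ongoingTxns.isEmpty()
                ? lastConfirmed
                : ongoingTxns.firstEntry().getValue() - 1;
    }

    long lastConfirmed() { return lastConfirmed; }
    long maxReadPosition() { return maxReadPosition; }
}
```

In this model, the last confirmed position advances with every transactional publish, while the max read position only moves when a transaction ends, which is why a snapshot taken at the last confirmed position may point at data that consumers cannot reach yet.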




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2046495938

   I just don't understand, it seems ReplicatorSubscriptionController only snapshots `ledger.lastPosition`, why do we update `lastDataMessagePublishedTimestamp` after `maxReadPosition` moved?
   




Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "liangyepianzhou (via GitHub)" <gi...@apache.org>.
liangyepianzhou commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2042065034

   @thetumbled You need to add a test related to transactions, right?




Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1555571360


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   Yes, it does. But I think it is a good tradeoff to **allow wrong updates**, because `LastDataMessagePublishedTimestamp` is used to skip creating unnecessary subscription replication snapshots.
   A wrong update only creates an unnecessary snapshot.
   On the contrary, if we aim for strict correctness, we have to spend a lot of effort determining whether the max read position has moved forward or not.
   But we can't allow a missing update, which would skip creating a necessary subscription replication snapshot.
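
The asymmetry argued for above (a spurious update costs one extra snapshot, a missing update suppresses a needed one) can be sketched as follows. This is only an illustration under assumptions: `SnapshotGateSketch`, `recordDataAvailable`, and `onSnapshotTick` are hypothetical names, and the gate condition is a simplified guess at how a timestamp could be compared against the start time of the last snapshot, not a copy of `ReplicatedSubscriptionsController`.

```java
// Hypothetical sketch of a timestamp-gated periodic snapshot.
final class SnapshotGateSketch {
    private long lastDataTimestamp = 0;          // 0 means no data seen yet
    private long lastSnapshotStart = 0;
    private int snapshotsTaken = 0;

    // Called whenever new data is believed to be readable.
    void recordDataAvailable(long now) {
        lastDataTimestamp = now;
    }

    // Called periodically; skips the snapshot when nothing new was recorded
    // since the previous snapshot started.
    void onSnapshotTick(long now) {
        if (lastDataTimestamp == 0 || lastDataTimestamp < lastSnapshotStart) {
            return;
        }
        lastSnapshotStart = now;
        snapshotsTaken++;
    }

    int snapshotsTaken() { return snapshotsTaken; }
}
```

If `recordDataAvailable` is never called for transactional messages, every tick is skipped and no snapshot is ever created; if it is called once too often, the only cost is one additional snapshot.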
   





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1554949020


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
+            });
         } else if (TxnAction.ABORT_VALUE == txnAction) {
-            return transactionBuffer.abortTxn(txnID, lowWaterMark);
+            return transactionBuffer.abortTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   Do we need to guarantee strict correctness?





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1558795331


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   Thanks for the reply. I have refactored the implementation, PTAL.





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1558804632


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java:
##########
@@ -151,8 +151,9 @@ public interface TransactionBuffer {
     /**
      * Sync max read position for normal publish.
      * @param position {@link PositionImpl} the position to sync.
+     * @param isMakerMessage whether the message is maker message.
      */
-    void syncMaxReadPositionForNormalPublish(PositionImpl position);
+    void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMakerMessage);

Review Comment:
   done.





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2084914371

   closing and reopening to pick up recent changes from master branch




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590600568


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * remove the specified transaction from ongoing transaction list and update the max read position.
+     * @param txnID
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+    }
+
+    /**
+     * update the max read position. if the new position is greater than the current max read position,
+     * we will trigger the callback, unless the disableCallback is true.
+     * Currently, we only use the callback to update the lastMaxReadPositionMovedForwardTimestamp.
+     * For non-transactional production, some marker messages will be sent to the topic, in which case we don't need
+     * to trigger the callback.
+     * @param newPosition new max read position to update.
+     * @param disableCallback whether disable the callback.
+     */
+    void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
+        PositionImpl preMaxReadPosition = this.maxReadPosition;
+        this.maxReadPosition = newPosition;
+        if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) {
+            if (!checkIfNoSnapshot()) {

Review Comment:
   The check `!checkIfNoSnapshot()` is wrong, we need to remove it, right? Could you also add a test for this case?





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2093587476

   Closing and reopening to get recent changes from master branch to the PR build.




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590600568


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * remove the specified transaction from ongoing transaction list and update the max read position.
+     * @param txnID
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+    }
+
+    /**
+     * update the max read position. if the new position is greater than the current max read position,
+     * we will trigger the callback, unless the disableCallback is true.
+     * Currently, we only use the callback to update the lastMaxReadPositionMovedForwardTimestamp.
+     * For non-transactional production, some marker messages will be sent to the topic, in which case we don't need
+     * to trigger the callback.
+     * @param newPosition new max read position to update.
+     * @param disableCallback whether disable the callback.
+     */
+    void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
+        PositionImpl preMaxReadPosition = this.maxReadPosition;
+        this.maxReadPosition = newPosition;
+        if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) {
+            if (!checkIfNoSnapshot()) {

Review Comment:
   The check `!checkIfNoSnapshot()` is wrong, we need to remove it, right? And you should add a test for this case





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "congbobo184 (via GitHub)" <gi...@apache.org>.
congbobo184 commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1595330968


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -479,17 +502,22 @@ public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition)
         return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
     }
 
+    /**
+     * Sync max read position for normal publish.
+     * @param position {@link PositionImpl} the position to sync.
+     * @param isMarkerMessage whether the message is marker message, in such case, we
+     *                       don't need to trigger the callback to update lastMaxReadPositionMovedForwardTimestamp.
+     */
     @Override
-    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
+    public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
         // when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback
         // thread is the same tread, in this time the lastAddConfirm don't content transaction message.
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
-                this.maxReadPosition = position;
+                updateMaxReadPosition(position, isMarkerMessage);

Review Comment:
   If we are in the noSnapshot state, we shouldn't increment `changeMaxReadPositionCount`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2102006707

   /pulsarbot rerun-failure-checks




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590951943


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * remove the specified transaction from ongoing transaction list and update the max read position.
+     * @param txnID
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+    }
+
+    /**
+     * update the max read position. if the new position is greater than the current max read position,
+     * we will trigger the callback, unless the disableCallback is true.
+     * Currently, we only use the callback to update the lastMaxReadPositionMovedForwardTimestamp.
+     * For non-transactional production, some marker messages will be sent to the topic, in which case we don't need
+     * to trigger the callback.
+     * @param newPosition new max read position to update.
+     * @param disableCallback whether disable the callback.
+     */
+    void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
+        PositionImpl preMaxReadPosition = this.maxReadPosition;
+        this.maxReadPosition = newPosition;
+        if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) {
+            if (!checkIfNoSnapshot()) {

Review Comment:
   This check preserves the original logic in:
   https://github.com/apache/pulsar/blob/eee3694f00e269eef0f75d791521d0d35d8ff411/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java#L487-L488
   Could you help verify this? @congbobo184 @liangyepianzhou
   Should we update `changeMaxReadPositionAndAddAbortTimes` when there is no snapshot?





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1591722666


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -479,17 +504,22 @@ public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition)
         return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
     }
 
+    /**
+     * Sync max read position for normal publish.
+     * @param position {@link PositionImpl} the position to sync.
+     * @param isMarkerMessage whether the message is marker message, in such case, we
+     *                       don't need to trigger the callback to update lastMaxReadPositionMovedForwardTimestamp.
+     */
     @Override
-    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
+    public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
         // when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback
         // thread is the same tread, in this time the lastAddConfirm don't content transaction message.
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
-                this.maxReadPosition = position;
+                updateMaxReadPosition(position, isMarkerMessage);
             } else if (checkIfReady()) {
                 if (ongoingTxns.isEmpty()) {
-                    maxReadPosition = position;
-                    changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
+                    updateMaxReadPosition(position, isMarkerMessage);

Review Comment:
   At step 3, we will update `lastMaxReadPositionMovedForwardTimestamp` through the following call chain:
   `org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer#commitTxn` -> `org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer#removeTxnAndUpdateMaxReadPosition` -> `org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer#updateMaxReadPosition`
   instead of through `syncMaxReadPositionForNormalPublish`.
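
The behavior that the quoted Javadoc describes for `updateMaxReadPosition` (record a timestamp only when the position moves forward and the callback is not disabled) can be sketched roughly as below. The class `MaxReadCallbackSketch`, the `long` positions, and the injected `LongSupplier` clock are hypothetical stand-ins chosen to keep the example self-contained; the merged code may differ.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: positions are plain longs and the clock is injected.
final class MaxReadCallbackSketch {
    private final LongSupplier clock;
    private long maxReadPosition = -1;
    private long lastMovedForwardTimestamp = 0;

    MaxReadCallbackSketch(LongSupplier clock) {
        this.clock = clock;
    }

    // disableCallback is true for marker messages, which carry no user data.
    void updateMaxReadPosition(long newPosition, boolean disableCallback) {
        long previous = maxReadPosition;
        maxReadPosition = newPosition;
        if (previous < newPosition && !disableCallback) {
            lastMovedForwardTimestamp = clock.getAsLong();
        }
    }

    long lastMovedForwardTimestamp() { return lastMovedForwardTimestamp; }
}
```

Both the commit path and the normal publish path would funnel into this single method in such a design, so the timestamp is maintained in one place regardless of which caller advanced the position.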





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1593300232


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * remove the specified transaction from ongoing transaction list and update the max read position.
+     * @param txnID
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+    }
+
+    /**
+     * update the max read position. if the new position is greater than the current max read position,
+     * we will trigger the callback, unless the disableCallback is true.
+     * Currently, we only use the callback to update the lastMaxReadPositionMovedForwardTimestamp.
+     * For non-transactional production, some marker messages will be sent to the topic, in which case we don't need
+     * to trigger the callback.
+     * @param newPosition new max read position to update.
+     * @param disableCallback whether disable the callback.
+     */
+    void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
+        PositionImpl preMaxReadPosition = this.maxReadPosition;
+        this.maxReadPosition = newPosition;
+        if (preMaxReadPosition.compareTo(this.maxReadPosition) < 0) {
+            if (!checkIfNoSnapshot()) {

Review Comment:
   Agree too. I have updated it, PTAL thanks. @dao-jun @poorbarcode @congbobo184 @liangyepianzhou 





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2103765626

   /pulsarbot rerun-failure-checks




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2109230737

   I'm giving up on cherry-picking #21816 since that's too large a change. I'll check what the conflicts are when applying only #22656 and #22452.




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2109219574

   Cherry-picking this to branch-3.0. It looks like #21816 and #22656 need to be cherry-picked before this one to reduce merge conflicts.




Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1554843436


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -321,6 +323,8 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                         }
                         txnCommittedCounter.increment();
                         completableFuture.complete(null);
+                        // need to update the last data message published timestamp
+                        topic.setLastDataMessagePublishedTimestamp(Clock.systemUTC().millis());

Review Comment:
   Actually, I think it doesn't matter whether `LastDataMessagePublishedTimestamp` is updated before or after the txn is committed,
   because a non-txn message published to the topic before a txn commits will also update it.
   





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1555571360


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   Yes, it does. But I think it is a good tradeoff to **allow a wrong update**, because `LastDataMessagePublishedTimestamp` is used to skip creating unnecessary subscription replication snapshots.
   An incorrect update just creates an unnecessary snapshot.
   In contrast, if we aim for strict correctness, we have to put a lot of effort into determining whether the max read position moved forward or not.
   But we **can't allow a missing update**, which would skip creating a necessary subscription replication snapshot.
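
A minimal sketch of the trade-off described in this comment, under stated assumptions: `SnapshotGate`, `markDataPublished`, and `maybeStartSnapshot` are hypothetical names invented for illustration and are not Pulsar's actual classes or methods. The sketch assumes a snapshot is skipped whenever the last recorded publish time is older than the start of the previous snapshot.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the snapshot-skipping check; not the actual Pulsar class. */
final class SnapshotGate {
    // Last time new data may have become readable (hypothetical field).
    private final AtomicLong lastDataPublishedMillis = new AtomicLong();
    // Start time of the most recent snapshot that was not skipped.
    private long lastSnapshotStartMillis;

    /** Record that readable data may have become available at the given time. */
    void markDataPublished(long nowMillis) {
        lastDataPublishedMillis.set(nowMillis);
    }

    /** Returns true if a snapshot is started, false if it is skipped as unnecessary. */
    boolean maybeStartSnapshot(long nowMillis) {
        if (lastDataPublishedMillis.get() < lastSnapshotStartMillis) {
            return false; // nothing recorded since the last snapshot, so skip
        }
        lastSnapshotStartMillis = nowMillis;
        return true;
    }
}
```

In this model, a spurious `markDataPublished` call (for example after an abort) costs one extra snapshot, whereas a commit that never calls it leaves `maybeStartSnapshot` returning false even though new messages became readable.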
   





Re: [PR] [fix] [broker] fix not updating LastDataMessagePublishedTimestamp in transactional production. [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1554987948


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3942,9 +3942,13 @@ public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishCon
     @Override
     public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
         if (TxnAction.COMMIT_VALUE == txnAction) {
-            return transactionBuffer.commitTxn(txnID, lowWaterMark);
+            return transactionBuffer.commitTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();
+            });
         } else if (TxnAction.ABORT_VALUE == txnAction) {
-            return transactionBuffer.abortTxn(txnID, lowWaterMark);
+            return transactionBuffer.abortTxn(txnID, lowWaterMark).thenRun(() -> {
+                lastDataMessagePublishedTimestamp = Clock.systemUTC().millis();

Review Comment:
   I'm afraid yes, to prevent potential issues.
   Just
   ```java
   transactionBuffer.abortTxn(txnID, lowWaterMark).thenRunAsync(() -> {
       ....
   }, ((ManagedLedgerImpl) ledger).getExecutor());
   ```
   is OK; it makes the update of `lastDataMessagePublishedTimestamp` run on the same thread.
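
A small self-contained sketch of the `thenRunAsync(action, executor)` pattern suggested here. It uses only `java.util.concurrent`; the single-threaded executor stands in for the managed ledger's executor, and `SameThreadUpdateSketch`, `newNamedExecutor`, and `updateAfter` are hypothetical names, not Pulsar code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

final class SameThreadUpdateSketch {
    /** Single-threaded executor standing in for the managed ledger's executor (assumption). */
    static ExecutorService newNamedExecutor(String threadName) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true); // do not keep the JVM alive for the demo
            return t;
        });
    }

    /** Runs the update after txnResult completes and returns the name of the thread that ran it. */
    static String updateAfter(CompletableFuture<Void> txnResult, ExecutorService executor) {
        AtomicReference<String> ranOn = new AtomicReference<>();
        // thenRunAsync with an explicit executor always hands the action to that executor,
        // regardless of which thread completed txnResult.
        txnResult.thenRunAsync(() -> ranOn.set(Thread.currentThread().getName()), executor).join();
        return ranOn.get();
    }
}
```

With plain `thenRun`, the action may run on whichever thread completes the future (or on the caller if it is already complete), which is the situation the comment wants to avoid.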





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2048840477

   > > * update `lastDataMessagePublishedTimestamp` when the max position moves forward.
   > 
   > this seems to make sense. assuming that "when the max position moves forward", there are more messages available to be read. It might make sense to rename `lastDataMessagePublishedTimestamp` to cover both the non-transactional and transactional case. The field name should also include the purpose, this is for replicated subscriptions. It's also possible to cover the meaning in a code comment for the field.
   
   good idea.




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2046496815

   > Please check the typo "maker", should be "marker"
   
   fixed, thanks.




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2046915812

   > I just don't understand, it seems ReplicatorSubscriptionController only snapshots `ledger.lastPosition`, why do we update `lastDataMessagePublishedTimestamp` after `maxReadPosition` moved?
   
   We may need to discuss at which point in time we should update `lastDataMessagePublishedTimestamp`. There are two options:
   - update `lastDataMessagePublishedTimestamp` whenever a transactional/non-transactional message is persisted.
   - update `lastDataMessagePublishedTimestamp` when the max position moves forward.
   
   The difference between these two options is that the former updates more frequently than the latter. With more frequent updates, we get a more precise position for replicated subscriptions.
   But because the consumer can't consume messages beyond the max read position, and the number of snapshots is limited (default 10), there is a risk that when a consumer acks a position belonging to a committed transaction, the corresponding snapshots have already been evicted, so the replicated subscriptions feature can't work at all.
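
The eviction risk described above can be illustrated with a toy model. This is a sketch under assumptions, not Pulsar's snapshot cache: `BoundedSnapshotCache`, `add`, and `usableFor` are hypothetical names, positions are plain `long` values, and the cache is assumed to evict its oldest entry when full and to serve the snapshot with the highest position at or below the acknowledged position.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical bounded cache keyed by snapshot position; the oldest entry is evicted when full. */
final class BoundedSnapshotCache {
    private final TreeMap<Long, String> snapshots = new TreeMap<>();
    private final int maxSize;

    BoundedSnapshotCache(int maxSize) {
        this.maxSize = maxSize;
    }

    void add(long position, String snapshotId) {
        if (snapshots.size() >= maxSize) {
            snapshots.pollFirstEntry(); // evict the snapshot with the lowest position
        }
        snapshots.put(position, snapshotId);
    }

    /** Returns the snapshot with the highest position <= markDeletePosition, or null if none is cached. */
    String usableFor(long markDeletePosition) {
        Map.Entry<Long, String> entry = snapshots.floorEntry(markDeletePosition);
        return entry == null ? null : entry.getValue();
    }
}
```

In this model, suppose a snapshot exists at position 5 and an open transaction then pins the max read position there. If twelve more snapshots are taken at positions 10 through 120 while the transaction is open, a capacity of 10 evicts the entries at 5, 10, and 20, so a later ack at position 25 finds no usable snapshot. If no snapshots are taken until the max read position moves, the entry at 5 is still available for that ack.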




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1559163062


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -674,6 +700,10 @@ private void closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> rea
         }
     }
 
+    public interface MaxReadPositionCallBack {
+        void moveForward(PositionImpl oldPosition, PositionImpl newPosition);

Review Comment:
   `moveForward` is in the imperative form and sounds like a command. It would be better to rename it so that it's clear that this is an event. `maxReadPositionMovedForward` would be one possibility.
   Please also add Javadoc that clarifies the purpose and contract of this interface and the event information that the method call provides.
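
One possible shape for the renamed, documented interface, as a sketch only. `SketchPosition` is a hypothetical stand-in for `PositionImpl` so that the example compiles on its own, and the Javadoc wording is an assumption about the intended contract rather than text from the PR.

```java
/** Hypothetical minimal position type, used only to keep the sketch self-contained. */
final class SketchPosition {
    final long ledgerId;
    final long entryId;

    SketchPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }
}

/**
 * Listener for max read position changes of a transaction buffer.
 * Implementations are notified after the position has already advanced;
 * the method reports an event and does not request a move.
 */
@FunctionalInterface
interface MaxReadPositionCallBack {
    /**
     * Called after the max read position has moved forward.
     *
     * @param oldPosition the max read position before the change
     * @param newPosition the max read position after the change
     */
    void maxReadPositionMovedForward(SketchPosition oldPosition, SketchPosition newPosition);
}
```

Naming the method in the past tense makes call sites read as notifications, for example `callback.maxReadPositionMovedForward(previous, current)`.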





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2094000170

   /pulsarbot rerun-failure-checks




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1590598410


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -444,17 +447,39 @@ private void takeSnapshotByTimeout() {
                 takeSnapshotIntervalTime, TimeUnit.MILLISECONDS);
     }
 
-    void updateMaxReadPosition(TxnID txnID) {
-        PositionImpl preMaxReadPosition = this.maxReadPosition;
+    /**
+     * remove the specified transaction from ongoing transaction list and update the max read position.
+     * @param txnID
+     */
+    void removeTxnAndUpdateMaxReadPosition(TxnID txnID) {
         ongoingTxns.remove(txnID);
         if (!ongoingTxns.isEmpty()) {
             PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey());
-            maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
+            updateMaxReadPosition(((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position), false);
         } else {
-            maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
+            updateMaxReadPosition((PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(), false);
         }
-        if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) {
-            this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement();
+    }
+
+    /**
+     * update the max read position. if the new position is greater than the current max read position,
+     * we will trigger the callback, unless the disableCallback is true.
+     * Currently, we only use the callback to update the lastMaxReadPositionMovedForwardTimestamp.
+     * For non-transactional production, some marker messages will be sent to the topic, in which case we don't need
+     * to trigger the callback.
+     * @param newPosition new max read position to update.
+     * @param disableCallback whether disable the callback.
+     */
+    void updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback) {
+        PositionImpl preMaxReadPosition = this.maxReadPosition;
+        this.maxReadPosition = newPosition;

Review Comment:
   It seems it just replaces
   ```java
   maxReadPosition = ((ManagedLedgerImpl) topic.getManagedLedger()).getPreviousPosition(position);
   ```
   and
   ```java
   maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry();
   ```
   
   with `updateMaxReadPosition(PositionImpl newPosition, boolean disableCallback)`.
   
   ![image](https://github.com/apache/pulsar/assets/21362791/5deff0a4-f409-450d-bdce-1e9cc30853a5)
   
   I think it's OK





Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2106762113

   PTAL, thanks. @poorbarcode 




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2094562375

   /pulsarbot rerun-failure-checks




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2094573364

   ![image](https://github.com/apache/pulsar/assets/21362791/6bcbf96d-20ea-485c-bb96-39d3fac17f9f)
   The test keeps failing, could you please resolve it?




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun closed pull request #22452: [fix][broker] fix replicated subscriptions for transactional messages
URL: https://github.com/apache/pulsar/pull/22452




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "thetumbled (via GitHub)" <gi...@apache.org>.
thetumbled commented on PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#issuecomment-2109229522

   > cherry-picking this to branch-3.0 . it looks like #21816 and #22656 need to be cherry-picked before this one to reduce merge conflicts.
   
   Hi, Lari. You can cherry-pick https://github.com/apache/pulsar/pull/22656 into branch-3.0 without conflicts, and I can help cherry-pick this PR if there are any conflicts.




Re: [PR] [fix][broker] fix replicated subscriptions for transactional messages [pulsar]

Posted by "poorbarcode (via GitHub)" <gi...@apache.org>.
poorbarcode commented on code in PR #22452:
URL: https://github.com/apache/pulsar/pull/22452#discussion_r1591773534


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -479,17 +504,22 @@ public synchronized boolean isTxnAborted(TxnID txnID, PositionImpl readPosition)
         return snapshotAbortedTxnProcessor.checkAbortedTransaction(txnID);
     }
 
+    /**
+     * Sync max read position for normal publish.
+     * @param position {@link PositionImpl} the position to sync.
+     * @param isMarkerMessage whether the message is marker message, in such case, we
+     *                       don't need to trigger the callback to update lastMaxReadPositionMovedForwardTimestamp.
+     */
     @Override
-    public void syncMaxReadPositionForNormalPublish(PositionImpl position) {
+    public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean isMarkerMessage) {
         // when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback
         // thread is the same tread, in this time the lastAddConfirm don't content transaction message.
         synchronized (TopicTransactionBuffer.this) {
             if (checkIfNoSnapshot()) {
-                this.maxReadPosition = position;
+                updateMaxReadPosition(position, isMarkerMessage);
             } else if (checkIfReady()) {
                 if (ongoingTxns.isEmpty()) {
-                    maxReadPosition = position;
-                    changeMaxReadPositionAndAddAbortTimes.incrementAndGet();
+                    updateMaxReadPosition(position, isMarkerMessage);

Review Comment:
   Ah, sorry, I mistakenly thought `disableCallback` was `doCallback`
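
To make the flag's polarity concrete, here is a simplified sketch of the semantics described in the Javadoc of the diff: the callback fires only when the position moves forward and `disableCallback` is false. `MaxReadPositionTracker` and its `Listener` are hypothetical, positions are plain `long` values, and the `checkIfNoSnapshot()` branch from the diff is deliberately left out.

```java
final class MaxReadPositionTracker {
    /** Hypothetical listener, invoked only when the position moves forward. */
    interface Listener {
        void movedForward(long oldPosition, long newPosition);
    }

    private long maxReadPosition;
    private final Listener listener;

    MaxReadPositionTracker(long initialPosition, Listener listener) {
        this.maxReadPosition = initialPosition;
        this.listener = listener;
    }

    /** Passing disableCallback = true suppresses the listener, e.g. for marker messages. */
    void updateMaxReadPosition(long newPosition, boolean disableCallback) {
        long previous = maxReadPosition;
        maxReadPosition = newPosition;
        if (previous < newPosition && !disableCallback) {
            listener.movedForward(previous, newPosition);
        }
    }

    long maxReadPosition() {
        return maxReadPosition;
    }
}
```

Under this reading, `updateMaxReadPosition(position, isMarkerMessage)` advances the position for a marker message but skips the notification, which is the opposite of what a `doCallback` flag would do.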


