Posted to commits@pulsar.apache.org by "shibd (via GitHub)" <gi...@apache.org> on 2024/04/24 09:14:31 UTC

[PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

shibd opened a new pull request, #22572:
URL: https://github.com/apache/pulsar/pull/22572

   ### Motivation
   
   #22571
   
   ### Analysis
   
   When `replicateSubscriptionState` is enabled, the broker uses the topic itself to sync subscription state and flags the metadata of these messages as a [Marker](https://github.com/apache/pulsar/blob/ac94296d2488b2b13d76fa2b1bfa71e9e3cfbb45/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Markers.java#L107-L120).
   
   The topic does not send these `marker` messages to the consumer and acks them automatically.
   
   https://github.com/apache/pulsar/blob/fc393f69043be6eb1b2572a27f131656a2cbc7f6/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java#L202-L217
   
   But `getLastMessageId` always returns the position of the last message, regardless of whether that message is a `marker`. This causes the reader to get stuck.
   
   ```java
           while (reader.hasMessageAvailable()) {  // returns true
                 Message<?> message = reader.readNext();  // never receives a message
           }
   ```
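   
   The hang can be illustrated with a toy model that needs no broker. This is only a sketch under stated assumptions: `StuckReaderModel`, its `Entry` class, and the index-based positions are hypothetical stand-ins, not Pulsar APIs, and the availability check is simplified to a single comparison. It shows the old check reporting a message as available while nothing dispatchable remains.
   
   ```java
   import java.util.List;
   import java.util.OptionalInt;

   // Hypothetical toy model; none of these names are Pulsar APIs.
   public class StuckReaderModel {

       // An entry is either a user message or a server-only marker.
       static final class Entry {
           final boolean marker;
           Entry(boolean marker) { this.marker = marker; }
       }

       // Old behavior: the reported last position is the last entry, marker or not.
       static int lastPosition(List<Entry> log) {
           return log.size() - 1;
       }

       // Fixed behavior: walk backwards to the last non-marker entry (-1 if none).
       static int lastDispatchablePosition(List<Entry> log) {
           for (int i = log.size() - 1; i >= 0; i--) {
               if (!log.get(i).marker) {
                   return i;
               }
           }
           return -1;
       }

       // Simplified assumption: the check asks whether the reader is behind the reported position.
       static boolean hasMessageAvailable(int lastReadIndex, int reportedLastPosition) {
           return lastReadIndex < reportedLastPosition;
       }

       // What can actually be delivered: the next non-marker entry, if any.
       static OptionalInt nextDispatchable(List<Entry> log, int lastReadIndex) {
           for (int i = lastReadIndex + 1; i < log.size(); i++) {
               if (!log.get(i).marker) {
                   return OptionalInt.of(i);
               }
           }
           return OptionalInt.empty();
       }

       public static void main(String[] args) {
           // Two user messages followed by one marker.
           List<Entry> log = List.of(new Entry(false), new Entry(false), new Entry(true));
           int lastReadIndex = 1; // the reader has consumed both user messages

           boolean oldCheck = hasMessageAvailable(lastReadIndex, lastPosition(log));
           boolean newCheck = hasMessageAvailable(lastReadIndex, lastDispatchablePosition(log));
           System.out.println("old check: " + oldCheck); // true, yet a read would block
           System.out.println("new check: " + newCheck); // false
           System.out.println("deliverable: " + nextDispatchable(log, lastReadIndex).isPresent()); // false
       }
   }
   ```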
   
   Refer to this diagram to understand the bug:
   
   <img width="1164" alt="image" src="https://github.com/apache/pulsar/assets/33416836/62c5fdb2-9273-4b50-a527-0f649ffd8228">
   
   
   ### Modifications
   - Add `asyncReverseFindPositionOneByOne` method on `ManagedLedger`.
   - Add `getLastCanDispatchPosition` method on `Topic`; it calls `asyncReverseFindPositionOneByOne` to find the position of the last entry that is not a `replicateSubscriptionState` marker.
   - Change the `getLastMessageId` implementation in `ServerCnx` to use `getLastCanDispatchPosition` instead of `getMaxReadPosition`.
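   
   The reverse, one-entry-at-a-time search can be sketched against an in-memory list. This is an illustration, not the `ManagedLedger` implementation: `ReverseFindSketch`, `readEntryAsync`, and `findLastMatching` are hypothetical names, list indices stand in for positions, and `-1` is an assumed "nothing matched" result.
   
   ```java
   import java.util.List;
   import java.util.concurrent.CompletableFuture;
   import java.util.function.Predicate;

   // Hypothetical in-memory sketch; not the ManagedLedger implementation.
   public class ReverseFindSketch {

       // Stand-in for an asynchronous single-entry read from storage.
       static CompletableFuture<String> readEntryAsync(List<String> log, int index) {
           return CompletableFuture.supplyAsync(() -> log.get(index));
       }

       // Scan backwards from startIndex, one entry per read, and complete with the
       // index of the first entry accepted by the predicate, or -1 if none is.
       static CompletableFuture<Integer> findLastMatching(List<String> log,
                                                          Predicate<String> predicate,
                                                          int startIndex) {
           if (startIndex < 0) {
               // Ran off the front of the log: nothing matched.
               return CompletableFuture.completedFuture(-1);
           }
           return readEntryAsync(log, startIndex).thenCompose(entry -> {
               if (predicate.test(entry)) {
                   return CompletableFuture.completedFuture(startIndex);
               }
               // Not a match: chain a read of the previous entry.
               return findLastMatching(log, predicate, startIndex - 1);
           });
       }

       public static void main(String[] args) {
           List<String> log = List.of("msg-0", "msg-1", "marker", "marker");
           int index = findLastMatching(log, e -> !e.equals("marker"), log.size() - 1).join();
           System.out.println(index); // 1
       }
   }
   ```
   
   Each step issues exactly one read, so the cost grows with the number of trailing entries the predicate rejects.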
   
   
   
   ### Verifying this change
   - Add `ManagedLedgerTest.testReverseFindPositionOneByOne` to cover the `asyncReverseFindPositionOneByOne` method.
   - Add `testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage` to cover this bug.
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#issuecomment-2081503888

   The PR handles the `ServerOnlyMarker` case, but it looks like we also need to handle txn-aborted messages.
   I created a follow-up PR for this improvement, PTAL:
   https://github.com/apache/pulsar/pull/22610




Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582980165


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class ManagedLedgerImplUtils {
+
+    /**
+     * Reverse find last valid position one-entry by one-entry.
+     */
+    public static CompletableFuture<Position> asyncGetLastValidPosition(final ManagedLedgerImpl ledger,
+                                                                        final Predicate<Entry> predicate,
+                                                                        final PositionImpl startPosition) {
+        CompletableFuture<Position> future = new CompletableFuture<>();
+        if (!ledger.isValidPosition(startPosition)) {
+            future.complete(startPosition);
+        } else {

Review Comment:
   This check could be moved to the beginning of the `internalAsyncReverseFindPositionOneByOne` method to reduce duplication.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class ManagedLedgerImplUtils {
+
+    /**
+     * Reverse find last valid position one-entry by one-entry.
+     */
+    public static CompletableFuture<Position> asyncGetLastValidPosition(final ManagedLedgerImpl ledger,
+                                                                        final Predicate<Entry> predicate,
+                                                                        final PositionImpl startPosition) {
+        CompletableFuture<Position> future = new CompletableFuture<>();
+        if (!ledger.isValidPosition(startPosition)) {
+            future.complete(startPosition);
+        } else {
+            internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future);
+        }
+        return future;
+    }
+
+    private static void internalAsyncReverseFindPositionOneByOne(final ManagedLedgerImpl ledger,
+                                                                 final Predicate<Entry> predicate,
+                                                                 final PositionImpl position,
+                                                                 final CompletableFuture<Position> future) {
+        ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                final Position position = entry.getPosition();
+                try {
+                    if (predicate.test(entry)) {
+                        future.complete(position);
+                        return;
+                    }
+                    PositionImpl previousPosition = ledger.getPreviousPosition((PositionImpl) position);
+                    if (!ledger.isValidPosition(previousPosition)) {
+                        future.complete(previousPosition);
+                    } else {

Review Comment:
   This check could be moved to the beginning of the `internalAsyncReverseFindPositionOneByOne` method to reduce duplication.
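   
   A minimal synchronous sketch of that suggestion, assuming list indices in place of ledger positions: `HoistedCheckSketch`, `isValid`, and `findLastValid` are hypothetical names, and `isValid` merely stands in for `ledger.isValidPosition(...)`. With the check at the top of the recursive method, neither the entry point nor the recursive step needs its own copy.
   
   ```java
   import java.util.List;
   import java.util.function.Predicate;

   // Hypothetical sketch; indices stand in for ledger positions.
   public class HoistedCheckSketch {

       // Stand-in for ledger.isValidPosition(...).
       static boolean isValid(List<String> log, int index) {
           return index >= 0 && index < log.size();
       }

       // The validity check appears once, at the top of the recursive method.
       static int findLastValid(List<String> log, Predicate<String> predicate, int index) {
           if (!isValid(log, index)) {
               return index; // same contract as the diff: hand back the invalid position
           }
           if (predicate.test(log.get(index))) {
               return index;
           }
           return findLastValid(log, predicate, index - 1);
       }

       public static void main(String[] args) {
           List<String> log = List.of("msg-0", "marker", "marker");
           System.out.println(findLastValid(log, e -> !e.equals("marker"), 2)); // 0
           System.out.println(findLastValid(log, e -> false, 2));               // -1
       }
   }
   ```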





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "BewareMyPower (via GitHub)" <gi...@apache.org>.
BewareMyPower commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582000782


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1852,6 +1852,51 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
         return future;
     }
 
+    @Override
+    public CompletableFuture<Position> asyncReverseFindPositionOneByOne(Predicate<Entry> predicate) {

Review Comment:
   I also support the `asyncFindLastValidPosition` name. `Reverse` and `OneByOne` only describe how you achieve the goal, while `findLastValidPosition` describes the goal itself.
   
   If the way the goal is achieved changes in the future, the method name might no longer be appropriate.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582027199


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3625,6 +3626,15 @@ public Position getLastPosition() {
         return ledger.getLastConfirmedEntry();
     }
 
+    @Override
+    public CompletableFuture<Position> getLastDispatchablePosition() {
+        return ledger.asyncReverseFindPositionOneByOne((entry -> {
+            MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
+            // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
+            return !Markers.isServerOnlyMarker(md);
+        }));
+    }

Review Comment:
   Oh, sorry, I mean if the result of `asyncReverseFindPositionOneByOne` is greater than `getMaxReadPosition`, then the result of `getMaxReadPosition` should be returned.
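   
   The clamping described here amounts to taking the smaller of two positions. A minimal sketch, assuming a hypothetical `Pos` class ordered by ledger id and then entry id (it is a stand-in, not Pulsar's position class), with `clamp` as an illustrative helper name:
   
   ```java
   // Hypothetical sketch; Pos is a stand-in for a ledger position, not a Pulsar class.
   public class ClampToMaxRead {

       // Minimal comparable position: ordered by ledger id, then entry id.
       static final class Pos implements Comparable<Pos> {
           final long ledgerId;
           final long entryId;

           Pos(long ledgerId, long entryId) {
               this.ledgerId = ledgerId;
               this.entryId = entryId;
           }

           @Override
           public int compareTo(Pos other) {
               int byLedger = Long.compare(ledgerId, other.ledgerId);
               return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
           }

           @Override
           public String toString() {
               return ledgerId + ":" + entryId;
           }
       }

       // Never report a position beyond the max read position.
       static Pos clamp(Pos lastValid, Pos maxRead) {
           return lastValid.compareTo(maxRead) > 0 ? maxRead : lastValid;
       }

       public static void main(String[] args) {
           Pos maxRead = new Pos(5, 10);
           System.out.println(clamp(new Pos(5, 12), maxRead)); // 5:10, search result was past maxRead
           System.out.println(clamp(new Pos(5, 7), maxRead));  // 5:7, search result already readable
       }
   }
   ```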





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578970618


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3602,6 +3603,15 @@ public Position getLastPosition() {
         return ledger.getLastConfirmedEntry();
     }
 
+    @Override
+    public CompletableFuture<Position> getLastCanDispatchPosition() {

Review Comment:
   The TransactionBuffer only handles the TXN commit/abort/append requests and provides the READ_COMMITTED guarantee.
   Determining the position to dispatch is the Topic's duty; the TransactionBuffer should not be aware of it.
   See also https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L4122-L4124





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "Demogorgon314 (via GitHub)" <gi...@apache.org>.
Demogorgon314 commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582864460


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class ManagedLedgerImplUtils {
+
+    /**
+     * Reverse find last valid position one-entry by one-entry.
+     */
+    public static CompletableFuture<Position> asyncGetLastValidPosition(final ManagedLedgerImpl ledger,
+                                                                        final Predicate<Entry> predicate,
+                                                                        final PositionImpl startPosition) {
+        CompletableFuture<Position> future = new CompletableFuture<>();
+        if (!ledger.isValidPosition(startPosition)) {
+            future.complete(startPosition);
+        } else {
+            internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future);
+        }
+        return future;
+    }
+
+    private static void internalAsyncReverseFindPositionOneByOne(final ManagedLedgerImpl ledger,
+                                                                 final Predicate<Entry> predicate,
+                                                                 final PositionImpl position,
+                                                                 final CompletableFuture<Position> future) {
+        ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                final Position position = entry.getPosition();
+                try {
+                    if (predicate.test(entry)) {
+                        future.complete(position);
+                        return;
+                    }
+                    PositionImpl previousPosition = ledger.getPreviousPosition((PositionImpl) position);
+                    if (!ledger.isValidPosition(previousPosition)) {
+                        future.complete(previousPosition);
+                    } else {
+                        internalAsyncReverseFindPositionOneByOne(ledger, predicate,
+                                ledger.getPreviousPosition((PositionImpl) position), future);

Review Comment:
   ```java
   internalAsyncReverseFindPositionOneByOne(ledger, predicate, previousPosition, future);
   ```
   Why do we calculate the `previousPosition` again?





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1585820129


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class ManagedLedgerImplUtils {
+
+    /**
+     * Reverse find last valid position one-entry by one-entry.
+     */
+    public static CompletableFuture<Position> asyncGetLastValidPosition(final ManagedLedgerImpl ledger,
+                                                                        final Predicate<Entry> predicate,
+                                                                        final PositionImpl startPosition) {
+        CompletableFuture<Position> future = new CompletableFuture<>();
+        if (!ledger.isValidPosition(startPosition)) {
+            future.complete(startPosition);
+        } else {

Review Comment:
   fixed by https://github.com/apache/pulsar/pull/22610





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578818795


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1852,6 +1852,51 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
         return future;
     }
 
+    @Override
+    public CompletableFuture<Position> asyncReverseFindPositionOneByOne(Predicate<Entry> predicate) {

Review Comment:
   I want to show two key points in the method name: `Reverse` and `OneByOne`
   
   





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1577908495


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -509,6 +509,11 @@ public PositionImpl getMaxReadPosition() {
         }
     }
 
+    @Override
+    public CompletableFuture<Position> getLastCanDispatchPosition() {
+        return CompletableFuture.completedFuture(getMaxReadPosition());

Review Comment:
   It looks a little strange: after recovery finishes and there are no ongoing txns, the maxReadPosition is set to the ledger's lastConfirmedEntry, and that last entry could also be a Marker.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582008865


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -509,6 +509,16 @@ public PositionImpl getMaxReadPosition() {
         }
     }
 
+    @Override
+    public CompletableFuture<Position> getLastDispatchablePosition() {
+        PositionImpl tnxMaxReadPosition = getMaxReadPosition();
+        if (tnxMaxReadPosition.compareTo((PositionImpl) topic.getLastPosition()) == 0) {
+            return topic.getLastDispatchablePosition();
+        } else {
+            return CompletableFuture.completedFuture(tnxMaxReadPosition);
+        }
+    }

Review Comment:
   +1





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582033001


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3634,6 +3636,22 @@ public Position getLastPosition() {
         return ledger.getLastConfirmedEntry();
     }
 
+    @Override
+    public CompletableFuture<Position> getLastDispatchablePosition() {
+        PositionImpl maxReadPosition = getMaxReadPosition();
+        // If `maxReadPosition` is not equal to `LastPosition`. It means that there are uncommitted transactions.
+        // so return `maxRedPosition` directly.
+        if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) {
+            return CompletableFuture.completedFuture(maxReadPosition);
+        } else {
+            return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
+                MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
+                // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
+                return !Markers.isServerOnlyMarker(md);
+            });
+        }
+    }

Review Comment:
   Not sure if there is a race condition here: if uncommitted transaction messages are produced before we search for a valid position, we may return an uncommitted position.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1577928912


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -509,6 +509,11 @@ public PositionImpl getMaxReadPosition() {
         }
     }
 
+    @Override
+    public CompletableFuture<Position> getLastCanDispatchPosition() {
+        return CompletableFuture.completedFuture(getMaxReadPosition());

Review Comment:
   And if all the txns are committed/aborted, the maxReadPosition will also be set to lastConfirmedEntry. Do we need to consider that case?





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578778014


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -509,6 +509,11 @@ public PositionImpl getMaxReadPosition() {
         }
     }
 
+    @Override
+    public CompletableFuture<Position> getLastCanDispatchPosition() {
+        return CompletableFuture.completedFuture(getMaxReadPosition());

Review Comment:
   Makes sense to me. I added this logic and added `ReplicatorSubscriptionWithTransactionTest` to cover the transaction-enabled case. Thanks for your review.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578834981


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3602,6 +3603,15 @@ public Position getLastPosition() {
         return ledger.getLastConfirmedEntry();
     }
 
+    @Override
+    public CompletableFuture<Position> getLastCanDispatchPosition() {

Review Comment:
   About the simple method comment, I understand your intention.
   
   After this change there would be fewer methods, but this logic would end up in `PersistentTopic`:
   ```java
           PositionImpl tnxMaxReadPosition = transactionBuffer.getMaxReadPosition();
           if (getLastPosition() == tnxMaxReadPosition) {
               return topic.getLastCanDispatchPosition();
           } else {
               return CompletableFuture.completedFuture(tnxMaxReadPosition);
           }
   ```
   
   I still want it to be cohesive in the `TransactionBuffer` class.
   
   I think it makes sense to expose the `getLastCanDispatchPosition` method on both `TransactionBuffer` and `Topic`; it clearly reminds the caller: **The position returned by the `getLastPosition` method is not dispatchable; use `getLastCanDispatchPosition` instead.**
   
   





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui merged PR #22572:
URL: https://github.com/apache/pulsar/pull/22572




Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582004224


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java:
##########
@@ -167,6 +168,82 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
                 "messages don't match.");
     }
 
+    /**
+     * Tests replicated subscriptions across two regions and can read successful.
+     */
+    @Test
+    public void testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage() throws Exception {
+        String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscriptionlastmessage");
+        String topicName = "persistent://" + namespace + "/mytopic";
+        String subscriptionName = "cluster-subscription";
+        // this setting can be used to manually run the test with subscription replication disabled
+        // it shows that subscription replication has no impact in behavior for this test case
+        boolean replicateSubscriptionState = true;
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        // create subscription in r1
+        createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState);
+
+        @Cleanup
+        PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        // create subscription in r2
+        createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState);
+
+        Set<String> sentMessages = new LinkedHashSet<>();
+
+        // send messages in r1
+        @Cleanup
+        Producer<byte[]> producer = client1.newProducer().topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        int numMessages = 6;
+        for (int i = 0; i < numMessages; i++) {
+            String body = "message" + i;
+            producer.send(body.getBytes(StandardCharsets.UTF_8));
+            sentMessages.add(body);
+        }
+        producer.close();
+
+
+        // consume 3 messages in r1
+        Set<String> receivedMessages = new LinkedHashSet<>();
+        try (Consumer<byte[]> consumer1 = client1.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .replicateSubscriptionState(replicateSubscriptionState)
+                .subscribe()) {
+            readMessages(consumer1, receivedMessages, 3, false);
+        }
+
+        // wait for subscription to be replicated
+        Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

Review Comment:
   Is it possible to avoid the `Thread.sleep` by using Awaitility?
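
A minimal sketch of the polling idea behind this suggestion, written against the JDK only so that it stands alone. `Poll.until` is a hypothetical helper, not an Awaitility or Pulsar API, and it assumes the test can observe some condition that signals the subscription state has been replicated.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper: re-checks a condition instead of sleeping for a fixed time.
final class Poll {
    private Poll() {
    }

    // Returns true as soon as the condition holds, or false once the timeout
    // has elapsed (or the waiting thread is interrupted).
    static boolean until(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag for the caller and stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In a test, the fixed sleep would become a call such as `Poll.until(() -> conditionHolds(), Duration.ofSeconds(10), Duration.ofMillis(100))`, where `conditionHolds` is a placeholder for whatever check the test is able to make.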



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java:
##########
@@ -275,6 +275,11 @@ CompletableFuture<? extends TopicStatsImpl> asyncGetStats(boolean getPreciseBack
 
     Position getLastPosition();
 
+    /**
+     * Get the last message position that can be dispatch.
+     */
+    CompletableFuture<Position> getLastDispatchablePosition();

Review Comment:
   It's also better to have a default implementation.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java:
##########
@@ -648,6 +648,11 @@ default void skipNonRecoverableLedger(long ledgerId){}
      * */
     CompletableFuture<Position> asyncFindPosition(Predicate<Entry> predicate);
 
+    /**
+     * Reverse find position one-entry by one-entry.
+     */
+    CompletableFuture<Position> asyncReverseFindPositionOneByOne(Predicate<Entry> predicate);

Review Comment:
   `ManagedLedger` is marked as `@InterfaceStability.Stable`, so we need to provide a default implementation for the newly added method.
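
A small sketch of what a default implementation buys on a stable interface. `StableLedger`, `LegacyLedger`, and `asyncReverseFind` are hypothetical stand-ins, not the real `ManagedLedger` API; the failed-future default is one possible choice, and `CompletableFuture.failedFuture` assumes Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

// Hypothetical stand-in for a stable public interface.
interface StableLedger {
    long lastEntryId();

    // Newly added method with a default body: implementations written
    // against the older interface still compile and load unchanged.
    default CompletableFuture<Long> asyncReverseFind(Predicate<Long> predicate) {
        return CompletableFuture.failedFuture(
                new UnsupportedOperationException("asyncReverseFind is not implemented"));
    }
}

// An implementation that predates the new method and does not override it.
class LegacyLedger implements StableLedger {
    @Override
    public long lastEntryId() {
        return 41L;
    }
}
```

Without the `default` body, `LegacyLedger` would stop compiling as soon as the method was added to the interface, which is the compatibility break the review comment is guarding against.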



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -509,6 +509,16 @@ public PositionImpl getMaxReadPosition() {
         }
     }
 
+    @Override
+    public CompletableFuture<Position> getLastDispatchablePosition() {
+        PositionImpl tnxMaxReadPosition = getMaxReadPosition();
+        if (tnxMaxReadPosition.compareTo((PositionImpl) topic.getLastPosition()) == 0) {
+            return topic.getLastDispatchablePosition();
+        } else {
+            return CompletableFuture.completedFuture(tnxMaxReadPosition);
+        }
+    }

Review Comment:
   If we want to introduce this method in the `TransactionBuffer`, I think we'd better make sure the behavior is correct. Aborted transactions are not dispatchable, so we should filter them out.
   
   Or we can revert this change and open another PR to handle the transaction issues, which would be a better approach because the PR title says it fixes the issue related to replicated subscription state.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1852,6 +1852,51 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
         return future;
     }
 
+    @Override
+    public CompletableFuture<Position> asyncReverseFindPositionOneByOne(Predicate<Entry> predicate) {

Review Comment:
   We already have the `findNewestMatching` method in the `ManagedCursor` API. We can use a `ReadOnlyCursor` to find the available position, so we don't need to make any changes to the `ManagedLedger` API.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582020334


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java:
##########
@@ -167,6 +168,82 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
                 "messages don't match.");
     }
 
+    /**
+     * Tests replicated subscriptions across two regions and can read successful.
+     */
+    @Test
+    public void testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage() throws Exception {
+        String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscriptionlastmessage");
+        String topicName = "persistent://" + namespace + "/mytopic";
+        String subscriptionName = "cluster-subscription";
+        // this setting can be used to manually run the test with subscription replication disabled
+        // it shows that subscription replication has no impact in behavior for this test case
+        boolean replicateSubscriptionState = true;
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        // create subscription in r1
+        createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState);
+
+        @Cleanup
+        PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        // create subscription in r2
+        createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState);
+
+        Set<String> sentMessages = new LinkedHashSet<>();
+
+        // send messages in r1
+        @Cleanup
+        Producer<byte[]> producer = client1.newProducer().topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        int numMessages = 6;
+        for (int i = 0; i < numMessages; i++) {
+            String body = "message" + i;
+            producer.send(body.getBytes(StandardCharsets.UTF_8));
+            sentMessages.add(body);
+        }
+        producer.close();
+
+
+        // consume 3 messages in r1
+        Set<String> receivedMessages = new LinkedHashSet<>();
+        try (Consumer<byte[]> consumer1 = client1.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .replicateSubscriptionState(replicateSubscriptionState)
+                .subscribe()) {
+            readMessages(consumer1, receivedMessages, 3, false);
+        }
+
+        // wait for subscription to be replicated
+        Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());
+
+        // create a reader in r2
+        Reader<byte[]> reader = client2.newReader().topic(topicName)
+                .subscriptionName("new-sub")
+                .startMessageId(MessageId.earliest)
+                .create();
+        int readNum = 0;
+        while (reader.hasMessageAvailable()) {
+            Message<byte[]> message = reader.readNext(10, TimeUnit.SECONDS);
+            log.info("Receive message: " + new String(message.getValue()) + " msgId: " + message.getMessageId());
+            assertNotNull(message);

Review Comment:
   ```suggestion
               assertNotNull(message);
               log.info("Receive message: " + new String(message.getValue()) + " msgId: " + message.getMessageId());
   ```





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1579492679


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3602,6 +3603,15 @@ public Position getLastPosition() {
         return ledger.getLastConfirmedEntry();
     }
 
+    @Override
+    public CompletableFuture<Position> getLastCanDispatchPosition() {

Review Comment:
   Thanks for sharing your idea. I would still like `TransactionBuffer` to hold the complete logic for `getLastCanDispatchPosition`, as in this PR:
   ```java
   class TopicTransactionBuffer {
       @Override
       public CompletableFuture<Position> getLastDispatchablePosition() {
           PositionImpl tnxMaxReadPosition = getMaxReadPosition();
           if (tnxMaxReadPosition.compareTo((PositionImpl) topic.getLastPosition()) == 0) {
               return topic.getLastDispatchablePosition();
           } else {
               return CompletableFuture.completedFuture(tnxMaxReadPosition);
           }
       }
   }
   ```
   
   Let's hear what others think.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578390902


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -509,6 +509,11 @@ public PositionImpl getMaxReadPosition() {
         }
     }
 
+    @Override
+    public CompletableFuture<Position> getLastCanDispatchPosition() {
+        return CompletableFuture.completedFuture(getMaxReadPosition());

Review Comment:
   I mean that if we want to fix #22571 by adding `getLastCanDispatchPosition` here, we also need to consider the case where `maxReadPosition` equals `lastConfirmedEntry`, which could be a marker.
   
   Before 
   ```java
   return CompletableFuture.completedFuture(getMaxReadPosition());
   ```
   verify the `maxReadPosition` is valid.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578835194


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -4082,6 +4092,10 @@ public PositionImpl getMaxReadPosition() {
         return this.transactionBuffer.getMaxReadPosition();
     }
 
+    public CompletableFuture<Position> getLastCanDispatchPositionWithTxn() {

Review Comment:
   Makes sense to me.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "lhotari (via GitHub)" <gi...@apache.org>.
lhotari commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1577628530


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
##########
@@ -4321,4 +4321,44 @@ public void testDeleteCurrentLedgerWhenItIsClosed(boolean closeLedgerByAddEntry)
             assertEquals(ml.currentLedgerEntries, 0);
         });
     }
+
+    @Test
+    public void testReverseFindPositionOneByOne() throws Exception {
+        final int maxEntriesPerLedger = 5;
+
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(maxEntriesPerLedger);
+        ManagedLedger ledger = factory.open("testReverseFindPositionOneByOne", managedLedgerConfig);
+        
+        String matchEntry = "match-entry";
+        String noMatchEntry = "nomatch-entry";
+        Predicate<Entry> predicate = entry -> {
+            try {
+                String entryValue = entry.getDataBuffer().toString(UTF_8);
+                if (matchEntry.equals(entryValue)) {
+                    return true;
+                }
+            } finally {
+                entry.release();
+            }

Review Comment:
   Would it be better to move the release into `internalAsyncReverseFindPositionOneByOne`?
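
One way to read this suggestion, sketched with a hypothetical `PooledEntry` type rather than the real `Entry` interface: the search routine releases each entry in a `finally` block after the predicate has run, so predicates only inspect data and never manage its lifecycle. `Finder.testAndRelease` is an illustrative name, not a method from the PR.

```java
import java.util.function.Predicate;

// Hypothetical reference-counted entry; stands in for a pooled buffer.
final class PooledEntry {
    private final String value;
    private boolean released;

    PooledEntry(String value) {
        this.value = value;
    }

    String value() {
        if (released) {
            throw new IllegalStateException("entry already released");
        }
        return value;
    }

    void release() {
        released = true;
    }

    boolean isReleased() {
        return released;
    }
}

final class Finder {
    private Finder() {
    }

    // The finder owns the entry: it is released whether the predicate
    // returns normally or throws, so callers cannot forget to do it.
    static boolean testAndRelease(PooledEntry entry, Predicate<PooledEntry> predicate) {
        try {
            return predicate.test(entry);
        } finally {
            entry.release();
        }
    }
}
```

With this split, the test predicate shown in the diff above would shrink to a plain comparison of the entry value.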





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1577951228


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -509,6 +509,11 @@ public PositionImpl getMaxReadPosition() {
         }
     }
 
+    @Override
+    public CompletableFuture<Position> getLastCanDispatchPosition() {
+        return CompletableFuture.completedFuture(getMaxReadPosition());

Review Comment:
   I agree with your point. I'm not familiar with the transaction implementation yet, and I'm not sure whether anything needs special handling.
   
   This PR keeps the logic the same as before when transactions are enabled. I think I can write a test in the next PR to verify your guess and fix the transaction buffer issue.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578809479


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java:
##########
@@ -160,6 +160,12 @@ public interface TransactionBuffer {
      */
     PositionImpl getMaxReadPosition();
 
+    /**
+     * Get the can dispatch max position.
+     * @return the stable position.
+     */
+    CompletableFuture<Position> getLastCanDispatchPosition();

Review Comment:
   Maybe we don't need this method; moving the logic of `TopicTransactionBuffer#getLastCanDispatchPosition` into `PersistentTopic#getLastCanDispatchPositionWithTxn()` would be better.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582011285


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3625,6 +3626,15 @@ public Position getLastPosition() {
         return ledger.getLastConfirmedEntry();
     }
 
+    @Override
+    public CompletableFuture<Position> getLastDispatchablePosition() {
+        return ledger.asyncReverseFindPositionOneByOne((entry -> {
+            MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
+            // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
+            return !Markers.isServerOnlyMarker(md);
+        }));
+    }

Review Comment:
   It is better to return the smaller of this value and `getMaxReadPosition()`.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582018517


##########
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java:
##########
@@ -167,6 +168,82 @@ public void testReplicatedSubscriptionAcrossTwoRegions() throws Exception {
                 "messages don't match.");
     }
 
+    /**
+     * Tests replicated subscriptions across two regions and can read successful.
+     */
+    @Test
+    public void testReplicatedSubscriptionAcrossTwoRegionsGetLastMessage() throws Exception {
+        String namespace = BrokerTestUtil.newUniqueName("pulsar/replicatedsubscriptionlastmessage");
+        String topicName = "persistent://" + namespace + "/mytopic";
+        String subscriptionName = "cluster-subscription";
+        // this setting can be used to manually run the test with subscription replication disabled
+        // it shows that subscription replication has no impact in behavior for this test case
+        boolean replicateSubscriptionState = true;
+
+        admin1.namespaces().createNamespace(namespace);
+        admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
+
+        @Cleanup
+        PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        // create subscription in r1
+        createReplicatedSubscription(client1, topicName, subscriptionName, replicateSubscriptionState);
+
+        @Cleanup
+        PulsarClient client2 = PulsarClient.builder().serviceUrl(url2.toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .build();
+
+        // create subscription in r2
+        createReplicatedSubscription(client2, topicName, subscriptionName, replicateSubscriptionState);
+
+        Set<String> sentMessages = new LinkedHashSet<>();
+
+        // send messages in r1
+        @Cleanup
+        Producer<byte[]> producer = client1.newProducer().topic(topicName)
+                .enableBatching(false)
+                .messageRoutingMode(MessageRoutingMode.SinglePartition)
+                .create();
+        int numMessages = 6;
+        for (int i = 0; i < numMessages; i++) {
+            String body = "message" + i;
+            producer.send(body.getBytes(StandardCharsets.UTF_8));
+            sentMessages.add(body);
+        }
+        producer.close();
+
+
+        // consume 3 messages in r1
+        Set<String> receivedMessages = new LinkedHashSet<>();
+        try (Consumer<byte[]> consumer1 = client1.newConsumer()
+                .topic(topicName)
+                .subscriptionName(subscriptionName)
+                .replicateSubscriptionState(replicateSubscriptionState)
+                .subscribe()) {
+            readMessages(consumer1, receivedMessages, 3, false);
+        }
+
+        // wait for subscription to be replicated
+        Thread.sleep(2 * config1.getReplicatedSubscriptionsSnapshotFrequencyMillis());

Review Comment:
   I didn't find a valid condition to determine that the subscription state has been synced.
   
   This test class uses the same sleep code in many places and it looks stable, so maybe we can keep using it?





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "BewareMyPower (via GitHub)" <gi...@apache.org>.
BewareMyPower commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582018608


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1852,6 +1852,51 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
         return future;
     }
 
+    @Override
+    public CompletableFuture<Position> asyncReverseFindPositionOneByOne(Predicate<Entry> predicate) {

Review Comment:
   It's just an example to show that the implementation could vary, not a suggestion for the actual implementation. I meant
   1. `read(activeLedger)` returns a list of entries (`entries`)
   2. `for (int i = entries.size() - 1; i >= 0; i--)` reverse find
   
   not
   
   ```
   while (true) {
     var position = getPreviousPosition(position);
     var entry = readOneEntry(position);
     /* ... */
   }
   ```
   
   The code above is pseudocode. When I mentioned the implementation, I didn't look into the details of the cache, so please don't take my example too seriously.
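
The first shape described in this comment (read a batch, then walk it from newest to oldest) can be sketched with an in-memory list standing in for the entries of the active ledger. `ReverseScan` is a hypothetical helper and strings stand in for entries; it illustrates the loop direction only, not how the managed ledger reads or caches data.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

final class ReverseScan {
    private ReverseScan() {
    }

    // Walks an already-read batch from the newest element to the oldest and
    // returns the index of the first element accepted by the predicate.
    static <T> Optional<Integer> lastMatchingIndex(List<T> entries, Predicate<T> predicate) {
        for (int i = entries.size() - 1; i >= 0; i--) {
            if (predicate.test(entries.get(i))) {
                return Optional.of(i);
            }
        }
        return Optional.empty();
    }
}
```

The trade-off is one larger read followed by an in-memory scan, versus one read per step in the `while (true)` variant.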





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "coderzc (via GitHub)" <gi...@apache.org>.
coderzc commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582033001


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3634,6 +3636,22 @@ public Position getLastPosition() {
         return ledger.getLastConfirmedEntry();
     }
 
+    @Override
+    public CompletableFuture<Position> getLastDispatchablePosition() {
+        PositionImpl maxReadPosition = getMaxReadPosition();
+        // If `maxReadPosition` is not equal to `LastPosition`. It means that there are uncommitted transactions.
+        // so return `maxRedPosition` directly.
+        if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) {
+            return CompletableFuture.completedFuture(maxReadPosition);
+        } else {
+            return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
+                MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
+                // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
+                return !Markers.isServerOnlyMarker(md);
+            });
+        }
+    }

Review Comment:
   I'm not sure whether there is a race condition here: if uncommitted transactional messages are produced before the search for a valid position starts, we could return an uncommitted position.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582045658


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3625,6 +3626,15 @@ public Position getLastPosition() {
         return ledger.getLastConfirmedEntry();
     }
 
+    @Override
+    public CompletableFuture<Position> getLastDispatchablePosition() {
+        return ledger.asyncReverseFindPositionOneByOne((entry -> {
+            MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
+            // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
+            return !Markers.isServerOnlyMarker(md);
+        }));
+    }

Review Comment:
   > Oh, sorry, I mean if the result of asyncReverseFindPositionOneByOne is greater than getMaxReadPosition, then the result of getMaxReadPosition should be returned.
   
   I think the result of `asyncReverseFindPositionOneByOne` is never greater than `getMaxReadPosition()`







Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582050000


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3634,6 +3636,22 @@ public Position getLastPosition() {
         return ledger.getLastConfirmedEntry();
     }
 
+    @Override
+    public CompletableFuture<Position> getLastDispatchablePosition() {
+        PositionImpl maxReadPosition = getMaxReadPosition();
+        // If `maxReadPosition` is not equal to `LastPosition`. It means that there are uncommitted transactions.
+        // so return `maxRedPosition` directly.
+        if (maxReadPosition.compareTo((PositionImpl) getLastPosition()) != 0) {
+            return CompletableFuture.completedFuture(maxReadPosition);
+        } else {
+            return ManagedLedgerImplUtils.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
+                MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
+                // If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
+                return !Markers.isServerOnlyMarker(md);
+            });
+        }
+    }

Review Comment:
   +1, we need to pass `maxReadPosition` to `asyncGetLastValidPosition`. Thanks for the reminder.
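
A minimal sketch of why the start position matters, using plain `long` values in place of real positions. `BoundedReverseFind` and its method are hypothetical names, not the PR's `ManagedLedgerImplUtils` code, and returning `-1` when nothing matches is an assumption made for the example. The point is only that the search begins at a `maxReadPosition` captured once, so entries appended afterwards are never visited.

```java
import java.util.function.LongPredicate;

final class BoundedReverseFind {
    private BoundedReverseFind() {
    }

    // Walks backwards from startPosition (inclusive) and returns the first
    // position accepted by the predicate, or -1 if no position qualifies.
    static long lastValidPosition(long startPosition, LongPredicate isDispatchable) {
        for (long pos = startPosition; pos >= 0; pos--) {
            if (isDispatchable.test(pos)) {
                return pos;
            }
        }
        return -1L;
    }
}
```

Suppose positions 2 and 3 are markers and positions 4 and 5 were written by a transaction that has not committed yet. Starting at a captured `maxReadPosition` of 3 yields position 1, whereas starting at the current last position 5 would yield 5, which is the uncommitted position the race-condition comment warns about.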





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1585820223


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/ManagedLedgerImplUtils.java:
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
+import org.apache.bookkeeper.mledger.AsyncCallbacks;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+@InterfaceStability.Evolving
+public class ManagedLedgerImplUtils {
+
+    /**
+     * Reverse find last valid position one-entry by one-entry.
+     */
+    public static CompletableFuture<Position> asyncGetLastValidPosition(final ManagedLedgerImpl ledger,
+                                                                        final Predicate<Entry> predicate,
+                                                                        final PositionImpl startPosition) {
+        CompletableFuture<Position> future = new CompletableFuture<>();
+        if (!ledger.isValidPosition(startPosition)) {
+            future.complete(startPosition);
+        } else {
+            internalAsyncReverseFindPositionOneByOne(ledger, predicate, startPosition, future);
+        }
+        return future;
+    }
+
+    private static void internalAsyncReverseFindPositionOneByOne(final ManagedLedgerImpl ledger,
+                                                                 final Predicate<Entry> predicate,
+                                                                 final PositionImpl position,
+                                                                 final CompletableFuture<Position> future) {
+        ledger.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                final Position position = entry.getPosition();
+                try {
+                    if (predicate.test(entry)) {
+                        future.complete(position);
+                        return;
+                    }
+                    PositionImpl previousPosition = ledger.getPreviousPosition((PositionImpl) position);
+                    if (!ledger.isValidPosition(previousPosition)) {
+                        future.complete(previousPosition);
+                    } else {
+                        internalAsyncReverseFindPositionOneByOne(ledger, predicate,
+                                ledger.getPreviousPosition((PositionImpl) position), future);

Review Comment:
   fixed by https://github.com/apache/pulsar/pull/22610





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578776447


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java:
##########
@@ -4321,4 +4321,44 @@ public void testDeleteCurrentLedgerWhenItIsClosed(boolean closeLedgerByAddEntry)
             assertEquals(ml.currentLedgerEntries, 0);
         });
     }
+
+    @Test
+    public void testReverseFindPositionOneByOne() throws Exception {
+        final int maxEntriesPerLedger = 5;
+
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(maxEntriesPerLedger);
+        ManagedLedger ledger = factory.open("testReverseFindPositionOneByOne", managedLedgerConfig);
+        
+        String matchEntry = "match-entry";
+        String noMatchEntry = "nomatch-entry";
+        Predicate<Entry> predicate = entry -> {
+            try {
+                String entryValue = entry.getDataBuffer().toString(UTF_8);
+                if (matchEntry.equals(entryValue)) {
+                    return true;
+                }
+            } finally {
+                entry.release();
+            }

Review Comment:
   Makes sense, moved.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578809582


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1852,6 +1852,51 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
         return future;
     }
 
+    @Override
+    public CompletableFuture<Position> asyncReverseFindPositionOneByOne(Predicate<Entry> predicate) {

Review Comment:
   Maybe `asyncFindLastValidPosition` or something similar would be better.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionBuffer.java:
##########
@@ -160,6 +160,12 @@ public interface TransactionBuffer {
      */
     PositionImpl getMaxReadPosition();
 
+    /**
+     * Get the can dispatch max position.
+     * @return the stable position.
+     */
+    CompletableFuture<Position> getLastCanDispatchPosition();

Review Comment:
   Maybe we don't need this method in `TransactionBuffer`; it would be better to move the logic of `TopicTransactionBuffer#getLastCanDispatchPosition` into `PersistentTopic#getLastCanDispatchPositionWithTxn()`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3602,6 +3603,15 @@ public Position getLastPosition() {
         return ledger.getLastConfirmedEntry();
     }
 
+    @Override
+    public CompletableFuture<Position> getLastCanDispatchPosition() {

Review Comment:
   Maybe we can make the method private, or move the logic into `getLastCanDispatchPositionWithTxn()` and expose only
   `getLastCanDispatchPositionWithTxn()`.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -4082,6 +4092,10 @@ public PositionImpl getMaxReadPosition() {
         return this.transactionBuffer.getMaxReadPosition();
     }
 
+    public CompletableFuture<Position> getLastCanDispatchPositionWithTxn() {

Review Comment:
   Maybe we can rename the method to `getLastDispatchablePositionWithTxn` or something else.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578834981


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3602,6 +3603,15 @@ public Position getLastPosition() {
         return ledger.getLastConfirmedEntry();
     }
 
+    @Override
+    public CompletableFuture<Position> getLastCanDispatchPosition() {

Review Comment:
   About the comment on simplifying the methods: I understand your intention.
   
   After this change there will be fewer methods, but this logic will move into `PersistentTopic`:
   ```java
           PositionImpl tnxMaxReadPosition = transactionBuffer.getMaxReadPosition();
           if (getLastPosition() == tnxMaxReadPosition) {
               return topic.getLastCanDispatchPosition();
           } else {
               return CompletableFuture.completedFuture(tnxMaxReadPosition);
           }
   ```
   
   I would still like it to stay cohesive in the `TransactionBuffer` class.
   
   I think it makes sense to expose the `getLastCanDispatchPosition` method in `TransactionBuffer` and `Topic`, because it clearly reminds the caller: **The position returned by the `getLastPosition` method may not be dispatchable; use `getLastCanDispatchPosition` instead.**
   
   





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582017410


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3625,6 +3626,15 @@ public Position getLastPosition() {
         return ledger.getLastConfirmedEntry();
     }
 
+    @Override
+    public CompletableFuture<Position> getLastDispatchablePosition() {
+        return ledger.asyncReverseFindPositionOneByOne((entry -> {
+            MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
+            // Marker messages are filtered out by AbstractBaseDispatcher.filterEntriesForConsumer
+            return !Markers.isServerOnlyMarker(md);
+        }));
+    }

Review Comment:
   Why? Since I/O may be involved here, it is more appropriate to return a future.
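
A minimal sketch of that reasoning, with hypothetical names that are not Pulsar APIs: when the answer is already in memory, the method can return an already-completed future, and when a read is needed, it completes the future from another thread so that the caller's thread is not blocked.

```java
import java.util.concurrent.CompletableFuture;

class FutureReturnSketch {

    // Hypothetical lookup. `cached` stands in for a position already known in memory;
    // the supplyAsync branch stands in for a storage read that may involve I/O.
    static CompletableFuture<Long> lastDispatchable(Long cached, long valueFromStorage) {
        if (cached != null) {
            return CompletableFuture.completedFuture(cached);
        }
        return CompletableFuture.supplyAsync(() -> valueFromStorage);
    }

    public static void main(String[] args) {
        lastDispatchable(7L, 0L).thenAccept(p -> System.out.println("cached: " + p));
        // join() is used here only so the demo waits for the asynchronous branch.
        System.out.println("read: " + lastDispatchable(null, 42L).join());
    }
}
```

Both branches share one return type, so callers compose on the result the same way whether or not I/O actually happened.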





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "RobertIndie (via GitHub)" <gi...@apache.org>.
RobertIndie commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582002409


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1852,6 +1852,51 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
         return future;
     }
 
+    @Override
+    public CompletableFuture<Position> asyncReverseFindPositionOneByOne(Predicate<Entry> predicate) {

Review Comment:
   +1 for `asyncFindLastValidPosition`. We are defining an interface method, not an implementation. `asyncReverseFindPositionOneByOne` reflects implementation details, while `asyncFindLastValidPosition` is more suitable for an interface.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#issuecomment-2081449814

   ## [Codecov](https://app.codecov.io/gh/apache/pulsar/pull/22572?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   Attention: Patch coverage is `80.00000%` with `9 lines` in your changes are missing coverage. Please review.
   > Project coverage is 74.13%. Comparing base [(`bbc6224`)](https://app.codecov.io/gh/apache/pulsar/commit/bbc62245c5ddba1de4b1e7cee4ab49334bc36277?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`f094e04`)](https://app.codecov.io/gh/apache/pulsar/pull/22572?dropdown=coverage&src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
   > Report is 203 commits behind head on master.
   
   <details><summary>Additional details and impacted files</summary>
   
   
   [![Impacted file tree graph](https://app.codecov.io/gh/apache/pulsar/pull/22572/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)](https://app.codecov.io/gh/apache/pulsar/pull/22572?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #22572      +/-   ##
   ============================================
   + Coverage     73.57%   74.13%   +0.56%     
   + Complexity    32624     2747   -29877     
   ============================================
     Files          1877     1886       +9     
     Lines        139502   140653    +1151     
     Branches      15299    15462     +163     
   ============================================
   + Hits         102638   104278    +1640     
   + Misses        28908    28331     -577     
   - Partials       7956     8044      +88     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pulsar/pull/22572/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [inttests](https://app.codecov.io/gh/apache/pulsar/pull/22572/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `27.23% <57.77%> (+2.64%)` | :arrow_up: |
   | [systests](https://app.codecov.io/gh/apache/pulsar/pull/22572/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `24.48% <57.77%> (+0.15%)` | :arrow_up: |
   | [unittests](https://app.codecov.io/gh/apache/pulsar/pull/22572/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `73.44% <80.00%> (+0.59%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Files](https://app.codecov.io/gh/apache/pulsar/pull/22572?dropdown=coverage&src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...sar/broker/service/persistent/PersistentTopic.java](https://app.codecov.io/gh/apache/pulsar/pull/22572?src=pr&el=tree&filepath=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2Fpersistent%2FPersistentTopic.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL3BlcnNpc3RlbnQvUGVyc2lzdGVudFRvcGljLmphdmE=) | `78.66% <100.00%> (+0.20%)` | :arrow_up: |
   | [...va/org/apache/pulsar/broker/service/ServerCnx.java](https://app.codecov.io/gh/apache/pulsar/pull/22572?src=pr&el=tree&filepath=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2FServerCnx.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1NlcnZlckNueC5qYXZh) | `72.47% <93.33%> (+0.33%)` | :arrow_up: |
   | [...n/java/org/apache/pulsar/broker/service/Topic.java](https://app.codecov.io/gh/apache/pulsar/pull/22572?src=pr&el=tree&filepath=pulsar-broker%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fpulsar%2Fbroker%2Fservice%2FTopic.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9zZXJ2aWNlL1RvcGljLmphdmE=) | `34.78% <0.00%> (-1.59%)` | :arrow_down: |
   | [...ookkeeper/mledger/util/ManagedLedgerImplUtils.java](https://app.codecov.io/gh/apache/pulsar/pull/22572?src=pr&el=tree&filepath=managed-ledger%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fbookkeeper%2Fmledger%2Futil%2FManagedLedgerImplUtils.java&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-bWFuYWdlZC1sZWRnZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2Jvb2trZWVwZXIvbWxlZGdlci91dGlsL01hbmFnZWRMZWRnZXJJbXBsVXRpbHMuamF2YQ==) | `69.56% <69.56%> (ø)` | |
   
   ... and [278 files with indirect coverage changes](https://app.codecov.io/gh/apache/pulsar/pull/22572/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   </details>




Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "BewareMyPower (via GitHub)" <gi...@apache.org>.
BewareMyPower commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582014350


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1852,6 +1852,51 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
         return future;
     }
 
+    @Override
+    public CompletableFuture<Position> asyncReverseFindPositionOneByOne(Predicate<Entry> predicate) {

Review Comment:
   > Is there any harm in adding an interface method?
   
   See https://github.com/apache/pulsar/pull/22572#discussion_r1582003060
   
   ```java
     /**
      * Can evolve while retaining compatibility for minor release boundaries.;
      * can break compatibility only at major release (ie. at m.0).
      */
   ```
   
   So you have to use a default method or a static method.
   
   > What do you think about calling it asyncReverseFindLastValidPosition? I think Reverse should be part of the method name.
   
   `Reverse` is still redundant. Take the Java standard API as an example:
   
   ```java
           String s = "hello";
           s.lastIndexOf('l');
   ```
   
   It's `lastIndexOf`, with nothing like `reverse` in the name, because `Last` is already a hint for reverse finding. The Python string API has `index` and `rindex`, where `r` is the hint for `reverse`, but it has no additional hint like `last`.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578822283


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TopicTransactionBuffer.java:
##########
@@ -509,6 +509,16 @@ public PositionImpl getMaxReadPosition() {
         }
     }
 
+    @Override
+    public CompletableFuture<Position> getLastCanDispatchPosition() {
+        PositionImpl tnxMaxReadPosition = getMaxReadPosition();
+        if (topic.getLastPosition() == tnxMaxReadPosition) {

Review Comment:
   I know `tnxMaxReadPosition` and `topic.getLastPosition()` are the same object when they are equal, but for comparing two `PositionImpl` objects, `topic.getLastPosition().compareTo(tnxMaxReadPosition) == 0` is better.
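
A small sketch of the difference, using a hypothetical, simplified `Pos` class rather than the real `PositionImpl`: two distinct objects with the same ledger and entry IDs are not `==`, but `compareTo` reports them as equal.

```java
class PositionCompareSketch {

    // Hypothetical, simplified position: ordered by ledgerId, then by entryId.
    static final class Pos implements Comparable<Pos> {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Pos other) {
            int c = Long.compare(ledgerId, other.ledgerId);
            return c != 0 ? c : Long.compare(entryId, other.entryId);
        }
    }

    public static void main(String[] args) {
        Pos a = new Pos(5, 10);
        Pos b = new Pos(5, 10);
        System.out.println(a == b);              // false: different objects
        System.out.println(a.compareTo(b) == 0); // true: same logical position
    }
}
```

Reference equality only holds as long as both call sites happen to return the same instance, which is an implementation detail that can change.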





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "codelipenghui (via GitHub)" <gi...@apache.org>.
codelipenghui commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582017648


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1852,6 +1852,51 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
         return future;
     }
 
+    @Override
+    public CompletableFuture<Position> asyncReverseFindPositionOneByOne(Predicate<Entry> predicate) {

Review Comment:
   > @codelipenghui We need a Reverse find here, and this method doesn't seem to match.
   
   Oh, I see. Thanks for the explanation. A new method is fine with me.
   
   And I think we can follow @BewareMyPower's suggestion to add a Util class.
   
   > Regarding implementation, actually we can use sequential find on the active ledger rather than the reverse finding because we already have the cache. We only need reverse finding when all entries in the active ledger are not available.
   
   I don't get the point. Even if the entries are cached, isn't reverse reading still more efficient for getting the last valid entry?





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582017168


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1852,6 +1852,51 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
         return future;
     }
 
+    @Override
+    public CompletableFuture<Position> asyncReverseFindPositionOneByOne(Predicate<Entry> predicate) {

Review Comment:
   Thanks for explaining, I will accept this suggestion.
   
   https://github.com/apache/pulsar/pull/22572#discussion_r1582006268
   
   Also, I will keep the `reverse` find, because in most cases the last entry satisfies the predicate.
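
A rough sketch of that argument, counting entry reads over an in-memory list. The names are hypothetical, and the sketch ignores caching, which the real managed ledger has: a backward scan stops at the first match, while a forward scan cannot know which match is the last one until it has read every entry.

```java
import java.util.List;
import java.util.function.Predicate;

class ScanCostSketch {

    // Number of entries read by a backward scan before the last match is found.
    static int readsBackward(List<String> entries, Predicate<String> matches) {
        int reads = 0;
        for (int i = entries.size() - 1; i >= 0; i--) {
            reads++;
            if (matches.test(entries.get(i))) {
                break;
            }
        }
        return reads;
    }

    // A forward scan has to read every entry to be sure which match is the last.
    static int readsForward(List<String> entries, Predicate<String> matches) {
        int reads = 0;
        for (String entry : entries) {
            reads++;
            matches.test(entry);
        }
        return reads;
    }

    public static void main(String[] args) {
        List<String> entries = List.of("m0", "m1", "m2", "m3", "marker");
        Predicate<String> notMarker = e -> !e.equals("marker");
        System.out.println(readsBackward(entries, notMarker)); // 2
        System.out.println(readsForward(entries, notMarker));  // 5
    }
}
```

When the last entry already satisfies the predicate, the backward scan reads exactly one entry.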
   
   





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "dao-jun (via GitHub)" <gi...@apache.org>.
dao-jun commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1578970618


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -3602,6 +3603,15 @@ public Position getLastPosition() {
         return ledger.getLastConfirmedEntry();
     }
 
+    @Override
+    public CompletableFuture<Position> getLastCanDispatchPosition() {

Review Comment:
   `TransactionBuffer` only handles the TXN commit/abort/append requests and provides the READ_COMMIT guarantee.
   Determining the position to dispatch is the topic's duty; `TransactionBuffer` should not be aware of it.
   See also https://github.com/apache/pulsar/blob/master/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L4122-L4124





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "BewareMyPower (via GitHub)" <gi...@apache.org>.
BewareMyPower commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582006268


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1852,6 +1852,51 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
         return future;
     }
 
+    @Override
+    public CompletableFuture<Position> asyncReverseFindPositionOneByOne(Predicate<Entry> predicate) {

Review Comment:
   It might not be worth creating another cursor.
   
   I think we can add a static method because all the methods used are public or package visible.
   
   ```java
   @InterfaceStability.Evolving
   public class ManagedLedgerImplUtils {
   
       public static CompletableFuture<Position> asyncGetLastValidPosition(final ManagedLedgerImpl ledger,
                                                                           final Predicate<Entry> predicate) {
   ```
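
To illustrate why a static utility (or a default method) avoids breaking existing implementations, here is a self-contained sketch with hypothetical types. `Ledger` and `OldLedger` are stand-ins and not the real `ManagedLedger` API: an implementation written before the new method existed keeps compiling unchanged.

```java
import java.util.List;

class DefaultMethodSketch {

    // Hypothetical interface standing in for a public, evolving API.
    interface Ledger {
        List<String> entries();

        // Added later as a default method: existing implementations still compile.
        default int lastValidIndex() {
            return DefaultMethodSketch.lastValidIndex(this);
        }
    }

    // Alternative: a static utility that leaves the interface untouched.
    static int lastValidIndex(Ledger ledger) {
        List<String> all = ledger.entries();
        for (int i = all.size() - 1; i >= 0; i--) {
            if (!all.get(i).equals("marker")) {
                return i;
            }
        }
        return -1;
    }

    // An implementation written before lastValidIndex() existed; it needs no changes.
    static final class OldLedger implements Ledger {
        @Override
        public List<String> entries() {
            return List.of("msg-0", "marker");
        }
    }

    public static void main(String[] args) {
        Ledger ledger = new OldLedger();
        System.out.println(ledger.lastValidIndex());  // 0, via the default method
        System.out.println(lastValidIndex(ledger));   // 0, via the static utility
    }
}
```

Adding an abstract method to `Ledger` instead would force every existing implementation, including third-party ones, to change before it compiles again.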





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "shibd (via GitHub)" <gi...@apache.org>.
shibd commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582012492


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1852,6 +1852,51 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
         return future;
     }
 
+    @Override
+    public CompletableFuture<Position> asyncReverseFindPositionOneByOne(Predicate<Entry> predicate) {

Review Comment:
   > We already have the findNewestMatching method in the ManagedCursor API. We can just use the ReadonlyCursor to find the available position, so that we don't need to make any changes to the ManagedLedger API.
   
   @codelipenghui We need a `Reverse` find here, and this method doesn't seem to match.
   
   @BewareMyPower @RobertIndie @tjiuming What do you think about calling it `asyncReverseFindLastValidPosition`? I think `Reverse` should be part of the method name.
   
   
   > I think we can use add a static method because all the methods used are public or package visible.
   
   Is there any harm in adding an interface method? I'd prefer to put it on the interface, which shows the various use cases Pulsar has for `ManagedLedger`.





Re: [PR] [fix][broker] Reader stuck after call hasMessageAvailable when enable replicateSubscriptionState [pulsar]

Posted by "BewareMyPower (via GitHub)" <gi...@apache.org>.
BewareMyPower commented on code in PR #22572:
URL: https://github.com/apache/pulsar/pull/22572#discussion_r1582015087


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -1852,6 +1852,51 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
         return future;
     }
 
+    @Override
+    public CompletableFuture<Position> asyncReverseFindPositionOneByOne(Predicate<Entry> predicate) {

Review Comment:
   Regarding implementation, actually we can use sequential find on the active ledger rather than the reverse finding because we already have the cache. We only need reverse finding when all entries in the active ledger are not available.
   
   So "Reverse" is not required.


