Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/06/01 06:17:01 UTC

[GitHub] [pulsar] massakam opened a new pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

massakam opened a new pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762


   ### Motivation
   
   Messages with the same key can be out of order if message redelivery occurs on a Key_Shared subscription.
   
   1. Suppose `PersistentDispatcherMultipleConsumers#messagesToRedeliver` contains message-1 and message-2. Message-1 will be delivered to consumer-a and message-2 will be delivered to consumer-b.
   2. The dispatcher tried to send message-1 to consumer-a, but consumer-a was too slow, so the message could not be sent to it.
   3. Consumer-a is added to `stuckConsumers`.
   https://github.com/apache/pulsar/blob/894d92b2be3bee334e7ce32760c4d2e7978603aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L263-L266
   4. The next time `readMoreEntries()` is run, `getMessagesToReplayNow()` will return an empty Set because `isDispatcherStuckOnReplays` is true.
   https://github.com/apache/pulsar/blob/894d92b2be3bee334e7ce32760c4d2e7978603aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L368-L374
   5. The dispatcher reads newer messages instead of the messages contained in `messagesToRedeliver`.
   https://github.com/apache/pulsar/blob/894d92b2be3bee334e7ce32760c4d2e7978603aa/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L233-L267
   6. A new message (message-3), which has the same key as message-2, is delivered to consumer-b.
   7. Message-2, contained in `messagesToRedeliver`, is delivered to consumer-b.
   8. As a result, the order of message-2 and message-3 is reversed.
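   The sequence above can be made concrete with a toy model. It is a hypothetical, single-threaded simplification and not Pulsar code: `ReorderingScenario`, `Msg` and the key names are invented for illustration, and the routing (key "k1" to consumer-a, key "k2" to consumer-b) is assumed. Passing `false` reproduces steps 5-8; passing `true` parks message-3 behind message-2, in the spirit of the change described under Modifications.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of steps 1-8 (hypothetical, single-threaded; not Pulsar code).
// message-1 has key "k1" (served by consumer-a); message-2 and message-3 share key "k2" (served by consumer-b).
final class ReorderingScenario {
    private ReorderingScenario() {}

    static final class Msg {
        final String id;
        final String key;

        Msg(String id, String key) {
            this.id = id;
            this.key = key;
        }
    }

    /** Returns the IDs of the messages consumer-b receives, in delivery order. */
    static List<String> deliveriesToConsumerB(boolean blockKeysPendingRedelivery) {
        // Step 1: message-1 and message-2 are waiting for redelivery, in position order.
        List<Msg> messagesToRedeliver = new ArrayList<>();
        messagesToRedeliver.add(new Msg("message-1", "k1"));
        messagesToRedeliver.add(new Msg("message-2", "k2"));
        List<String> consumerB = new ArrayList<>();

        // Steps 2-5: consumer-a is stuck, the replay is skipped and a newer entry is read.
        Msg message3 = new Msg("message-3", "k2");
        boolean keyPending = false;
        for (Msg m : messagesToRedeliver) {
            if (m.key.equals(message3.key)) {
                keyPending = true;
            }
        }
        if (blockKeysPendingRedelivery && keyPending) {
            // Park message-3 behind message-2 instead of sending it now.
            messagesToRedeliver.add(message3);
        } else {
            // Step 6: message-3 reaches consumer-b before message-2.
            consumerB.add(message3.id);
        }

        // Step 7: the replay finally runs; consumer-b receives its pending "k2" messages in order.
        for (Msg m : messagesToRedeliver) {
            if (m.key.equals("k2")) {
                consumerB.add(m.id);
            }
        }
        return consumerB;
    }
}
```

   With `false` the method returns `[message-3, message-2]` (the reordering from step 8); with `true` it returns `[message-2, message-3]`.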
   
   ### Modifications
   
   When adding a message to be redelivered to `messagesToRedeliver`, save the hash of the message's key. If the dispatcher then attempts to send a newer message whose key hash matches any of the saved hash values, that message is added to `messagesToRedeliver` instead of being sent. This prevents messages with the same key from being delivered out of order.
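   A minimal, single-threaded sketch of that bookkeeping, assuming plain `java.util` collections instead of the concurrent primitive-keyed collections the PR actually uses (`ConcurrentSortedLongPairSet`, `ConcurrentLongLongPairHashMap`). `RedeliveryTrackerSketch` and its methods are hypothetical, loosely mirroring the `MessageRedeliveryController` diff quoted later in this thread; positions are ordered by `(ledgerId, entryId)`, as with the `ComparisonChain` in that diff.

```java
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, single-threaded sketch of the redelivery bookkeeping.
// The PR uses ConcurrentSortedLongPairSet / ConcurrentLongLongPairHashMap; java.util types are assumed here.
final class RedeliveryTrackerSketch {

    /** A (ledgerId, entryId) position, compared ledger first, then entry. */
    static final class Position implements Comparable<Position> {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Position other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Position && compareTo((Position) o) == 0;
        }

        @Override
        public int hashCode() {
            return Objects.hash(ledgerId, entryId);
        }
    }

    // Messages waiting for redelivery, sorted by position, mapped to their sticky key hash.
    private final TreeMap<Position, Integer> pending = new TreeMap<>();

    void add(long ledgerId, long entryId, int stickyKeyHash) {
        pending.put(new Position(ledgerId, entryId), stickyKeyHash);
    }

    void remove(long ledgerId, long entryId) {
        pending.remove(new Position(ledgerId, entryId));
    }

    /** Drops everything at or before the mark-delete position (already acknowledged). */
    void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
        pending.headMap(new Position(markDeleteLedgerId, markDeleteEntryId), true).clear();
    }

    /** True if any message still waiting for redelivery carries one of these key hashes. */
    boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
        for (int hash : pending.values()) {
            if (stickyKeyHashes.contains(hash)) {
                return true;
            }
        }
        return false;
    }

    int size() {
        return pending.size();
    }
}
```

   In this sketch, a caller about to send a newer entry would pass that entry's key hash to `containsStickyKeyHashes(...)` and, if it returns `true`, add the entry to the pending set instead of sending it.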
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] massakam commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r666326040



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.ComparisonChain;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+
+public class MessageRedeliveryController {
+    private final LongPairSet messagesToRedeliver;
+    private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
+
+    public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
+        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+        this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2);
+    }
+
+    public boolean add(long ledgerId, long entryId) {
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
+        }
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean remove(long ledgerId, long entryId) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.remove(ledgerId, entryId);
+        }
+        return messagesToRedeliver.remove(ledgerId, entryId);
+    }
+
+    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
+        if (hashesToBeBlocked != null) {
+            for (LongPair longPair : hashesToBeBlocked.keys()) {

Review comment:
       @lhotari It happens in the master branch. `hashesToBeBlocked` is an instance of `org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap`, so it should be unrelated to those fixes. 







[GitHub] [pulsar] massakam commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r665837821



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
##########
@@ -0,0 +1,114 @@
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.ComparisonChain;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+
+public class MessageRedeliveryController {
+    private final LongPairSet messagesToRedeliver;
+    private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
+
+    public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
+        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+        this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2);
+    }
+
+    public boolean add(long ledgerId, long entryId) {
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
+        }
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean remove(long ledgerId, long entryId) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.remove(ledgerId, entryId);
+        }
+        return messagesToRedeliver.remove(ledgerId, entryId);
+    }
+
+    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
+        if (hashesToBeBlocked != null) {
+            for (LongPair longPair : hashesToBeBlocked.keys()) {
+                long ledgerId = longPair.first;
+                long entryId = longPair.second;
+                if (ComparisonChain.start().compare(ledgerId, markDeleteLedgerId).compare(entryId, markDeleteEntryId)
+                        .result() <= 0) {
+                    hashesToBeBlocked.remove(ledgerId, entryId);
+                }
+            }
+        }
+        return messagesToRedeliver.removeIf((ledgerId, entryId) -> {
+            return ComparisonChain.start().compare(ledgerId, markDeleteLedgerId).compare(entryId, markDeleteEntryId)
+                    .result() <= 0;
+        });
+    }
+
+    public boolean isEmpty() {
+        return messagesToRedeliver.isEmpty();
+    }
+
+    public void clear() {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.clear();
+        }
+        messagesToRedeliver.clear();
+    }
+
+    public String toString() {
+        return messagesToRedeliver.toString();
+    }
+
+    public boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
+        if (hashesToBeBlocked != null) {
+            for (LongPair longPair : hashesToBeBlocked.values()) {

Review comment:
       Fixed.







[GitHub] [pulsar] lhotari commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r663891021



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -862,11 +864,15 @@ public void cursorIsReset() {
         }
     }
 
-    protected boolean addMessageToReplay(long ledgerId, long entryId) {
+    protected boolean addMessageToReplay(long ledgerId, long entryId, Long stickyKeyHash) {

Review comment:
       @eolivelli yes, that's true. I think it's better that the PR author solves the issue in the way that makes sense. In my review comment I'm just bringing it to attention that auto-boxing long -> Long will create a java.lang.Long object instance. That's something that isn't desired in the Pulsar code base for hot code paths since there's a goal of minimizing allocations.
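   For illustration, a hypothetical sketch (names are not from the PR) of the two styles under discussion: a nullable `Long` parameter, which forces callers holding a primitive `long` to box it, versus primitive overloads where "no hash available" is expressed by calling the two-argument method. The `MessageRedeliveryController` diff in this thread uses the overload style for its `add` methods.

```java
// Hypothetical sketch; ReplayQueueSketch and addBoxed are illustrative names, not Pulsar APIs.
final class ReplayQueueSketch {
    private int addedWithHash;
    private int addedWithoutHash;

    // Nullable-wrapper style: a caller holding a primitive long must box it into a
    // java.lang.Long to call this, which may allocate an object on every call.
    boolean addBoxed(long ledgerId, long entryId, Long stickyKeyHash) {
        return stickyKeyHash == null
                ? add(ledgerId, entryId)
                : add(ledgerId, entryId, stickyKeyHash.longValue());
    }

    // Overload style: the absence of a hash is expressed by choosing this overload,
    // so callers never need to box anything.
    boolean add(long ledgerId, long entryId) {
        addedWithoutHash++;
        return true;
    }

    boolean add(long ledgerId, long entryId, long stickyKeyHash) {
        addedWithHash++;
        return true;
    }

    int addedWithHash() {
        return addedWithHash;
    }

    int addedWithoutHash() {
        return addedWithoutHash;
    }
}
```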







[GitHub] [pulsar] massakam commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r669298778



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
##########
@@ -0,0 +1,114 @@
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.ComparisonChain;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+
+public class MessageRedeliveryController {
+    private final LongPairSet messagesToRedeliver;
+    private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
+
+    public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
+        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+        this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2);
+    }
+
+    public boolean add(long ledgerId, long entryId) {
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
+        }
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean remove(long ledgerId, long entryId) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.remove(ledgerId, entryId);
+        }
+        return messagesToRedeliver.remove(ledgerId, entryId);
+    }
+
+    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
+        if (hashesToBeBlocked != null) {
+            for (LongPair longPair : hashesToBeBlocked.keys()) {

Review comment:
       I changed it so that, inside the `forEach()` loop, the keys to be removed are added to a newly created list, and they are removed from `hashesToBeBlocked` after the loop exits. This way only the entries to be removed are copied, rather than the whole set. PTAL.
   https://github.com/apache/pulsar/pull/10762/commits/266ca5deafbaba61cbed31736759a1c7d9b0413c
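   The collect-then-remove approach can be sketched with plain `java.util` types. This is an assumption-laden illustration: `RemoveUpToSketch` and `Pos` are hypothetical, and a generic `Map` stands in for BookKeeper's `ConcurrentLongLongPairHashMap`, whose API is not reproduced here. With a `java.util.HashMap`, calling `remove()` from inside `forEach()` would typically fail with `ConcurrentModificationException`, so the same two-pass shape applies there as well.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of the collect-then-remove pattern; a plain Map stands in for
// BookKeeper's ConcurrentLongLongPairHashMap, whose API is not reproduced here.
final class RemoveUpToSketch {
    private RemoveUpToSketch() {}

    /** Hypothetical (ledgerId, entryId) key. */
    static final class Pos {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        boolean isAtOrBefore(long markDeleteLedgerId, long markDeleteEntryId) {
            return ledgerId < markDeleteLedgerId
                    || (ledgerId == markDeleteLedgerId && entryId <= markDeleteEntryId);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Pos)) {
                return false;
            }
            Pos p = (Pos) o;
            return ledgerId == p.ledgerId && entryId == p.entryId;
        }

        @Override
        public int hashCode() {
            return Objects.hash(ledgerId, entryId);
        }
    }

    /** Removes every key at or before the mark-delete position and returns how many were removed. */
    static int removeAllUpTo(Map<Pos, Long> hashesToBeBlocked, long markDeleteLedgerId, long markDeleteEntryId) {
        // Pass 1: only collect matching keys; the map is not modified while forEach() runs.
        List<Pos> toRemove = new ArrayList<>();
        hashesToBeBlocked.forEach((pos, stickyKeyHash) -> {
            if (pos.isAtOrBefore(markDeleteLedgerId, markDeleteEntryId)) {
                toRemove.add(pos);
            }
        });
        // Pass 2: remove after iteration has finished. Only the matching keys were copied,
        // not the whole key set.
        for (Pos pos : toRemove) {
            hashesToBeBlocked.remove(pos);
        }
        return toRemove.size();
    }
}
```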







[GitHub] [pulsar] codelipenghui commented on pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#issuecomment-877899913


   @merlimat Please help review this PR again.





[GitHub] [pulsar] lhotari commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r665995724



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
##########
@@ -0,0 +1,114 @@
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.ComparisonChain;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+
+public class MessageRedeliveryController {
+    private final LongPairSet messagesToRedeliver;
+    private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
+
+    public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
+        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+        this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2);
+    }
+
+    public boolean add(long ledgerId, long entryId) {
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
+        }
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean remove(long ledgerId, long entryId) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.remove(ledgerId, entryId);
+        }
+        return messagesToRedeliver.remove(ledgerId, entryId);
+    }
+
+    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
+        if (hashesToBeBlocked != null) {
+            for (LongPair longPair : hashesToBeBlocked.keys()) {

Review comment:
       @massakam Does the deadlock happen in 2.8.x / master branch? There are changes in #9787 that resolved some deadlock issues (see #10691 for more details).







[GitHub] [pulsar] eolivelli commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r651649787



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
##########
@@ -142,6 +142,14 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE
                 }
             };
 
+    private static final FastThreadLocal<Map<Consumer, Set<Integer>>> localConsumerStickyKeyHashesMap =

Review comment:
       Why do we need this ThreadLocal?
   Here we are recycling a simple HashMap.
   
   We are "saving" a `new HashMap`, and in order to do it we are adding more code and memory footprint.
   The JVM should be able to handle this HashMap efficiently, probably within the TLAB.
   
   Do you have evidence that this ThreadLocal helps?
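   For comparison, the plain-allocation alternative suggested here could look like the following hypothetical sketch of a per-call `Map<Consumer, Set<Integer>>`. Consumers are modelled as strings and the hash-to-consumer selector is a caller-supplied function; neither is the broker's actual API. The TLAB remark in the code comment restates the reviewer's expectation, not a measured result.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.IntFunction;

// Hypothetical sketch: consumers are plain strings and selectConsumer stands in for the
// dispatcher's key-to-consumer selection; neither is the broker's real API.
final class StickyKeyGrouping {
    private StickyKeyGrouping() {}

    /** Groups sticky key hashes by the consumer selected for each, using a fresh map per call. */
    static Map<String, Set<Integer>> groupByConsumer(List<Integer> stickyKeyHashes,
                                                     IntFunction<String> selectConsumer) {
        // A short-lived allocation instead of a map recycled through a thread-local;
        // the reviewer expects the JVM to handle this cheaply (e.g. TLAB allocation).
        Map<String, Set<Integer>> hashesByConsumer = new HashMap<>();
        for (int hash : stickyKeyHashes) {
            hashesByConsumer.computeIfAbsent(selectConsumer.apply(hash), consumer -> new HashSet<>())
                    .add(hash);
        }
        return hashesByConsumer;
    }
}
```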
   







[GitHub] [pulsar] massakam commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r660842127



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
##########
@@ -142,6 +142,14 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE
                 }
             };
 
+    private static final FastThreadLocal<Map<Consumer, Set<Integer>>> localConsumerStickyKeyHashesMap =

Review comment:
       Fixed so that it no longer uses FastThreadLocal.







[GitHub] [pulsar] lhotari commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r663891021



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -862,11 +864,15 @@ public void cursorIsReset() {
         }
     }
 
-    protected boolean addMessageToReplay(long ledgerId, long entryId) {
+    protected boolean addMessageToReplay(long ledgerId, long entryId, Long stickyKeyHash) {

Review comment:
       @eolivelli yes, that's true. I think it's better that the PR author solves the issue in the way that makes sense than proposing an exact change to do. In my review comment I'm just bringing it to attention that auto-boxing long -> Long will create a java.lang.Long object instance. That's something that isn't desired in the Pulsar code base for hot code paths since there's a goal of minimizing allocations.







[GitHub] [pulsar] massakam commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r642824270



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -697,20 +698,22 @@ public boolean isConsumerAvailable(Consumer consumer) {
 
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer consumer) {
-        consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, none) -> {
-            addMessageToReplay(ledgerId, entryId);
+        consumer.getPendingAcks().forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> {
+            addMessageToReplay(ledgerId, entryId, stickyKeyHash);
         });
         if (log.isDebugEnabled()) {
             log.debug("[{}-{}] Redelivering unacknowledged messages for consumer {}", name, consumer,
-                    messagesToRedeliver);
+                    redeliveryMessages);
         }
         readMoreEntries();
     }
 
     @Override
     public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
         positions.forEach(position -> {
-            if (addMessageToReplay(position.getLedgerId(), position.getEntryId())) {
+            // TODO: We want to pass a sticky key hash as a third argument to guarantee the order of the messages
+            // on Key_Shared subscription, but it's difficult to get the sticky key here
+            if (addMessageToReplay(position.getLedgerId(), position.getEntryId(), null)) {

Review comment:
       This method is executed when redelivery is requested from the consumer side (e.g. negative ack, ack timeout), so in this case the user should accept that messages may be delivered out of order.







[GitHub] [pulsar] merlimat commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r668886096



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
##########
@@ -0,0 +1,114 @@
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.ComparisonChain;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+
+public class MessageRedeliveryController {
+    private final LongPairSet messagesToRedeliver;
+    private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
+
+    public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
+        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+        this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2);
+    }
+
+    public boolean add(long ledgerId, long entryId) {
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
+        }
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean remove(long ledgerId, long entryId) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.remove(ledgerId, entryId);
+        }
+        return messagesToRedeliver.remove(ledgerId, entryId);
+    }
+
+    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
+        if (hashesToBeBlocked != null) {
+            for (LongPair longPair : hashesToBeBlocked.keys()) {

Review comment:
       OK, we can go on with this, but we should also import here a fixed version of `ConcurrentLongLongPairHashMap` that doesn't have the problem.







[GitHub] [pulsar] lhotari commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r665995724



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
##########
@@ -0,0 +1,114 @@
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.ComparisonChain;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+
+public class MessageRedeliveryController {
+    private final LongPairSet messagesToRedeliver;
+    private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
+
+    public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
+        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+        this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2);
+    }
+
+    public boolean add(long ledgerId, long entryId) {
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
+        }
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean remove(long ledgerId, long entryId) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.remove(ledgerId, entryId);
+        }
+        return messagesToRedeliver.remove(ledgerId, entryId);
+    }
+
+    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
+        if (hashesToBeBlocked != null) {
+            for (LongPair longPair : hashesToBeBlocked.keys()) {

Review comment:
       @massakam, does the deadlock happen on the 2.8.x / master branch? There are changes in #9787 that resolved some deadlock issues.







[GitHub] [pulsar] lhotari commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
lhotari commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r663885812



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -862,11 +864,15 @@ public void cursorIsReset() {
         }
     }
 
-    protected boolean addMessageToReplay(long ledgerId, long entryId) {
+    protected boolean addMessageToReplay(long ledgerId, long entryId, Long stickyKeyHash) {

Review comment:
       Would it be possible to avoid using a `java.lang.Long` object? Boxing from `long` to `Long` creates object instances, which doesn't fit well with the goal of minimizing allocations in the Pulsar code base.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -862,11 +864,15 @@ public void cursorIsReset() {
         }
     }
 
-    protected boolean addMessageToReplay(long ledgerId, long entryId) {
+    protected boolean addMessageToReplay(long ledgerId, long entryId, Long stickyKeyHash) {

Review comment:
       @eolivelli yes, that's true. I think it's better for the PR author to solve the issue in whatever way makes sense, rather than for me to propose an exact change. In my review comment I'm just pointing out that auto-boxing `long` -> `Long` creates a `java.lang.Long` object instance. That is undesirable on hot code paths in the Pulsar code base, since there's a goal of minimizing allocations.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -862,11 +864,15 @@ public void cursorIsReset() {
         }
     }
 
-    protected boolean addMessageToReplay(long ledgerId, long entryId) {
+    protected boolean addMessageToReplay(long ledgerId, long entryId, Long stickyKeyHash) {

Review comment:
       @eolivelli yes, that's true. I think it's better for the PR author to solve the issue in whatever way makes sense. In my review comment I'm just pointing out that auto-boxing `long` -> `Long` creates a `java.lang.Long` object instance. That is undesirable on hot code paths in the Pulsar code base, since there's a goal of minimizing allocations.







[GitHub] [pulsar] massakam commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r669298620



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.ComparisonChain;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+
+public class MessageRedeliveryController {
+    private final LongPairSet messagesToRedeliver;
+    private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
+
+    public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
+        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+        this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2);
+    }
+
+    public boolean add(long ledgerId, long entryId) {
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
+        }
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean remove(long ledgerId, long entryId) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.remove(ledgerId, entryId);
+        }
+        return messagesToRedeliver.remove(ledgerId, entryId);
+    }
+
+    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
+        if (hashesToBeBlocked != null) {
+            for (LongPair longPair : hashesToBeBlocked.keys()) {

Review comment:
       > but we should also import here a fixed version of `ConcurrentLongLongPairHashMap` that doesn't have the problem
   
   @merlimat Such a version doesn't exist yet, does it?







[GitHub] [pulsar] massakam commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r664245115



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -862,11 +864,15 @@ public void cursorIsReset() {
         }
     }
 
-    protected boolean addMessageToReplay(long ledgerId, long entryId) {
+    protected boolean addMessageToReplay(long ledgerId, long entryId, Long stickyKeyHash) {

Review comment:
       Fixed.







[GitHub] [pulsar] massakam commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r657585484



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
##########
@@ -142,6 +142,14 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE
                 }
             };
 
+    private static final FastThreadLocal<Map<Consumer, Set<Integer>>> localConsumerStickyKeyHashesMap =

Review comment:
       @merlimat ping







[GitHub] [pulsar] merlimat commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r665451513



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.ComparisonChain;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+
+public class MessageRedeliveryController {
+    private final LongPairSet messagesToRedeliver;
+    private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
+
+    public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
+        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+        this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2);
+    }
+
+    public boolean add(long ledgerId, long entryId) {
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
+        }
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean remove(long ledgerId, long entryId) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.remove(ledgerId, entryId);
+        }
+        return messagesToRedeliver.remove(ledgerId, entryId);
+    }
+
+    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
+        if (hashesToBeBlocked != null) {
+            for (LongPair longPair : hashesToBeBlocked.keys()) {

Review comment:
       Using `keys()` will make a copy of the whole set, so we should use `forEach()` instead.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.ComparisonChain;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+
+public class MessageRedeliveryController {
+    private final LongPairSet messagesToRedeliver;
+    private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
+
+    public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
+        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+        this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2);
+    }
+
+    public boolean add(long ledgerId, long entryId) {
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
+        }
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean remove(long ledgerId, long entryId) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.remove(ledgerId, entryId);
+        }
+        return messagesToRedeliver.remove(ledgerId, entryId);
+    }
+
+    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
+        if (hashesToBeBlocked != null) {
+            for (LongPair longPair : hashesToBeBlocked.keys()) {
+                long ledgerId = longPair.first;
+                long entryId = longPair.second;
+                if (ComparisonChain.start().compare(ledgerId, markDeleteLedgerId).compare(entryId, markDeleteEntryId)
+                        .result() <= 0) {
+                    hashesToBeBlocked.remove(ledgerId, entryId);
+                }
+            }
+        }
+        return messagesToRedeliver.removeIf((ledgerId, entryId) -> {
+            return ComparisonChain.start().compare(ledgerId, markDeleteLedgerId).compare(entryId, markDeleteEntryId)
+                    .result() <= 0;
+        });
+    }
+
+    public boolean isEmpty() {
+        return messagesToRedeliver.isEmpty();
+    }
+
+    public void clear() {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.clear();
+        }
+        messagesToRedeliver.clear();
+    }
+
+    public String toString() {
+        return messagesToRedeliver.toString();
+    }
+
+    public boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
+        if (hashesToBeBlocked != null) {
+            for (LongPair longPair : hashesToBeBlocked.values()) {

Review comment:
       Same here: `values()` will make a copy of the set.
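A minimal sketch of a copy-free `containsStickyKeyHashes`, under stated assumptions: `HashLookupSketch` and `LongPairEntryVisitor` are hypothetical stand-ins, the visitor is only assumed to mirror the real map's four-long `forEach()` callback, and narrowing the stored `long` hash to `int` is an assumption made to match the `Set<Integer>` parameter in the diff. Because `forEach()` cannot return early, the result is carried out of the lambda in a one-element array.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical callback shape (four longs per entry), assumed to mirror the
// real map's forEach() processor; it is not the BookKeeper interface itself.
interface LongPairEntryVisitor {
    void accept(long ledgerId, long entryId, long stickyKeyHash, long unused);
}

class HashLookupSketch {
    // Hypothetical in-memory stand-in for hashesToBeBlocked.
    private final List<long[]> entries = new ArrayList<>();

    void put(long ledgerId, long entryId, long stickyKeyHash) {
        entries.add(new long[] {ledgerId, entryId, stickyKeyHash, 0L});
    }

    // Visits entries in place; no copy of the values is handed to the caller.
    void forEach(LongPairEntryVisitor visitor) {
        for (long[] e : entries) {
            visitor.accept(e[0], e[1], e[2], e[3]);
        }
    }

    // forEach() cannot stop early, so a one-element array carries the result
    // out of the lambda, and the Set lookup is skipped once a match is found.
    // The long hash is narrowed to int here (assumption, to match Set<Integer>).
    boolean containsStickyKeyHashes(Set<Integer> stickyKeyHashes) {
        boolean[] found = {false};
        forEach((ledgerId, entryId, hash, unused) -> {
            if (!found[0] && stickyKeyHashes.contains((int) hash)) {
                found[0] = true;
            }
        });
        return found[0];
    }
}
```

The trade-off is that every entry is still visited even after a match; only the intermediate copy of all values is avoided.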







[GitHub] [pulsar] eolivelli commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r663886487



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
##########
@@ -862,11 +864,15 @@ public void cursorIsReset() {
         }
     }
 
-    protected boolean addMessageToReplay(long ledgerId, long entryId) {
+    protected boolean addMessageToReplay(long ledgerId, long entryId, Long stickyKeyHash) {

Review comment:
       @lhotari, do you have a proposal? We cannot use a sentinel value, because every `long` value is allowed here.
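One option that avoids both boxing and a sentinel is to overload the method with a primitive parameter, which is the shape `MessageRedeliveryController` in this PR already uses for its two `add` methods. This is a minimal sketch only: `ReplayOverloadSketch` and its counters are hypothetical stand-ins for the dispatcher's real state.

```java
// Hypothetical sketch: two primitive overloads replace one method that takes
// a nullable Long, so no Long is boxed and no long value is reserved.
class ReplayOverloadSketch {
    int replayed;            // messages queued for replay
    int withHash;            // how many of those carried a sticky key hash
    long lastStickyKeyHash;  // most recent hash seen (illustration only)

    // Caller has no sticky key hash: nothing to box, no sentinel needed.
    boolean addMessageToReplay(long ledgerId, long entryId) {
        replayed++;
        return true;
    }

    // Caller has a hash: every long value, including 0 and -1, stays valid.
    boolean addMessageToReplay(long ledgerId, long entryId, long stickyKeyHash) {
        withHash++;
        lastStickyKeyHash = stickyKeyHash;
        return addMessageToReplay(ledgerId, entryId);
    }
}
```

Overload resolution happens at compile time, so callers that have no hash simply call the two-argument form.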







[GitHub] [pulsar] massakam commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r652294994



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
##########
@@ -142,6 +142,14 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE
                 }
             };
 
+    private static final FastThreadLocal<Map<Consumer, Set<Integer>>> localConsumerStickyKeyHashesMap =

Review comment:
       No, I don't have clear evidence. I just implemented it the same way as the existing `localGroupedEntries`.
   https://github.com/apache/pulsar/blob/40c831761a7519e4ce9f0b7b8808cdb6df3f418e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java#L143-L149
   
   `localGroupedEntries` seems to have been added in the following PR:
   https://github.com/apache/pulsar/pull/7104
   
   @merlimat, do we need to use ThreadLocal in this case?
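For context, the pattern under discussion keeps one scratch map per thread and clears it on each call, so the outer map is not reallocated on every dispatch. The sketch below uses `java.lang.ThreadLocal` as a stand-in for Netty's `FastThreadLocal`; `ThreadLocalScratchSketch`, `groupHashes`, and the use of `String` consumer names are hypothetical. It does not answer whether the thread-local is needed here; it only shows what the pattern buys and what it requires.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class ThreadLocalScratchSketch {
    // One scratch map per thread, created lazily on first use and then reused.
    static final ThreadLocal<Map<String, Set<Integer>>> LOCAL_HASHES_BY_CONSUMER =
            ThreadLocal.withInitial(HashMap::new);

    // Groups sticky key hashes by (hypothetical) consumer name and returns the
    // number of distinct consumers. The map must be cleared first, because it
    // still holds whatever the previous call on this thread left in it.
    static int groupHashes(String[] consumers, int[] hashes) {
        Map<String, Set<Integer>> scratch = LOCAL_HASHES_BY_CONSUMER.get();
        scratch.clear();
        for (int i = 0; i < consumers.length; i++) {
            scratch.computeIfAbsent(consumers[i], k -> new HashSet<>()).add(hashes[i]);
        }
        return scratch.size();
    }
}
```

Note that only the outer map is reused; the per-consumer sets are still allocated on each call. The pattern is also only safe if the map never escapes the method that fills it.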







[GitHub] [pulsar] massakam commented on pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#issuecomment-874471196


   @lhotari PTAL





[GitHub] [pulsar] codelipenghui merged pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762


   





[GitHub] [pulsar] massakam commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r669298778



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.ComparisonChain;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+
+public class MessageRedeliveryController {
+    private final LongPairSet messagesToRedeliver;
+    private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
+
+    public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
+        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+        this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2);
+    }
+
+    public boolean add(long ledgerId, long entryId) {
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
+        }
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean remove(long ledgerId, long entryId) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.remove(ledgerId, entryId);
+        }
+        return messagesToRedeliver.remove(ledgerId, entryId);
+    }
+
+    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
+        if (hashesToBeBlocked != null) {
+            for (LongPair longPair : hashesToBeBlocked.keys()) {

Review comment:
       I now add the keys to be removed to a newly created list inside the `forEach()` loop, and remove them from `hashesToBeBlocked` after the loop exits. This way only the entries to be removed are copied, rather than the whole set. PTAL.
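The two-phase approach described above can be sketched as follows. `RemoveUpToSketch` and `BlockedEntryVisitor` are hypothetical stand-ins backed by a `HashMap`, not the real `ConcurrentLongLongPairHashMap`. With this stand-in, mutating during iteration would throw `ConcurrentModificationException` rather than deadlock, but the fix has the same shape. The `atOrBefore` helper reproduces the ordering of the `ComparisonChain` in the diff: ledger ID first, then entry ID, both compared as signed longs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical callback shape standing in for the real map's forEach() processor.
interface BlockedEntryVisitor {
    void accept(long ledgerId, long entryId, long stickyKeyHash, long unused);
}

class RemoveUpToSketch {
    // Hypothetical stand-in for hashesToBeBlocked: "ledgerId:entryId" -> {ledgerId, entryId, hash}.
    private final Map<String, long[]> hashesToBeBlocked = new HashMap<>();

    void put(long ledgerId, long entryId, long stickyKeyHash) {
        hashesToBeBlocked.put(ledgerId + ":" + entryId, new long[] {ledgerId, entryId, stickyKeyHash});
    }

    void remove(long ledgerId, long entryId) {
        hashesToBeBlocked.remove(ledgerId + ":" + entryId);
    }

    int size() {
        return hashesToBeBlocked.size();
    }

    void forEach(BlockedEntryVisitor visitor) {
        for (long[] e : hashesToBeBlocked.values()) {
            visitor.accept(e[0], e[1], e[2], 0L);
        }
    }

    // Same ordering as the ComparisonChain in the diff: ledgerId, then entryId.
    static boolean atOrBefore(long ledgerId, long entryId, long markLedgerId, long markEntryId) {
        return ledgerId < markLedgerId || (ledgerId == markLedgerId && entryId <= markEntryId);
    }

    void removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
        // Phase 1: only collect matching keys while iterating; never mutate here.
        List<long[]> toRemove = new ArrayList<>();
        forEach((ledgerId, entryId, hash, unused) -> {
            if (atOrBefore(ledgerId, entryId, markDeleteLedgerId, markDeleteEntryId)) {
                toRemove.add(new long[] {ledgerId, entryId});
            }
        });
        // Phase 2: mutate only after the iteration has finished.
        for (long[] key : toRemove) {
            remove(key[0], key[1]);
        }
    }
}
```

The temporary list grows only with the number of entries at or before the mark-delete position, which is what keeps this cheaper than copying the whole key set with `keys()`.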







[GitHub] [pulsar] massakam commented on a change in pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on a change in pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#discussion_r665838832



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageRedeliveryController.java
##########
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import com.google.common.collect.ComparisonChain;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
+import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
+import org.apache.pulsar.common.util.collections.ConcurrentSortedLongPairSet;
+import org.apache.pulsar.common.util.collections.LongPairSet;
+
+public class MessageRedeliveryController {
+    private final LongPairSet messagesToRedeliver;
+    private final ConcurrentLongLongPairHashMap hashesToBeBlocked;
+
+    public MessageRedeliveryController(boolean allowOutOfOrderDelivery) {
+        this.messagesToRedeliver = new ConcurrentSortedLongPairSet(128, 2);
+        this.hashesToBeBlocked = allowOutOfOrderDelivery ? null : new ConcurrentLongLongPairHashMap(128, 2);
+    }
+
+    public boolean add(long ledgerId, long entryId) {
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean add(long ledgerId, long entryId, long stickyKeyHash) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.put(ledgerId, entryId, stickyKeyHash, 0);
+        }
+        return messagesToRedeliver.add(ledgerId, entryId);
+    }
+
+    public boolean remove(long ledgerId, long entryId) {
+        if (hashesToBeBlocked != null) {
+            hashesToBeBlocked.remove(ledgerId, entryId);
+        }
+        return messagesToRedeliver.remove(ledgerId, entryId);
+    }
+
+    public int removeAllUpTo(long markDeleteLedgerId, long markDeleteEntryId) {
+        if (hashesToBeBlocked != null) {
+            for (LongPair longPair : hashesToBeBlocked.keys()) {

Review comment:
       @merlimat Removing an entry from inside `forEach()` seems to cause a deadlock. Is there a way to avoid this without using `keys()`?
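Below is a minimal, self-contained sketch of the two-phase approach the diff takes with `keys()`, under stated assumptions. It assumes the map's internal sections are guarded by a non-reentrant `java.util.concurrent.locks.StampedLock`. If the traversal in `forEach()` holds a section's read lock, a `remove()` issued from the callback needs that section's write lock and can never get it, which would explain the hang. It also assumes `removeAllUpTo` is meant to drop every entry at or before the mark-delete position, as its name suggests (the diff is truncated before the method body). `RemoveUpToSketch`, `Position`, `writeLockAvailableWhileReading`, and this `removeAllUpTo` are hypothetical names. A `ConcurrentHashMap` stands in for `ConcurrentLongLongPairHashMap`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.StampedLock;

public class RemoveUpToSketch {

    // Hypothetical (ledgerId, entryId) key, standing in for LongPair.
    public static final class Position implements Comparable<Position> {
        final long ledgerId;
        final long entryId;

        public Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Position o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Position)) {
                return false;
            }
            Position p = (Position) o;
            return ledgerId == p.ledgerId && entryId == p.entryId;
        }

        @Override
        public int hashCode() {
            return 31 * Long.hashCode(ledgerId) + Long.hashCode(entryId);
        }

        @Override
        public String toString() {
            return "(" + ledgerId + ":" + entryId + ")";
        }
    }

    // StampedLock is not reentrant: while this thread holds the read lock,
    // it cannot also take the write lock. A blocking writeLock() here would
    // hang forever; tryWriteLock() just returns 0 ("not acquired").
    public static boolean writeLockAvailableWhileReading() {
        StampedLock lock = new StampedLock();
        long readStamp = lock.readLock();
        try {
            long writeStamp = lock.tryWriteLock();
            if (writeStamp != 0) {
                lock.unlockWrite(writeStamp);
                return true;
            }
            return false;
        } finally {
            lock.unlockRead(readStamp);
        }
    }

    // Snapshot-then-remove: collect matching keys first, then remove them
    // after the traversal is finished, so no traversal lock is held while
    // remove() runs. Entries at or before the mark-delete position are dropped.
    public static int removeAllUpTo(Map<Position, Long> hashes,
                                    long markDeleteLedgerId, long markDeleteEntryId) {
        Position markDelete = new Position(markDeleteLedgerId, markDeleteEntryId);
        List<Position> toRemove = new ArrayList<>();
        for (Position p : hashes.keySet()) {
            if (p.compareTo(markDelete) <= 0) {
                toRemove.add(p);
            }
        }
        int removed = 0;
        for (Position p : toRemove) {
            if (hashes.remove(p) != null) {
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        System.out.println(writeLockAvailableWhileReading()); // prints false
        Map<Position, Long> hashes = new ConcurrentHashMap<>();
        hashes.put(new Position(1, 5), 100L);
        hashes.put(new Position(1, 9), 200L);
        hashes.put(new Position(2, 0), 300L);
        System.out.println(removeAllUpTo(hashes, 1, 9)); // prints 2
        System.out.println(hashes.keySet()); // prints [(2:0)]
    }
}
```

`ConcurrentHashMap` itself tolerates removal during iteration, so the snapshot step only matters for maps whose traversal holds a lock. The cost is one temporary list proportional to the number of matching keys.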




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] massakam commented on pull request #10762: [broker] Fix issue that message ordering could be broken when redelivering messages on Key_Shared subscription

Posted by GitBox <gi...@apache.org>.
massakam commented on pull request #10762:
URL: https://github.com/apache/pulsar/pull/10762#issuecomment-874471196


   @lhotari PTAL

