Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/30 09:07:33 UTC

[GitHub] [pulsar] eolivelli commented on a diff in pull request #17782: [fix][broker]Can't consume messages for a long time due to Entry Filter

eolivelli commented on code in PR #17782:
URL: https://github.com/apache/pulsar/pull/17782#discussion_r984309409


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -700,6 +707,30 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
         return true;
     }
 
+    public Consumer getNextConsumer(List<Entry> entries) {
+        if (!redeliveryTracker.hasRedeliveredEntry(entries)){
+            return super.getNextConsumer();
+        }
+        if (redeliveryTracker instanceof InMemoryAndPreventCycleFilterRedeliveryTracker tracker){

Review Comment:
   we should not use "instanceof";
   we should leverage polymorphism instead
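A minimal sketch of the polymorphic alternative, assuming the pause check is promoted to the tracker interface as a default method. `SimpleConsumer`, `PauseAwareTracker`, `PlainTracker`, `CycleTracker` and `DispatcherSketch` are hypothetical stand-ins, not Pulsar types. The point is only that the dispatcher calls one method and each tracker supplies its own behaviour, so no `instanceof` check is needed.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the broker's Consumer; identity equality is enough here.
final class SimpleConsumer {
    final String name;

    SimpleConsumer(String name) {
        this.name = name;
    }
}

// The pause check is part of the tracker contract, with a neutral default.
interface PauseAwareTracker {
    default boolean isConsumerPaused(SimpleConsumer consumer) {
        return false; // ordinary trackers never pause a consumer
    }
}

// Ordinary tracker: simply inherits the default.
class PlainTracker implements PauseAwareTracker {
}

// Cycle-preventing tracker: overrides the hook instead of being detected with instanceof.
class CycleTracker implements PauseAwareTracker {
    private final Set<SimpleConsumer> paused = ConcurrentHashMap.newKeySet();

    void pause(SimpleConsumer consumer) {
        paused.add(consumer);
    }

    @Override
    public boolean isConsumerPaused(SimpleConsumer consumer) {
        return paused.contains(consumer);
    }
}

class DispatcherSketch {
    private final PauseAwareTracker tracker;

    DispatcherSketch(PauseAwareTracker tracker) {
        this.tracker = tracker;
    }

    // Returns the first candidate the tracker does not consider paused,
    // or null if every candidate is paused.
    SimpleConsumer pickConsumer(List<SimpleConsumer> candidates) {
        for (SimpleConsumer candidate : candidates) {
            if (!tracker.isConsumerPaused(candidate)) {
                return candidate;
            }
        }
        return null;
    }
}
```

The same dispatcher code works with either tracker; only the tracker passed to the constructor changes.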



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -700,6 +707,30 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
         return true;
     }
 
+    public Consumer getNextConsumer(List<Entry> entries) {
+        if (!redeliveryTracker.hasRedeliveredEntry(entries)){
+            return super.getNextConsumer();
+        }
+        if (redeliveryTracker instanceof InMemoryAndPreventCycleFilterRedeliveryTracker tracker){
+            if (tracker.pausedConsumerCount() == consumerSet.size()){
+                log.warn("No consumers are currently able to consume the first redelivery entry {}",
+                        tracker.getRedeliveryStartAt());
+                return super.getNextConsumer();
+            } else {
+                Consumer nextConsumer = null;
+                while (true){

Review Comment:
   this looks like an endless loop in some cases;
   we cannot keep this thread busy forever
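One way to bound the loop, sketched under assumptions: each selection tries every consumer at most once and, if all of them are skipped, falls back to plain round-robin instead of spinning (mirroring the `super.getNextConsumer()` fallback in the diff). `BoundedConsumerSelector` is hypothetical, and the paused check is passed in as a `Predicate` so the sketch does not depend on broker classes.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical round-robin selector with a hard bound on the number of attempts.
class BoundedConsumerSelector<C> {
    private int nextIndex = 0;

    // Tries each consumer at most once per call, so the loop always terminates.
    // When every consumer is paused, hands out the next one anyway rather than spin.
    C select(List<C> consumers, Predicate<C> isPaused) {
        if (consumers.isEmpty()) {
            return null;
        }
        int size = consumers.size();
        for (int attempt = 0; attempt < size; attempt++) {
            // floorMod keeps the index valid even if nextIndex overflows.
            C candidate = consumers.get(Math.floorMod(nextIndex++, size));
            if (!isPaused.test(candidate)) {
                return candidate;
            }
        }
        return consumers.get(Math.floorMod(nextIndex++, size));
    }
}
```

The worst case is `size + 1` list lookups per call, regardless of how long consumers stay paused.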



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -700,6 +707,30 @@ protected synchronized boolean trySendMessagesToConsumers(ReadType readType, Lis
         return true;
     }
 
+    public Consumer getNextConsumer(List<Entry> entries) {
+        if (!redeliveryTracker.hasRedeliveredEntry(entries)){
+            return super.getNextConsumer();
+        }
+        if (redeliveryTracker instanceof InMemoryAndPreventCycleFilterRedeliveryTracker tracker){
+            if (tracker.pausedConsumerCount() == consumerSet.size()){
+                log.warn("No consumers are currently able to consume the first redelivery entry {}",

Review Comment:
   I think that this will be logged many times, and actually it is not a problem.
   
   I am testing under heavy load in order to tell more
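If the warning is kept at all, one possible option (an assumption; the thread does not propose it) is to rate-limit it so that a burst of identical warnings collapses into one line plus a suppressed count. `RateLimitedWarning` is a hypothetical helper, not a Pulsar or SLF4J API; the clock is injected so the behaviour is testable.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical helper: lets a warning through at most once per interval.
class RateLimitedWarning {
    private final long intervalMillis;
    private final LongSupplier clock;
    private final AtomicLong lastLoggedAt = new AtomicLong(Long.MIN_VALUE);
    private final AtomicLong suppressed = new AtomicLong();

    RateLimitedWarning(long intervalMillis, LongSupplier clock) {
        this.intervalMillis = intervalMillis;
        this.clock = clock;
    }

    // Returns the number of calls suppressed since the last emitted warning if the
    // caller should log now, or -1 if it should stay quiet.
    long shouldLog() {
        long now = clock.getAsLong();
        long last = lastLoggedAt.get();
        if (last != Long.MIN_VALUE && now - last < intervalMillis) {
            suppressed.incrementAndGet();
            return -1;
        }
        if (!lastLoggedAt.compareAndSet(last, now)) {
            suppressed.incrementAndGet(); // another thread logged first
            return -1;
        }
        return suppressed.getAndSet(0);
    }
}
```

A caller would log only when `shouldLog()` returns a non-negative value, including that value as the number of suppressed occurrences.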



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryAndPreventCycleFilterRedeliveryTracker.java:
##########
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import lombok.Getter;
+import org.apache.bookkeeper.mledger.Position;
+
+/***
+ * When there are two consumers, users can specify the consumption behavior of each consumer by `Entry filter`:
+ * - `case-1`: `consumer_1` can consume 60% of the messages, `consumer_2` can consume 60% of the messages, and there
+ * is 10% intersection between `consumer_1` and `consumer_2`.
+ * - `case-2`: `consumer_1` can consume 40% of the messages, `consumer_2` can consume 40% of the messages, and no
+ * consumer can consume the remaining 20%.
+ *
+ * In case-1, when users use `FilterResult.RESCHEDULE `, and if the message that can only be consumed by `consumer_1`
+ * is delivered to `consumer_2` all the time, and the message that can only be consumed by `consumer_2` is delivered
+ * to `consumer_1` all the time, then the problem occurs:
+ * - Both consumers can not receive messages anymore.
+ * - The number of redeliveries of entries has been increasing ( redelivery by Entry Filter ).
+ *
+ * So {@link InMemoryAndPreventCycleFilterRedeliveryTracker} solve this problem by this way:
+ * When a message is redelivered by the same consumer more than 3 times, the consumption of this message by that
+ * consumer is paused for 1 second. Since tracking the consumption of all the messages cost memory too much, we trace
+ * only the messages with the smallest position.
+ */
+public class InMemoryAndPreventCycleFilterRedeliveryTracker extends InMemoryRedeliveryTracker {
+
+    /** The first redelivery record. **/
+    @Getter
+    private volatile Position redeliveryStartAt;
+    /**
+     * key: The consumer calls redelivery at {@link #redeliveryStartAt}.
+     * value: The number of times the consumer calls redelivery.
+     */
+    private final ConcurrentHashMap<Consumer, AtomicInteger> earliestEntryRedeliveryCountMapping =
+            new ConcurrentHashMap<>();
+    /**
+     *  key: paused consumer.
+     *  value: pause because of which position.
+     */
+    private final ConcurrentHashMap<Consumer, PauseConsumerInformation> pausedConsumers = new ConcurrentHashMap<>();

Review Comment:
   when a Consumer is disconnected we must remove it from the map; the Dispatcher can notify the tracker explicitly
   
   and not only in cleanEarliestInformation
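A minimal sketch of the explicit cleanup the comment asks for, assuming the dispatcher calls a new `removeConsumer` hook on the tracker from its own consumer-removal path. The two maps mirror `earliestEntryRedeliveryCountMapping` and `pausedConsumers` from the diff, but here they are keyed by a plain consumer id string, and `ConsumerAwareTracker`, `recordRedelivery` and `removeConsumer` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class ConsumerAwareTracker {
    // consumer id -> redeliveries requested by that consumer at the earliest position
    private final ConcurrentHashMap<String, AtomicInteger> earliestEntryRedeliveryCountMapping =
            new ConcurrentHashMap<>();
    // consumer id -> time at which it was paused
    private final ConcurrentHashMap<String, Long> pausedConsumers = new ConcurrentHashMap<>();

    void recordRedelivery(String consumerId, int pauseThreshold, long nowMillis) {
        int count = earliestEntryRedeliveryCountMapping
                .computeIfAbsent(consumerId, id -> new AtomicInteger())
                .incrementAndGet();
        if (count >= pauseThreshold) {
            pausedConsumers.put(consumerId, nowMillis);
        }
    }

    // Hypothetical hook, called by the dispatcher as soon as a consumer disconnects,
    // so neither map keeps state for it until the next cleanEarliestInformation().
    void removeConsumer(String consumerId) {
        earliestEntryRedeliveryCountMapping.remove(consumerId);
        pausedConsumers.remove(consumerId);
    }

    int trackedConsumers() {
        return earliestEntryRedeliveryCountMapping.size();
    }

    boolean isPaused(String consumerId) {
        return pausedConsumers.containsKey(consumerId);
    }
}
```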



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryAndPreventCycleFilterRedeliveryTracker.java:
##########
@@ -0,0 +1,173 @@
+
+    @Override
+    public int incrementAndGetRedeliveryCount(Position position, Consumer consumer) {
+        int newCount = super.incrementAndGetRedeliveryCount(position, consumer);
+        // Diff with super implementation: record how many times this consumer calls "redelivery" at earliest position.
+        if (position == null || consumer == null){
+            return newCount;
+        }
+        Position originalEarliestPosition = redeliveryStartAt;
+        Position actualEarliestPosition = null;
+        if (originalEarliestPosition == null) {
+            cleanEarliestInformation();
+            actualEarliestPosition = position;
+        } else {
+            int isEarlier = comparePosition(originalEarliestPosition, position);
+            if (isEarlier < 0) {
+                return newCount;
+            } else if (isEarlier > 0) {
+                cleanEarliestInformation();
+                actualEarliestPosition = position;
+            } else {
+                actualEarliestPosition = originalEarliestPosition;
+            }
+        }
+        redeliveryStartAt = actualEarliestPosition;
+        int redeliveryCount = earliestEntryRedeliveryCountMapping.computeIfAbsent(consumer, c -> new AtomicInteger())
+                .incrementAndGet();
+        if (redeliveryCount >= 3) {
+            pausedConsumers.put(consumer, new PauseConsumerInformation(actualEarliestPosition));
+        } else {
+        }
+        return newCount;
+    }
+
+    @Override
+    public void remove(Position position, Position markDeletedPosition) {
+        super.remove(position, markDeletedPosition);
+        if (redeliveryStartAt == null || comparePosition(redeliveryStartAt, position) == 0
+                || comparePosition(redeliveryStartAt, markDeletedPosition) >= 0) {
+            cleanEarliestInformation();
+        }
+    }
+
+    @Override
+    public void clear() {
+        super.clear();
+        cleanEarliestInformation();
+    }
+
+    public boolean isConsumerPaused(Consumer consumer) {
+        if (consumer == null) {
+            return false;
+        }
+        PauseConsumerInformation pauseConsumerInformation = pausedConsumers.get(consumer);
+        if (pauseConsumerInformation == null) {
+            return false;
+        }
+        if (!pauseConsumerInformation.isValid(redeliveryStartAt)) {
+            pausedConsumers.remove(consumer);
+            return false;
+        }
+        return true;
+    }
+
+    public int pausedConsumerCount() {
+        return (int) pausedConsumers.keySet().stream().map(this::isConsumerPaused).count();
+    }
+
+    private void cleanEarliestInformation() {
+        redeliveryStartAt = null;
+        // If consumer has been closed, remove this consumer.
+        List<Consumer> closedConsumers = earliestEntryRedeliveryCountMapping.keySet().stream()
+                .filter(Consumer::isClosed).toList();
+        closedConsumers.forEach(earliestEntryRedeliveryCountMapping::remove);
+        // Just reset counter to 0, because value will be removed if consumer is closed.
+        earliestEntryRedeliveryCountMapping.values().forEach(i -> i.set(0));
+        pausedConsumers.clear();
+    }
+
+    private static int comparePosition(Position pos1, Position pos2) {
+        int ledgerCompare = Long.compare(pos1.getLedgerId(), pos2.getLedgerId());
+        if (ledgerCompare != 0) {
+            return ledgerCompare;
+        }
+        return Long.compare(pos1.getEntryId(), pos2.getEntryId());
+    }
+
+    private static class PauseConsumerInformation {
+
+        private final Position cantConsumedPosition;
+
+        private final long pauseTime;
+
+        private PauseConsumerInformation(Position cantConsumedPosition) {
+            this.cantConsumedPosition = cantConsumedPosition;
+            this.pauseTime = System.currentTimeMillis();
+        }
+
+        /**
+         * If "do pause consumer" and {@link #cleanEarliestInformation} concurrently, it is possible to pause
+         * consumer that no longer needs to be paused. The way to distinguish is to determine whether
+         * "cantConsumedPosition equal to currentRedeliveryStartAt".
+         * {@link PauseConsumerInformation#isValid(Position)} resolved this problem.
+         */
+        boolean isValid(Position currentRedeliveryStartAt) {
+            if (cantConsumedPosition != currentRedeliveryStartAt) {
+                return false;
+            }
+            // Automatically becomes invalid after 1s, because users may use time to filter the Entry.
+            return System.currentTimeMillis() - pauseTime < 1000;

Review Comment:
   this should be configurable, maybe next to ServiceConfiguration#dispatcherEntryFilterRescheduledMessageDelay?
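A sketch of making the window configurable, assuming the value is read once from broker configuration and passed to the tracker's constructor. `PauseWindow` and its constructor parameters are hypothetical; the expiry rule is the one from `isValid()` above (`now - pauseTime < limit`), with the hard-coded `1000` replaced by the injected setting.

```java
import java.util.function.LongSupplier;

class PauseWindow {
    private final long pauseMillis;
    private final LongSupplier clock;

    // pauseMillis would come from configuration instead of a hard-coded constant.
    PauseWindow(long pauseMillis, LongSupplier clock) {
        if (pauseMillis < 0) {
            throw new IllegalArgumentException("pauseMillis must be >= 0");
        }
        this.pauseMillis = pauseMillis;
        this.clock = clock;
    }

    // Records the start of a pause.
    long startPause() {
        return clock.getAsLong();
    }

    // Same rule as isValid() in the diff, with the constant replaced by the setting.
    boolean isStillPaused(long pausedAtMillis) {
        return clock.getAsLong() - pausedAtMillis < pauseMillis;
    }
}
```

Injecting the clock as well keeps the timing logic testable without sleeping.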



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/RedeliveryTracker.java:
##########
@@ -19,17 +19,30 @@
 package org.apache.pulsar.broker.service;
 
 import java.util.List;
+import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 
 public interface RedeliveryTracker {
 
-    int incrementAndGetRedeliveryCount(Position position);
+    int incrementAndGetRedeliveryCount(Position position, Consumer consumer);
 
     int getRedeliveryCount(Position position);
 
-    void remove(Position position);
+    void remove(Position position, Position markDeletedPosition);
 
-    void removeBatch(List<Position> positions);
+    void removeBatch(List<Position> positions, Position markDeletedPosition);
 
     void clear();
+
+    default boolean hasRedeliveredEntry(List<Entry> entries) {

Review Comment:
   maybe the default implementation should be a no-op; this way we don't add load for users who don't need this feature
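A sketch of the suggested no-op default, using a trimmed-down hypothetical interface in which `Long` stands in for `Entry`/`Position`: the interface returns `false` without touching the entries, and only the tracker that needs the check overrides it with a per-entry scan. `TrackerSketch` and `ScanningTrackerSketch` are illustrative names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

interface TrackerSketch {
    int getRedeliveryCount(long position);

    // No-op default: trackers that don't need the feature skip the per-read scan.
    default boolean hasRedeliveredEntry(List<Long> entries) {
        return false;
    }
}

// Only the tracker that needs the check pays for scanning the entries.
class ScanningTrackerSketch implements TrackerSketch {
    private final Map<Long, Integer> counts = new HashMap<>();

    void markRedelivered(long position) {
        counts.merge(position, 1, Integer::sum);
    }

    @Override
    public int getRedeliveryCount(long position) {
        return counts.getOrDefault(position, 0);
    }

    @Override
    public boolean hasRedeliveredEntry(List<Long> entries) {
        for (Long position : entries) {
            if (getRedeliveryCount(position) > 0) {
                return true;
            }
        }
        return false;
    }
}
```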



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryAndPreventCycleFilterRedeliveryTracker.java:
##########
@@ -0,0 +1,173 @@
+public class InMemoryAndPreventCycleFilterRedeliveryTracker extends InMemoryRedeliveryTracker {
+
+    /** The first redelivery record. **/
+    @Getter
+    private volatile Position redeliveryStartAt;
+    /**
+     * key: The consumer calls redelivery at {@link #redeliveryStartAt}.
+     * value: The number of times the consumer calls redelivery.
+     */
+    private final ConcurrentHashMap<Consumer, AtomicInteger> earliestEntryRedeliveryCountMapping =

Review Comment:
   when a Consumer is disconnected we must remove it from the map; the Dispatcher can notify the tracker explicitly
   
   and not only in cleanEarliestInformation



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/InMemoryAndPreventCycleFilterRedeliveryTracker.java:
##########
@@ -0,0 +1,173 @@
+    private static int comparePosition(Position pos1, Position pos2) {

Review Comment:
   this method should go somewhere in Position or in some utility class,
   like comparePositionByEntryId
   
   please check whether we already have something like that in the code
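A sketch of what a shared helper could look like, assuming only the `getLedgerId()`/`getEntryId()` getters that `comparePosition` already uses. `LedgerPosition`, `SimplePosition` and `PositionComparators` are hypothetical; before adding anything like this it is worth checking whether the concrete managed-ledger position class already implements `Comparable`, which would make the helper unnecessary.

```java
import java.util.Comparator;

// Minimal stand-in for the managed-ledger Position: only the getters comparePosition uses.
interface LedgerPosition {
    long getLedgerId();

    long getEntryId();
}

// Hypothetical immutable implementation, used only to exercise the comparator.
final class SimplePosition implements LedgerPosition {
    private final long ledgerId;
    private final long entryId;

    SimplePosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public long getLedgerId() {
        return ledgerId;
    }

    @Override
    public long getEntryId() {
        return entryId;
    }
}

// Hypothetical shared utility replacing the private comparePosition copy.
final class PositionComparators {
    // Ledger id first, then entry id: the same ordering as comparePosition in the diff.
    static final Comparator<LedgerPosition> BY_LEDGER_THEN_ENTRY =
            Comparator.comparingLong(LedgerPosition::getLedgerId)
                    .thenComparingLong(LedgerPosition::getEntryId);

    private PositionComparators() {
    }

    static int compare(LedgerPosition a, LedgerPosition b) {
        return BY_LEDGER_THEN_ENTRY.compare(a, b);
    }
}
```

Exposing a `Comparator` rather than only a static method also lets callers reuse it with `TreeMap` or `List.sort`.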



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org