You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/04 03:17:45 UTC

[pulsar] branch master updated: Improved efficiency in KeyShared dispatcher (#7104)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 605868d  Improved efficiency in KeyShared dispatcher (#7104)
605868d is described below

commit 605868d64e018bc4fc075b4633423de0928e3d9a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Jun 3 20:17:35 2020 -0700

    Improved efficiency in KeyShared dispatcher (#7104)
    
    Instead of grouping the messages by same key, group them directly by same consumer in order to achieve max possible grouping when sending them. Also reuse the map for messages grouping as a thread local.
---
 .../broker/service/AbstractBaseDispatcher.java     |   4 +
 ...ConsistentHashingStickyKeyConsumerSelector.java |   5 +-
 ...ashRangeAutoSplitStickyKeyConsumerSelector.java |   5 +-
 ...ashRangeExclusiveStickyKeyConsumerSelector.java |   2 +-
 .../broker/service/StickyKeyConsumerSelector.java  |   7 --
 ...istentStickyKeyDispatcherMultipleConsumers.java |  78 ++++++++-------
 .../PersistentDispatcherSingleActiveConsumer.java  |   5 +-
 ...istentStickyKeyDispatcherMultipleConsumers.java | 111 ++++++++++-----------
 ...ntStickyKeyDispatcherMultipleConsumersTest.java |   5 +-
 9 files changed, 106 insertions(+), 116 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 7985f30..6272a2c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -75,6 +75,10 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
 
         for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
             Entry entry = entries.get(i);
+            if (entry == null) {
+                continue;
+            }
+
             ByteBuf metadataAndPayload = entry.getDataBuffer();
 
             MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
index f27c7f9..377edae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
@@ -81,11 +81,8 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
 
     @Override
     public Consumer select(byte[] stickyKey) {
-        return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
-    }
+        int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey);
 
-    @Override
-    public Consumer select(int hash) {
         rwLock.readLock().lock();
         try {
             if (hashRing.isEmpty()) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
index 5c3c5b5..18b07f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeAutoSplitStickyKeyConsumerSelector.java
@@ -103,10 +103,7 @@ public class HashRangeAutoSplitStickyKeyConsumerSelector implements StickyKeyCon
 
     @Override
     public Consumer select(byte[] stickyKey) {
-        return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
-    }
-
-    public Consumer select(int hash) {
+        int hash = Murmur3_32Hash.getInstance().makeHash(stickyKey);
         if (rangeMap.size() > 0) {
             int slot = hash % rangeSize;
             return rangeMap.ceilingEntry(slot).getValue();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
index 21e94ba..dc96fbb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/HashRangeExclusiveStickyKeyConsumerSelector.java
@@ -63,7 +63,7 @@ public class HashRangeExclusiveStickyKeyConsumerSelector implements StickyKeyCon
         return select(Murmur3_32Hash.getInstance().makeHash(stickyKey));
     }
 
-    public Consumer select(int hash) {
+    Consumer select(int hash) {
         if (rangeMap.size() > 0) {
             int slot = hash % rangeSize;
             Map.Entry<Integer, Consumer> ceilingEntry = rangeMap.ceilingEntry(slot);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
index 88852b5..1b168d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/StickyKeyConsumerSelector.java
@@ -43,11 +43,4 @@ public interface StickyKeyConsumerSelector {
      * @return consumer
      */
     Consumer select(byte[] stickyKey);
-
-    /**
-     * Select a consumer by hash of the sticky they
-     * @param keyHash hash of sticky key
-     * @return
-     */
-    Consumer select(int keyHash);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index b01a432..32cce87 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,6 +18,13 @@
  */
 package org.apache.pulsar.broker.service.nonpersistent;
 
+import io.netty.util.concurrent.FastThreadLocal;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.Consumer;
@@ -25,15 +32,7 @@ import org.apache.pulsar.broker.service.EntryBatchSizes;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
-import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
 
 public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersistentDispatcherMultipleConsumers {
 
@@ -62,37 +61,42 @@ public class NonPersistentStickyKeyDispatcherMultipleConsumers extends NonPersis
         return SubType.Key_Shared;
     }
 
+    private static final FastThreadLocal<Map<Consumer, List<Entry>>> localGroupedEntries = new FastThreadLocal<Map<Consumer, List<Entry>>>() {
+        @Override
+        protected Map<Consumer, List<Entry>> initialValue() throws Exception {
+            return new HashMap<>();
+        }
+    };
+
     @Override
     public void sendMessages(List<Entry> entries) {
-        if (entries.size() > 0) {
-            final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
-            for (Entry entry : entries) {
-                int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
-                groupedEntries.putIfAbsent(key, new ArrayList<>());
-                groupedEntries.get(key).add(entry);
-            }
-            final Iterator<Map.Entry<Integer, List<Entry>>> iterator = groupedEntries.entrySet().iterator();
-            while (iterator.hasNext()) {
-                final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
-                //TODO: None key policy
-                Consumer consumer = selector.select(entriesWithSameKey.getKey());
-                if (consumer != null) {
-                    SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
-                    EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesWithSameKey.getValue().size());
-                    filterEntriesForConsumer(entriesWithSameKey.getValue(), batchSizes, sendMessageInfo, null, null);
-                    consumer.sendMessages(entriesWithSameKey.getValue(), batchSizes, null, sendMessageInfo.getTotalMessages(),
-                            sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
-                    TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
-                } else {
-                    entries.forEach(entry -> {
-                        int totalMsgs = Commands.getNumberOfMessagesInBatch(entry.getDataBuffer(), subscription.toString(), -1);
-                        if (totalMsgs > 0) {
-                            msgDrop.recordEvent(totalMsgs);
-                        }
-                        entry.release();
-                    });
-                }
-            }
+        if (!entries.isEmpty()) {
+            return;
+        }
+
+        if (consumerSet.isEmpty()) {
+            entries.forEach(Entry::release);
+            return;
+        }
+
+        final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
+        groupedEntries.clear();
+
+        for (Entry entry : entries) {
+            Consumer consumer = selector.select(peekStickyKey(entry.getDataBuffer()));
+            groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>()).add(entry);
+        }
+
+        for (Map.Entry<Consumer, List<Entry>> entriesByConsumer : groupedEntries.entrySet()) {
+            Consumer consumer = entriesByConsumer.getKey();
+            List<Entry> entriesForConsumer = entriesByConsumer.getValue();
+
+            SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
+            EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForConsumer.size());
+            filterEntriesForConsumer(entriesForConsumer, batchSizes, sendMessageInfo, null, null);
+            consumer.sendMessages(entriesForConsumer, batchSizes, null, sendMessageInfo.getTotalMessages(),
+                    sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
+            TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
         }
     }
 }
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 263bd7f..5bb7629 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -54,7 +54,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -215,8 +214,8 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp
             Iterator<Entry> iterator = entries.iterator();
             while (iterator.hasNext()) {
                 Entry entry = iterator.next();
-                int keyHash = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
-                Consumer consumer = stickyKeyConsumerSelector.select(keyHash);
+                byte[] key = peekStickyKey(entry.getDataBuffer());
+                Consumer consumer = stickyKeyConsumerSelector.select(key);
                 if (consumer == null || currentConsumer != consumer) {
                     iterator.remove();
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 6079d31..ce946fc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,9 +18,10 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import io.netty.util.concurrent.FastThreadLocal;
+
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -37,7 +38,6 @@ import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.util.Murmur3_32Hash;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,59 +63,79 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
         selector.removeConsumer(consumer);
     }
 
+    private static final FastThreadLocal<Map<Consumer, List<Entry>>> localGroupedEntries = new FastThreadLocal<Map<Consumer, List<Entry>>>() {
+        @Override
+        protected Map<Consumer, List<Entry>> initialValue() throws Exception {
+            return new HashMap<>();
+        }
+    };
+
     @Override
     protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
         long totalMessagesSent = 0;
         long totalBytesSent = 0;
+        int entriesCount = entries.size();
+
         // Trigger read more messages
-        if (entries.size() == 0) {
+        if (entriesCount == 0) {
             readMoreEntries();
             return;
         }
-        final Map<Integer, List<Entry>> groupedEntries = new HashMap<>();
-        for (Entry entry : entries) {
-            int key = Murmur3_32Hash.getInstance().makeHash(peekStickyKey(entry.getDataBuffer()));
-            groupedEntries.putIfAbsent(key, new ArrayList<>());
-            groupedEntries.get(key).add(entry);
+
+        if (consumerSet.isEmpty()) {
+            entries.forEach(Entry::release);
+            cursor.rewind();
+            return;
         }
-        final Iterator<Map.Entry<Integer, List<Entry>>> iterator = groupedEntries.entrySet().iterator();
+
+        final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
+        groupedEntries.clear();
+
+        for (int i = 0; i < entriesCount; i++) {
+            Entry entry = entries.get(i);
+            Consumer c = selector.select(peekStickyKey(entry.getDataBuffer()));
+            groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
+        }
+
         AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
-        while (iterator.hasNext() && totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
-            final Map.Entry<Integer, List<Entry>> entriesWithSameKey = iterator.next();
-            //TODO: None key policy
-            Consumer consumer = selector.select(entriesWithSameKey.getKey());
-            if (consumer == null) {
-                // Do nothing, cursor will be rewind at reconnection
-                log.info("[{}] rewind because no available consumer found for key {} from total {}", name,
-                        entriesWithSameKey.getKey(), consumerList.size());
-                entriesWithSameKey.getValue().forEach(Entry::release);
-                cursor.rewind();
-                return;
-            }
 
-            int availablePermits = consumer.isWritable() ? consumer.getAvailablePermits() : 1;
-            if (log.isDebugEnabled() && !consumer.isWritable()) {
-                log.debug("[{}-{}] consumer is not writable. dispatching only 1 message to {} ", topic.getName(), name,
-                        consumer);
-            }
-            int messagesForC = Math.min(entriesWithSameKey.getValue().size(), availablePermits);
+        for (Map.Entry<Consumer, List<Entry>> current : groupedEntries.entrySet()) {
+            Consumer consumer = current.getKey();
+            List<Entry> entriesWithSameKey = current.getValue();
+            int entriesWithSameKeyCount = entriesWithSameKey.size();
+
+            int messagesForC = Math.min(entriesWithSameKeyCount, consumer.getAvailablePermits());
             if (log.isDebugEnabled()) {
-                log.debug("[{}] select consumer {} for key {} with messages num {}, read type is {}",
-                        name, consumer.consumerName(), entriesWithSameKey.getKey(), messagesForC, readType);
+                log.debug("[{}] select consumer {} with messages num {}, read type is {}",
+                        name, consumer.consumerName(), messagesForC, readType);
+            }
+
+            if (messagesForC < entriesWithSameKeyCount) {
+                // We are not able to push all the messages with given key to its consumer,
+                // so we discard for now and mark them for later redelivery
+                for (int i = messagesForC; i < entriesWithSameKeyCount; i++) {
+                    Entry entry = entriesWithSameKey.get(i);
+                    messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId());
+                    entry.release();
+                    entriesWithSameKey.set(i, null);
+                }
             }
+
             if (messagesForC > 0) {
                 // remove positions first from replay list first : sendMessages recycles entries
-                List<Entry> subList = new ArrayList<>(entriesWithSameKey.getValue().subList(0, messagesForC));
                 if (readType == ReadType.Replay) {
-                    subList.forEach(entry -> messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId()));
+                    for (int i = 0; i < messagesForC; i++) {
+                        Entry entry = entriesWithSameKey.get(i);
+                        messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId());
+                    }
                 }
 
                 SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
-                EntryBatchSizes batchSizes = EntryBatchSizes.get(subList.size());
+                EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
                 EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get();
-                filterEntriesForConsumer(subList, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
+                filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo, batchIndexesAcks, cursor);
 
-                consumer.sendMessages(subList, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
+                consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
                         getRedeliveryTracker()).addListener(future -> {
                             if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) {
@@ -123,17 +143,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                             }
                         });
 
-                for (int i = 0; i < messagesForC; i++) {
-                    entriesWithSameKey.getValue().remove(0);
-                }
-
                 TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount()));
                 totalMessagesSent += sendMessageInfo.getTotalMessages();
                 totalBytesSent += sendMessageInfo.getTotalBytes();
-
-                if (entriesWithSameKey.getValue().size() == 0) {
-                    iterator.remove();
-                }
             }
         }
 
@@ -147,21 +159,6 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                 dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent);
             }
         }
-
-        if (groupedEntries.size() > 0) {
-            int laterReplay = 0;
-            for (List<Entry> entryList : groupedEntries.values()) {
-                laterReplay += entryList.size();
-                entryList.forEach(entry -> {
-                    messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId());
-                    entry.release();
-                });
-            }
-            if (log.isDebugEnabled()) {
-                log.debug("[{}] No consumers found with available permits, storing {} positions for later replay", name,
-                        laterReplay);
-            }
-        }
     }
 
     @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 99822e9..29b807a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -141,7 +141,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         }
 
         ArgumentCaptor<Integer> totalMessagesCaptor = ArgumentCaptor.forClass(Integer.class);
-        verify(consumerMock, times(2)).sendMessages(
+        verify(consumerMock, times(1)).sendMessages(
                 anyList(),
                 any(EntryBatchSizes.class),
                 any(EntryBatchIndexesAcks.class),
@@ -152,8 +152,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         );
 
         List<Integer> allTotalMessagesCaptor = totalMessagesCaptor.getAllValues();
-        Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 0);
-        Assert.assertEquals(allTotalMessagesCaptor.get(1).intValue(), 5);
+        Assert.assertEquals(allTotalMessagesCaptor.get(0).intValue(), 5);
     }
 
     private ByteBuf createMessage(String message, int sequenceId) {