You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2022/04/29 03:00:25 UTC

[pulsar] branch master updated: [fix] [broker] Fix totalEntries calculation problem in AbstractBaseDispatcher#filterEntriesForConsumere (#15298)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new afbd9252f49 [fix] [broker] Fix totalEntries calculation problem in AbstractBaseDispatcher#filterEntriesForConsumere (#15298)
afbd9252f49 is described below

commit afbd9252f49676185b273b0a9b09b45cec744365
Author: Yan Zhao <ho...@apache.org>
AuthorDate: Fri Apr 29 11:00:18 2022 +0800

    [fix] [broker] Fix totalEntries calculation problem in AbstractBaseDispatcher#filterEntriesForConsumere (#15298)
    
    Fixe #15297
    
    ### Motivation
    Fix totalEntries calculation problem in AbstractBaseDispatcher#filterEntriesForConsumere
---
 .../broker/service/AbstractBaseDispatcher.java     |   2 +-
 .../broker/service/AbstractBaseDispatcherTest.java | 307 +++++++++++++++++++++
 2 files changed, 308 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 083a8e9072a..d14f683070b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -139,7 +139,6 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
             if (entry == null) {
                 continue;
             }
-            totalEntries++;
             ByteBuf metadataAndPayload = entry.getDataBuffer();
             int entryWrapperIndex = i + entryWrapperOffset;
             MessageMetadata msgMetadata = entryWrapper.isPresent() && entryWrapper.get()[entryWrapperIndex] != null
@@ -196,6 +195,7 @@ public abstract class AbstractBaseDispatcher implements Dispatcher {
                 continue;
             }
 
+            totalEntries++;
             int batchSize = msgMetadata.getNumMessagesInBatch();
             totalMessages += batchSize;
             totalBytes += metadataAndPayload.readableBytes();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
new file mode 100644
index 00000000000..07872f37683
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import com.google.common.collect.ImmutableMap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.impl.EntryImpl;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.plugin.EntryFilter;
+import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
+import org.apache.pulsar.broker.service.plugin.FilterContext;
+import org.apache.pulsar.client.api.transaction.TxnID;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.api.proto.MarkerType;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.protocol.Markers;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class AbstractBaseDispatcherTest {
+
+    private AbstractBaseDispatcherTestHelper helper;
+
+    private ServiceConfiguration svcConfig;
+
+    private PersistentSubscription subscriptionMock;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        this.svcConfig = mock(ServiceConfiguration.class);
+        this.subscriptionMock = mock(PersistentSubscription.class);
+        this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
+    }
+
+    @Test
+    public void testFilterEntriesForConsumerOfNullElement() {
+        List<Entry> entries = new ArrayList<>();
+        entries.add(null);
+
+        SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
+        EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
+
+        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false);
+        assertEquals(size, 0);
+    }
+
+
+    @Test
+    public void testFilterEntriesForConsumerOfEntryFilter() {
+        Topic mockTopic = mock(Topic.class);
+        when(this.subscriptionMock.getTopic()).thenReturn(mockTopic);
+
+        BrokerService mockBrokerService = mock(BrokerService.class);
+        when(mockTopic.getBrokerService()).thenReturn(mockBrokerService);
+
+        EntryFilterWithClassLoader mockFilter = mock(EntryFilterWithClassLoader.class);
+        when(mockFilter.filterEntry(any(Entry.class), any(FilterContext.class))).thenReturn(
+                EntryFilter.FilterResult.REJECT);
+        ImmutableMap<String, EntryFilterWithClassLoader> entryFilters = ImmutableMap.of("key", mockFilter);
+        when(mockBrokerService.getEntryFilters()).thenReturn(entryFilters);
+
+        this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
+
+        List<Entry> entries = new ArrayList<>();
+
+        entries.add(EntryImpl.create(1, 2, createMessage("message1", 1)));
+        SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
+        EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
+        //
+        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false);
+        assertEquals(size, 0);
+    }
+
+    @Test
+    public void testFilterEntriesForConsumerOfTxnMsgAbort() {
+        List<Entry> entries = new ArrayList<>();
+        entries.add(EntryImpl.create(1, 1, createTnxAbortMessage("message1", 1)));
+
+        SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
+        EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
+        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false);
+        assertEquals(size, 0);
+    }
+
+    @Test
+    public void testFilterEntriesForConsumerOfTxnBufferAbort() {
+        PersistentTopic mockTopic = mock(PersistentTopic.class);
+        when(this.subscriptionMock.getTopic()).thenReturn(mockTopic);
+
+        when(mockTopic.isTxnAborted(any(TxnID.class))).thenReturn(true);
+
+        List<Entry> entries = new ArrayList<>();
+        entries.add(EntryImpl.create(1, 1, createTnxMessage("message1", 1)));
+
+        SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
+        EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
+        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false);
+        assertEquals(size, 0);
+    }
+
+    @Test
+    public void testFilterEntriesForConsumerOfServerOnlyMarker() {
+        List<Entry> entries = new ArrayList<>();
+        ByteBuf markerMessage =
+                Markers.newReplicatedSubscriptionsSnapshotRequest("testSnapshotId", "testSourceCluster");
+        entries.add(EntryImpl.create(1, 1, markerMessage));
+
+        SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
+        EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
+        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false);
+        assertEquals(size, 0);
+    }
+
+    @Test
+    public void testFilterEntriesForConsumerOfDelayedMsg() {
+        List<Entry> entries = new ArrayList<>();
+        entries.add(EntryImpl.create(1, 1, createDelayedMessage("message1", 1)));
+
+        SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
+        EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
+        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false);
+        assertEquals(size, 0);
+    }
+
+    private ByteBuf createMessage(String message, int sequenceId) {
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setSequenceId(sequenceId)
+                .setProducerName("testProducer")
+                .setPartitionKeyB64Encoded(false)
+                .setPublishTime(System.currentTimeMillis());
+        return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata,
+                Unpooled.copiedBuffer(message.getBytes(UTF_8)));
+    }
+
+    private ByteBuf createTnxMessage(String message, int sequenceId) {
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setSequenceId(sequenceId)
+                .setProducerName("testProducer")
+                .setPartitionKeyB64Encoded(false)
+                .setPublishTime(System.currentTimeMillis())
+                .setTxnidMostBits(8)
+                .setTxnidLeastBits(0);
+        return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata,
+                Unpooled.copiedBuffer(message.getBytes(UTF_8)));
+    }
+
+    private ByteBuf createTnxAbortMessage(String message, int sequenceId) {
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setSequenceId(sequenceId)
+                .setProducerName("testProducer")
+                .setPartitionKeyB64Encoded(false)
+                .setPublishTime(System.currentTimeMillis())
+                .setTxnidMostBits(8)
+                .setTxnidLeastBits(0)
+                .setMarkerType(MarkerType.TXN_ABORT_VALUE);
+        return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata,
+                Unpooled.copiedBuffer(message.getBytes(UTF_8)));
+    }
+
+    private ByteBuf createDelayedMessage(String message, int sequenceId) {
+        MessageMetadata messageMetadata = new MessageMetadata()
+                .setSequenceId(sequenceId)
+                .setProducerName("testProducer")
+                .setPartitionKeyB64Encoded(false)
+                .setPublishTime(System.currentTimeMillis())
+                .setDeliverAtTime(System.currentTimeMillis() + 5000);
+        return serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata,
+                Unpooled.copiedBuffer(message.getBytes(UTF_8)));
+    }
+
+    private static class AbstractBaseDispatcherTestHelper extends AbstractBaseDispatcher {
+
+        protected AbstractBaseDispatcherTestHelper(Subscription subscription,
+                                                   ServiceConfiguration serviceConfig) {
+            super(subscription, serviceConfig);
+        }
+
+        @Override
+        protected boolean isConsumersExceededOnSubscription() {
+            return false;
+        }
+
+        @Override
+        public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata msgMetadata) {
+            //for test.
+            return true;
+        }
+
+        @Override
+        protected void reScheduleRead() {
+
+        }
+
+        @Override
+        public void addConsumer(Consumer consumer) throws BrokerServiceException {
+
+        }
+
+        @Override
+        public void removeConsumer(Consumer consumer) throws BrokerServiceException {
+
+        }
+
+        @Override
+        public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
+
+        }
+
+        @Override
+        public boolean isConsumerConnected() {
+            return false;
+        }
+
+        @Override
+        public List<Consumer> getConsumers() {
+            return null;
+        }
+
+        @Override
+        public boolean canUnsubscribe(Consumer consumer) {
+            return false;
+        }
+
+        @Override
+        public CompletableFuture<Void> close() {
+            return null;
+        }
+
+        @Override
+        public boolean isClosed() {
+            return false;
+        }
+
+        @Override
+        public CompletableFuture<Void> disconnectActiveConsumers(boolean isResetCursor) {
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<Void> disconnectAllConsumers(boolean isResetCursor) {
+            return null;
+        }
+
+        @Override
+        public void reset() {
+
+        }
+
+        @Override
+        public CommandSubscribe.SubType getType() {
+            return null;
+        }
+
+        @Override
+        public void redeliverUnacknowledgedMessages(Consumer consumer, long consumerEpoch) {
+
+        }
+
+        @Override
+        public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
+
+        }
+
+        @Override
+        public void addUnAckedMessages(int unAckMessages) {
+
+        }
+
+        @Override
+        public RedeliveryTracker getRedeliveryTracker() {
+            return null;
+        }
+    }
+
+}