You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/10 15:01:52 UTC

[pulsar] 08/12: [fix][broker]Fast return if ack cumulative illegal (#15695)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit dac1c6f39d17b4f64c5dcc8ccd342fc88ecbc2af
Author: Xiaoyu Hou <An...@gmail.com>
AuthorDate: Mon Jun 6 10:59:55 2022 +0800

    [fix][broker]Fast return if ack cumulative illegal (#15695)
    
    (cherry picked from commit 02dca31a78523a7d061ac1d6702ff6600a7f4ec4)
---
 .../org/apache/pulsar/broker/service/Consumer.java |   2 +
 .../broker/service/MessageCumulativeAckTest.java   | 199 +++++++++++++++++++++
 2 files changed, 201 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index f7ed211fb2c..031574975d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -358,11 +358,13 @@ public class Consumer {
         if (ack.getAckType() == AckType.Cumulative) {
             if (ack.getMessageIdsCount() != 1) {
                 log.warn("[{}] [{}] Received multi-message ack", subscription, consumerId);
+                return CompletableFuture.completedFuture(null);
             }
 
             if (Subscription.isIndividualAckMode(subType)) {
                 log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring",
                         subscription, consumerId);
+                return CompletableFuture.completedFuture(null);
             }
             PositionImpl position = PositionImpl.earliest;
             if (ack.getMessageIdsCount() == 1) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
new file mode 100644
index 00000000000..86754efc0c2
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/MessageCumulativeAckTest.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.apache.pulsar.common.api.proto.CommandAck.AckType.Cumulative;
+import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Exclusive;
+import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Failover;
+import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Key_Shared;
+import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Shared;
+import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.resources.PulsarResources;
+import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.transaction.TransactionTestBase;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.api.proto.CommandAck;
+import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.api.proto.ProtocolVersion;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.metadata.api.MetadataStore;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.apache.pulsar.metadata.api.MetadataStoreFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class MessageCumulativeAckTest {
+    private final int consumerId = 1;
+    private BrokerService brokerService;
+    private ServerCnx serverCnx;
+    private MetadataStore store;
+    protected PulsarService pulsar;
+    private OrderedExecutor executor;
+    private EventLoopGroup eventLoopGroup;
+    private PersistentSubscription sub;
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        executor = OrderedExecutor.newBuilder().numThreads(1).name("persistent-dispatcher-cumulative-ack-test").build();
+        ServiceConfiguration svcConfig = spy(ServiceConfiguration.class);
+        svcConfig.setBrokerShutdownTimeoutMs(0L);
+        svcConfig.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
+        svcConfig.setClusterName("pulsar-cluster");
+        pulsar = spyWithClassAndConstructorArgs(PulsarService.class, svcConfig);
+        doReturn(svcConfig).when(pulsar).getConfiguration();
+
+        ManagedLedgerFactory mlFactoryMock = mock(ManagedLedgerFactory.class);
+        doReturn(mlFactoryMock).when(pulsar).getManagedLedgerFactory();
+        doReturn(TransactionTestBase.createMockBookKeeper(executor))
+            .when(pulsar).getBookKeeperClient();
+
+        store = MetadataStoreFactory.create("memory:local", MetadataStoreConfig.builder().build());
+        doReturn(store).when(pulsar).getLocalMetadataStore();
+        doReturn(store).when(pulsar).getConfigurationMetadataStore();
+
+        PulsarResources pulsarResources = new PulsarResources(store, store);
+        doReturn(pulsarResources).when(pulsar).getPulsarResources();
+
+        serverCnx = spyWithClassAndConstructorArgs(ServerCnx.class, pulsar);
+        doReturn(true).when(serverCnx).isActive();
+        doReturn(true).when(serverCnx).isWritable();
+        doReturn(new InetSocketAddress("localhost", 1234)).when(serverCnx).clientAddress();
+        when(serverCnx.getRemoteEndpointProtocolVersion()).thenReturn(ProtocolVersion.v12.getValue());
+        when(serverCnx.ctx()).thenReturn(mock(ChannelHandlerContext.class));
+        doReturn(new PulsarCommandSenderImpl(null, serverCnx))
+            .when(serverCnx).getCommandSender();
+
+        eventLoopGroup = new NioEventLoopGroup();
+        brokerService = spyWithClassAndConstructorArgs(BrokerService.class, pulsar, eventLoopGroup);
+        doReturn(brokerService).when(pulsar).getBrokerService();
+
+        String topicName = TopicName.get("MessageCumulativeAckTest").toString();
+        PersistentTopic persistentTopic = new PersistentTopic(topicName, mock(ManagedLedger.class), brokerService);
+        sub = spy(new PersistentSubscription(persistentTopic, "sub-1",
+            mock(ManagedCursorImpl.class), false));
+        doNothing().when(sub).acknowledgeMessage(any(), any(), any());
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void shutdown() throws Exception {
+        if (brokerService != null) {
+            brokerService.close();
+            brokerService = null;
+        }
+        if (pulsar != null) {
+            pulsar.close();
+            pulsar = null;
+        }
+
+        executor.shutdown();
+        if (eventLoopGroup != null) {
+            eventLoopGroup.shutdownGracefully().get();
+        }
+        store.close();
+        sub = null;
+    }
+
+    @DataProvider(name = "individualAckModes")
+    public static Object[][] individualAckModes() {
+        return new Object[][]{
+            {Shared},
+            {Key_Shared},
+        };
+    }
+
+    @DataProvider(name = "notIndividualAckModes")
+    public static Object[][] notIndividualAckModes() {
+        return new Object[][]{
+            {Exclusive},
+            {Failover},
+        };
+    }
+
+    @Test(timeOut = 5000, dataProvider = "individualAckModes")
+    public void testAckWithIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
+        Consumer consumer = new Consumer(sub, subType, "topic-1", consumerId, 0,
+            "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
+            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+
+        CommandAck commandAck = new CommandAck();
+        commandAck.setAckType(Cumulative);
+        commandAck.setConsumerId(consumerId);
+        commandAck.addMessageId().setEntryId(0L).setLedgerId(1L);
+
+        consumer.messageAcked(commandAck).get();
+        verify(sub, never()).acknowledgeMessage(any(), any(), any());
+    }
+
+    @Test(timeOut = 5000, dataProvider = "notIndividualAckModes")
+    public void testAckWithNotIndividualAckMode(CommandSubscribe.SubType subType) throws Exception {
+        Consumer consumer = new Consumer(sub, subType, "topic-1", consumerId, 0,
+            "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
+            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+
+        CommandAck commandAck = new CommandAck();
+        commandAck.setAckType(Cumulative);
+        commandAck.setConsumerId(consumerId);
+        commandAck.addMessageId().setEntryId(0L).setLedgerId(1L);
+
+        consumer.messageAcked(commandAck).get();
+        verify(sub, times(1)).acknowledgeMessage(any(), any(), any());
+    }
+
+    @Test(timeOut = 5000)
+    public void testAckWithMoreThanNoneMessageIds() throws Exception {
+        Consumer consumer = new Consumer(sub, Failover, "topic-1", consumerId, 0,
+            "Cons1", true, serverCnx, "myrole-1", emptyMap(), false, null,
+            MessageId.latest, DEFAULT_CONSUMER_EPOCH);
+
+        CommandAck commandAck = new CommandAck();
+        commandAck.setAckType(Cumulative);
+        commandAck.setConsumerId(consumerId);
+        commandAck.addMessageId().setEntryId(0L).setLedgerId(1L);
+        commandAck.addMessageId().setEntryId(0L).setLedgerId(2L);
+
+        consumer.messageAcked(commandAck).get();
+        verify(sub, never()).acknowledgeMessage(any(), any(), any());
+    }
+}