You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/17 01:02:43 UTC

[pulsar] branch master updated: [fix][broker] Fix calculate avg message per entry (#17046)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c2b75edf3c [fix][broker] Fix calculate avg message per entry (#17046)
2c2b75edf3c is described below

commit 2c2b75edf3c33a1258c070e12e7d175ae83a1c66
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Wed Aug 17 09:02:35 2022 +0800

    [fix][broker] Fix calculate avg message per entry (#17046)
---
 .../org/apache/pulsar/broker/service/Consumer.java |  21 ++--
 .../service/plugin/EntryFilterProducerTest.java    |  62 +++++++++++
 .../pulsar/broker/stats/ConsumerStatsTest.java     | 124 +++++++++++++++++++--
 3 files changed, 186 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 20f3d3f74d8..5add5829174 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -290,13 +290,16 @@ public class Consumer {
             return writePromise;
         }
         int unackedMessages = totalMessages;
-        // Note
-        // Must ensure that the message is written to the pendingAcks before sent is first, because this consumer
-        // is possible to disconnect at this time.
-        if (pendingAcks != null) {
-            for (int i = 0; i < entries.size(); i++) {
-                Entry entry = entries.get(i);
-                if (entry != null) {
+        int totalEntries = 0;
+
+        for (int i = 0; i < entries.size(); i++) {
+            Entry entry = entries.get(i);
+            if (entry != null) {
+                totalEntries++;
+                // Note
+                // Must ensure that the message is written to the pendingAcks before sent is first,
+                // because this consumer is possible to disconnect at this time.
+                if (pendingAcks != null) {
                     int batchSize = batchSizes.getBatchSize(i);
                     int stickyKeyHash = getStickyKeyHash(entry);
                     long[] ackSet = getCursorAckSet(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
@@ -317,10 +320,10 @@ public class Consumer {
         // calculate avg message per entry
         if (avgMessagesPerEntry.get() < 1) { //valid avgMessagesPerEntry should always >= 1
             // set init value.
-            avgMessagesPerEntry.set(1.0 * totalMessages / entries.size());
+            avgMessagesPerEntry.set(1.0 * totalMessages / totalEntries);
         } else {
             avgMessagesPerEntry.set(avgMessagesPerEntry.get() * avgPercent
-                    + (1 - avgPercent) * totalMessages / entries.size());
+                    + (1 - avgPercent) * totalMessages / totalEntries);
         }
 
         // reduce permit and increment unackedMsg count with total number of messages in batch-msgs
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterProducerTest.java
new file mode 100644
index 00000000000..5973b7fe54b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterProducerTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+
+import java.util.Collections;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.service.Consumer;
+
+@Slf4j
+public class EntryFilterProducerTest implements EntryFilter {
+    @Override
+    public FilterResult filterEntry(Entry entry, FilterContext context) {
+        if (context.getMsgMetadata() == null) {
+            return FilterResult.ACCEPT;
+        }
+        Consumer consumer = context.getConsumer();
+        Map<String, String> metadata = consumer != null ? consumer.getMetadata() : Collections.emptyMap();
+        log.info("filterEntry for {}", metadata);
+        String matchValueAccept = metadata.getOrDefault("matchValueAccept", "ACCEPT");
+        String matchValueReject = metadata.getOrDefault("matchValueReject", "REJECT");
+        String matchValueReschedule = metadata.getOrDefault("matchValueReschedule", "RESCHEDULE");
+        // filter by string
+        String producerName = context.getMsgMetadata().getProducerName();
+        if (matchValueAccept.equalsIgnoreCase(producerName)) {
+            log.info("metadata {} producerName {} outcome ACCEPT", metadata, producerName);
+            return FilterResult.ACCEPT;
+        } else if (matchValueReject.equalsIgnoreCase(producerName)){
+            log.info("metadata {} producerName {} outcome REJECT", metadata, producerName);
+            return FilterResult.REJECT;
+        } else if (matchValueReschedule.equalsIgnoreCase(producerName)){
+            log.info("metadata {} producerName {} outcome RESCHEDULE", metadata, producerName);
+            return FilterResult.RESCHEDULE;
+        } else {
+            log.info("metadata {} producerName {} outcome ??", metadata, producerName);
+        }
+        return null;
+    }
+
+    @Override
+    public void close() {
+
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 106a40084d1..e845e6e71fd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -18,44 +18,65 @@
  */
 package org.apache.pulsar.broker.stats;
 
+import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.AssertJUnit.assertEquals;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.plugin.EntryFilter;
+import org.apache.pulsar.broker.service.plugin.EntryFilterProducerTest;
+import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageListener;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.io.ByteArrayOutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
 @Slf4j
 @Test(groups = "broker")
 public class ConsumerStatsTest extends ProducerConsumerBase {
@@ -343,4 +364,83 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
             Assert.assertEquals(totalAckRate, totalRateOut, totalRateOut * 0.1D);
         }
     }
+
+    @Override
+    protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
+        return new PulsarService(conf) {
+            @Override
+            protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
+                return spy(new BrokerService(this, ioEventLoopGroup));
+            }
+        };
+    }
+
+    @Test
+    public void testAvgMessagesPerEntry() throws Exception {
+        final String topic = "persistent://public/default/testFilterState";
+        String subName = "sub";
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .producerName("producer1")
+                .enableBatching(true).topic(topic)
+                .batchingMaxMessages(20)
+                .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
+                .batchingMaxBytes(Integer.MAX_VALUE)
+                .create();
+
+        producer.send("first-message");
+        List<CompletableFuture<MessageId>> futures = new ArrayList<>();
+        for (int i = 0; i < 20; i++) {
+            futures.add(producer.sendAsync("message"));
+        }
+        FutureUtil.waitForAll(futures);
+        producer.close();
+
+        Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
+                .producerName("producer2")
+                .enableBatching(false).topic(topic)
+                .create();
+        producer2.newMessage().value("producer2-message").send();
+        producer2.close();
+
+        // mock entry filters
+        NarClassLoader narClassLoader = mock(NarClassLoader.class);
+        EntryFilter filter = new EntryFilterProducerTest();
+        EntryFilterWithClassLoader
+                loader = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter,
+                narClassLoader);
+        ImmutableMap<String, EntryFilterWithClassLoader> entryFilters = ImmutableMap.of("filter", loader);
+        BrokerService brokerService = pulsar.getBrokerService();
+        doReturn(entryFilters).when(brokerService).getEntryFilters();
+
+        Map<String, String> metadataConsumer = new HashMap<>();
+        metadataConsumer.put("matchValueAccept", "producer1");
+        metadataConsumer.put("matchValueReschedule", "producer2");
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).properties(metadataConsumer)
+                .subscriptionName(subName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+
+        int counter = 0;
+        while (true) {
+            Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+            if (message != null) {
+                counter++;
+                assertNotEquals(message.getValue(), "producer2-message");
+                consumer.acknowledge(message);
+            } else {
+                break;
+            }
+        }
+
+        assertEquals(21, counter);
+
+        ConsumerStats consumerStats =
+                admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers().get(0);
+
+        assertEquals(21, consumerStats.getMsgOutCounter());
+
+        // Math.round(1 * 0.9 + 0.1 * (20 / 1))
+        int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry();
+        assertEquals(3, avgMessagesPerEntry);
+    }
 }