You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/17 01:02:43 UTC
[pulsar] branch master updated: [fix][broker] Fix calculate avg message per entry (#17046)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 2c2b75edf3c [fix][broker] Fix calculate avg message per entry (#17046)
2c2b75edf3c is described below
commit 2c2b75edf3c33a1258c070e12e7d175ae83a1c66
Author: Cong Zhao <zh...@apache.org>
AuthorDate: Wed Aug 17 09:02:35 2022 +0800
[fix][broker] Fix calculate avg message per entry (#17046)
---
.../org/apache/pulsar/broker/service/Consumer.java | 21 ++--
.../service/plugin/EntryFilterProducerTest.java | 62 +++++++++++
.../pulsar/broker/stats/ConsumerStatsTest.java | 124 +++++++++++++++++++--
3 files changed, 186 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 20f3d3f74d8..5add5829174 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -290,13 +290,16 @@ public class Consumer {
return writePromise;
}
int unackedMessages = totalMessages;
- // Note
- // Must ensure that the message is written to the pendingAcks before sent is first, because this consumer
- // is possible to disconnect at this time.
- if (pendingAcks != null) {
- for (int i = 0; i < entries.size(); i++) {
- Entry entry = entries.get(i);
- if (entry != null) {
+ int totalEntries = 0;
+
+ for (int i = 0; i < entries.size(); i++) {
+ Entry entry = entries.get(i);
+ if (entry != null) {
+ totalEntries++;
+ // Note
+ // Must ensure that the message is written to the pendingAcks before sent is first,
+ // because this consumer is possible to disconnect at this time.
+ if (pendingAcks != null) {
int batchSize = batchSizes.getBatchSize(i);
int stickyKeyHash = getStickyKeyHash(entry);
long[] ackSet = getCursorAckSet(PositionImpl.get(entry.getLedgerId(), entry.getEntryId()));
@@ -317,10 +320,10 @@ public class Consumer {
// calculate avg message per entry
if (avgMessagesPerEntry.get() < 1) { //valid avgMessagesPerEntry should always >= 1
// set init value.
- avgMessagesPerEntry.set(1.0 * totalMessages / entries.size());
+ avgMessagesPerEntry.set(1.0 * totalMessages / totalEntries);
} else {
avgMessagesPerEntry.set(avgMessagesPerEntry.get() * avgPercent
- + (1 - avgPercent) * totalMessages / entries.size());
+ + (1 - avgPercent) * totalMessages / totalEntries);
}
// reduce permit and increment unackedMsg count with total number of messages in batch-msgs
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterProducerTest.java
new file mode 100644
index 00000000000..5973b7fe54b
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/EntryFilterProducerTest.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.plugin;
+
+
+import java.util.Collections;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.broker.service.Consumer;
+
+/**
+ * Test-only {@link EntryFilter} whose verdict depends on the name of the producer that
+ * published the entry.
+ *
+ * <p>The producer names to match are read from the consumer's metadata under the keys
+ * {@code matchValueAccept}, {@code matchValueReject} and {@code matchValueReschedule};
+ * when a key is absent the literal {@code ACCEPT}, {@code REJECT} or {@code RESCHEDULE}
+ * is used instead. Names are compared case-insensitively.
+ */
+@Slf4j
+public class EntryFilterProducerTest implements EntryFilter {
+
+    /**
+     * Decides what to do with an entry by comparing its producer name with the names
+     * configured in the consumer metadata.
+     *
+     * @param entry the entry being filtered (not inspected by this filter)
+     * @param context filter context supplying the message metadata and the consumer
+     * @return {@code ACCEPT} when the context has no message metadata or the producer name
+     *         matches the accept value; {@code REJECT} or {@code RESCHEDULE} when it matches
+     *         the respective value; {@code null} when nothing matches
+     */
+    @Override
+    public FilterResult filterEntry(Entry entry, FilterContext context) {
+        // Without message metadata there is no producer name to look at.
+        if (context.getMsgMetadata() == null) {
+            return FilterResult.ACCEPT;
+        }
+
+        // A missing consumer is treated as a consumer without any metadata.
+        final Consumer subscriber = context.getConsumer();
+        final Map<String, String> metadata;
+        if (subscriber == null) {
+            metadata = Collections.emptyMap();
+        } else {
+            metadata = subscriber.getMetadata();
+        }
+        log.info("filterEntry for {}", metadata);
+
+        final String acceptName = metadata.getOrDefault("matchValueAccept", "ACCEPT");
+        final String rejectName = metadata.getOrDefault("matchValueReject", "REJECT");
+        final String rescheduleName = metadata.getOrDefault("matchValueReschedule", "RESCHEDULE");
+
+        // filter by string
+        final String producerName = context.getMsgMetadata().getProducerName();
+        if (acceptName.equalsIgnoreCase(producerName)) {
+            log.info("metadata {} producerName {} outcome ACCEPT", metadata, producerName);
+            return FilterResult.ACCEPT;
+        }
+        if (rejectName.equalsIgnoreCase(producerName)) {
+            log.info("metadata {} producerName {} outcome REJECT", metadata, producerName);
+            return FilterResult.REJECT;
+        }
+        if (rescheduleName.equalsIgnoreCase(producerName)) {
+            log.info("metadata {} producerName {} outcome RESCHEDULE", metadata, producerName);
+            return FilterResult.RESCHEDULE;
+        }
+
+        // No configured name matched: log it and hand back no verdict.
+        log.info("metadata {} producerName {} outcome ??", metadata, producerName);
+        return null;
+    }
+
+    /** Nothing to release: this filter holds no resources. */
+    @Override
+    public void close() {
+
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
index 106a40084d1..e845e6e71fd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ConsumerStatsTest.java
@@ -18,44 +18,65 @@
*/
package org.apache.pulsar.broker.stats;
+import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.AssertJUnit.assertEquals;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.broker.service.plugin.EntryFilter;
+import org.apache.pulsar.broker.service.plugin.EntryFilterProducerTest;
+import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.policies.data.TopicStats;
-import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.ConsumerStats;
+import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
+import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.io.ByteArrayOutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
@Slf4j
@Test(groups = "broker")
public class ConsumerStatsTest extends ProducerConsumerBase {
@@ -343,4 +364,83 @@ public class ConsumerStatsTest extends ProducerConsumerBase {
Assert.assertEquals(totalAckRate, totalRateOut, totalRateOut * 0.1D);
}
}
+
+ @Override
+ protected PulsarService newPulsarService(ServiceConfiguration conf) throws Exception {
+ return new PulsarService(conf) {
+ @Override
+ protected BrokerService newBrokerService(PulsarService pulsar) throws Exception {
+ return spy(new BrokerService(this, ioEventLoopGroup));
+ }
+ };
+ }
+
+    /**
+     * Checks the {@code avgMessagesPerEntry} consumer statistic when an entry filter
+     * reschedules part of the backlog.
+     *
+     * <p>The topic is filled with one single message, 20 messages sent asynchronously by a
+     * batching producer ("producer1"), and one non-batched message from "producer2". The
+     * installed {@link EntryFilterProducerTest} is configured through the consumer
+     * properties to accept entries from "producer1" and reschedule those from "producer2".
+     * The test then asserts that exactly 21 messages are received, that none of them is the
+     * "producer2" message, and that the reported stats match.
+     */
+    @Test
+    public void testAvgMessagesPerEntry() throws Exception {
+        final String topic = "persistent://public/default/testFilterState";
+        String subName = "sub";
+
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .producerName("producer1")
+                .enableBatching(true).topic(topic)
+                .batchingMaxMessages(20)
+                .batchingMaxPublishDelay(5, TimeUnit.SECONDS)
+                .batchingMaxBytes(Integer.MAX_VALUE)
+                .create();
+
+        producer.send("first-message");
+        List<CompletableFuture<MessageId>> futures = new ArrayList<>();
+        for (int i = 0; i < 20; i++) {
+            futures.add(producer.sendAsync("message"));
+        }
+        // waitForAll only returns a combined future; block on it so that every async send
+        // has completed (and any send failure surfaces here) before the producer is closed.
+        FutureUtil.waitForAll(futures).get();
+        producer.close();
+
+        Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
+                .producerName("producer2")
+                .enableBatching(false).topic(topic)
+                .create();
+        producer2.newMessage().value("producer2-message").send();
+        producer2.close();
+
+        // mock entry filters
+        NarClassLoader narClassLoader = mock(NarClassLoader.class);
+        EntryFilter filter = new EntryFilterProducerTest();
+        EntryFilterWithClassLoader
+                loader = spyWithClassAndConstructorArgs(EntryFilterWithClassLoader.class, filter,
+                narClassLoader);
+        ImmutableMap<String, EntryFilterWithClassLoader> entryFilters = ImmutableMap.of("filter", loader);
+        BrokerService brokerService = pulsar.getBrokerService();
+        doReturn(entryFilters).when(brokerService).getEntryFilters();
+
+        // The filter reads these keys from the consumer metadata:
+        // accept everything from producer1, reschedule everything from producer2.
+        Map<String, String> metadataConsumer = new HashMap<>();
+        metadataConsumer.put("matchValueAccept", "producer1");
+        metadataConsumer.put("matchValueReschedule", "producer2");
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).properties(metadataConsumer)
+                .subscriptionName(subName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
+
+        // Drain the subscription until a receive times out, counting what arrives.
+        int counter = 0;
+        while (true) {
+            Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+            if (message != null) {
+                counter++;
+                assertNotEquals(message.getValue(), "producer2-message");
+                consumer.acknowledge(message);
+            } else {
+                break;
+            }
+        }
+
+        // 1 single message + 20 batched messages from producer1.
+        assertEquals(21, counter);
+
+        ConsumerStats consumerStats =
+                admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers().get(0);
+
+        assertEquals(21, consumerStats.getMsgOutCounter());
+
+        // Math.round(1 * 0.9 + 0.1 * (20 / 1))
+        // NOTE(review): presumably the first dispatch carries the single "first-message" entry
+        // (average 1) and the next one the 20-message batch entry - confirm against
+        // Consumer#sendMessages.
+        int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry();
+        assertEquals(3, avgMessagesPerEntry);
+    }
}