You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/07/02 02:48:17 UTC
[pulsar] branch branch-2.9 updated: Revert "[improve][broker] Optimise msgOutCounter and bytesOutCounter (#16214) (#16286)"
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 2bedbab0e6a Revert "[improve][broker] Optimise msgOutCounter and bytesOutCounter (#16214) (#16286)"
2bedbab0e6a is described below
commit 2bedbab0e6a43c893bf64ad1a7ac6988056c340c
Author: mattison chao <ma...@gmail.com>
AuthorDate: Sat Jul 2 10:47:37 2022 +0800
Revert "[improve][broker] Optimise msgOutCounter and bytesOutCounter (#16214) (#16286)"
This reverts commit 6f12f5ed2ec5ca3c7da74cad02d9d01e1f7d1e66.
---
.../broker/service/AbstractSubscription.java | 42 ------------
.../pulsar/broker/service/AbstractTopic.java | 17 +----
.../org/apache/pulsar/broker/service/Consumer.java | 8 ---
.../nonpersistent/NonPersistentSubscription.java | 6 +-
.../service/nonpersistent/NonPersistentTopic.java | 4 ++
.../service/persistent/PersistentSubscription.java | 7 +-
.../broker/service/persistent/PersistentTopic.java | 4 ++
.../broker/service/AbstractSubscriptionTest.java | 57 ----------------
.../pulsar/broker/service/AbstractTopicTest.java | 75 ---------------------
.../apache/pulsar/broker/service/ConsumerTest.java | 76 ----------------------
10 files changed, 19 insertions(+), 277 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java
deleted file mode 100644
index 6a386670556..00000000000
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractSubscription.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service;
-
-import java.util.Optional;
-import java.util.concurrent.atomic.LongAdder;
-import java.util.function.ToLongFunction;
-
-public abstract class AbstractSubscription implements Subscription {
- protected final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
- protected final LongAdder msgOutFromRemovedConsumer = new LongAdder();
-
- public long getMsgOutCounter() {
- return msgOutFromRemovedConsumer.longValue() + sumConsumers(Consumer::getMsgOutCounter);
- }
-
- public long getBytesOutCounter() {
- return bytesOutFromRemovedConsumers.longValue() + sumConsumers(Consumer::getBytesOutCounter);
- }
-
- private long sumConsumers(ToLongFunction<Consumer> toCounter) {
- return Optional.ofNullable(getDispatcher())
- .map(dispatcher -> dispatcher.getConsumers().stream().mapToLong(toCounter).sum())
- .orElse(0L);
- }
-}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 1307001a720..69c51c77d44 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -33,7 +33,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.ToLongFunction;
import lombok.Getter;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.lang3.tuple.Pair;
@@ -137,9 +136,6 @@ public abstract class AbstractTopic implements Topic {
private volatile long lastTopicMaxMessageSizeCheckTimeStamp = 0;
private final long topicMaxMessageSizeCheckIntervalMs;
- protected final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
- protected final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
-
public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
@@ -879,20 +875,11 @@ public abstract class AbstractTopic implements Topic {
}
public long getMsgOutCounter() {
- return msgOutFromRemovedSubscriptions.longValue()
- + sumSubscriptions(AbstractSubscription::getMsgOutCounter);
+ return getStats(false, false).msgOutCounter;
}
public long getBytesOutCounter() {
- return bytesOutFromRemovedSubscriptions.longValue()
- + sumSubscriptions(AbstractSubscription::getBytesOutCounter);
- }
-
- private long sumSubscriptions(ToLongFunction<AbstractSubscription> toCounter) {
- return getSubscriptions().values().stream()
- .map(AbstractSubscription.class::cast)
- .mapToLong(toCounter)
- .sum();
+ return getStats(false, false).bytesOutCounter;
}
public boolean isDeleteWhileInactive() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 52a91c5ef3e..2661d29110c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -780,14 +780,6 @@ public class Consumer {
return stats;
}
- public long getMsgOutCounter() {
- return msgOutCounter.longValue();
- }
-
- public long getBytesOutCounter() {
- return bytesOutCounter.longValue();
- }
-
public int getUnackedMessages() {
return unackedMessages;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
index 749cade53d6..cb515cdbd97 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentSubscription.java
@@ -29,7 +29,6 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
-import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -49,7 +48,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class NonPersistentSubscription extends AbstractSubscription implements Subscription {
+public class NonPersistentSubscription implements Subscription {
private final NonPersistentTopic topic;
private volatile NonPersistentDispatcher dispatcher;
private final String topicName;
@@ -67,6 +66,9 @@ public class NonPersistentSubscription extends AbstractSubscription implements S
// Timestamp of when this subscription was last seen active
private volatile long lastActive;
+ private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
+ private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
+
// If isDurable is false(such as a Reader), remove subscription from topic when closing this subscription.
private final boolean isDurable;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 0c2823a131a..3846fc129b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -34,6 +34,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
@@ -101,6 +102,9 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
AtomicLongFieldUpdater.newUpdater(NonPersistentTopic.class, "entriesAddedCounter");
private volatile long entriesAddedCounter = 0;
+ private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
+ private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
+
private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>() {
@Override
protected TopicStats initialValue() {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index 93ad2e80a64..b119f3ca5d2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -29,6 +29,7 @@ import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
@@ -49,7 +50,6 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
-import org.apache.pulsar.broker.service.AbstractSubscription;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -80,7 +80,7 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PersistentSubscription extends AbstractSubscription implements Subscription {
+public class PersistentSubscription implements Subscription {
protected final PersistentTopic topic;
protected final ManagedCursor cursor;
protected volatile Dispatcher dispatcher;
@@ -113,6 +113,9 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
private volatile ReplicatedSubscriptionSnapshotCache replicatedSubscriptionSnapshotCache;
private final PendingAckHandle pendingAckHandle;
+ private final LongAdder bytesOutFromRemovedConsumers = new LongAdder();
+ private final LongAdder msgOutFromRemovedConsumer = new LongAdder();
+
static {
REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES.put(REPLICATED_SUBSCRIPTION_PROPERTY, 1L);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 0933df87d87..129333581ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -44,6 +44,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.LongAdder;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import lombok.Getter;
@@ -221,6 +222,9 @@ public class PersistentTopic extends AbstractTopic
@Getter
protected final TransactionBuffer transactionBuffer;
+ private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
+ private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
+
// Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic
private long lastDataMessagePublishedTimestamp = 0;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java
deleted file mode 100644
index aaf1f6164a3..00000000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractSubscriptionTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import java.util.List;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-@Test(groups = "broker")
-public class AbstractSubscriptionTest {
- private Consumer consumer;
- private AbstractSubscription subscription;
-
- @BeforeMethod
- public void beforeMethod() {
- Dispatcher dispatcher = mock(Dispatcher.class);
- consumer = mock(Consumer.class);
- subscription = spy(AbstractSubscription.class);
-
- when(subscription.getDispatcher()).thenReturn(dispatcher);
- when(dispatcher.getConsumers()).thenReturn(List.of(consumer));
- }
-
- @Test
- public void testGetMsgOutCounter() {
- subscription.msgOutFromRemovedConsumer.add(1L);
- when(consumer.getMsgOutCounter()).thenReturn(2L);
- assertEquals(subscription.getMsgOutCounter(), 3L);
- }
-
- @Test
- public void testGetBytesOutCounter() {
- subscription.bytesOutFromRemovedConsumers.add(1L);
- when(consumer.getBytesOutCounter()).thenReturn(2L);
- assertEquals(subscription.getBytesOutCounter(), 3L);
- }
-}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java
deleted file mode 100644
index fb7890dc57f..00000000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractTopicTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service;
-
-import static org.mockito.Mockito.CALLS_REAL_METHODS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.withSettings;
-import static org.testng.Assert.assertEquals;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-@Test(groups = "broker")
-public class AbstractTopicTest {
- private AbstractSubscription subscription;
- private AbstractTopic topic;
-
- @BeforeMethod
- public void beforeMethod() {
- BrokerService brokerService = mock(BrokerService.class);
- PulsarService pulsarService = mock(PulsarService.class);
- ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
- BacklogQuotaManager backlogQuotaManager = mock(BacklogQuotaManager.class);
- subscription = mock(AbstractSubscription.class);
-
- when(brokerService.pulsar()).thenReturn(pulsarService);
- when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration);
- when(brokerService.getBacklogQuotaManager()).thenReturn(backlogQuotaManager);
-
- topic = mock(AbstractTopic.class, withSettings()
- .useConstructor("topic", brokerService)
- .defaultAnswer(CALLS_REAL_METHODS));
-
- ConcurrentOpenHashMap<String, Subscription> subscriptions =
- ConcurrentOpenHashMap.<String, Subscription>newBuilder()
- .expectedItems(16)
- .concurrencyLevel(1)
- .build();
- subscriptions.put("subscription", subscription);
- when(topic.getSubscriptions()).thenAnswer(invocation -> subscriptions);
- }
-
- @Test
- public void testGetMsgOutCounter() {
- topic.msgOutFromRemovedSubscriptions.add(1L);
- when(subscription.getMsgOutCounter()).thenReturn(2L);
- assertEquals(topic.getMsgOutCounter(), 3L);
- }
-
- @Test
- public void testGetBytesOutCounter() {
- topic.bytesOutFromRemovedSubscriptions.add(1L);
- when(subscription.getBytesOutCounter()).thenReturn(2L);
- assertEquals(topic.getBytesOutCounter(), 3L);
- }
-}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerTest.java
deleted file mode 100644
index 1ad0642d9c5..00000000000
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ConsumerTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.broker.service;
-
-import static java.util.Collections.emptyMap;
-import static org.apache.pulsar.client.api.MessageId.latest;
-import static org.apache.pulsar.common.api.proto.CommandSubscribe.SubType.Exclusive;
-import static org.apache.pulsar.common.api.proto.KeySharedMode.AUTO_SPLIT;
-import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import java.net.SocketAddress;
-import org.apache.pulsar.broker.PulsarService;
-import org.apache.pulsar.broker.ServiceConfiguration;
-import org.apache.pulsar.common.api.proto.KeySharedMeta;
-import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-@Test(groups = "broker")
-public class ConsumerTest {
- private Consumer consumer;
- private final ConsumerStatsImpl stats = new ConsumerStatsImpl();
-
- @BeforeMethod
- public void beforeMethod() {
- Subscription subscription = mock(Subscription.class);
- ServerCnx cnx = mock(ServerCnx.class);
- SocketAddress address = mock(SocketAddress.class);
- Topic topic = mock(Topic.class);
- BrokerService brokerService = mock(BrokerService.class);
- PulsarService pulsarService = mock(PulsarService.class);
- ServiceConfiguration serviceConfiguration = mock(ServiceConfiguration.class);
-
- when(cnx.clientAddress()).thenReturn(address);
- when(subscription.getTopic()).thenReturn(topic);
- when(topic.getBrokerService()).thenReturn(brokerService);
- when(brokerService.getPulsar()).thenReturn(pulsarService);
- when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration);
-
- consumer =
- new Consumer(subscription, Exclusive, "topic", 1, 0, "Cons1", true, cnx, "myrole-1", emptyMap(), false,
- new KeySharedMeta().setKeySharedMode(AUTO_SPLIT), latest, DEFAULT_CONSUMER_EPOCH);
- }
-
- @Test
- public void testGetMsgOutCounter() {
- stats.msgOutCounter = 1L;
- consumer.updateStats(stats);
- assertEquals(consumer.getMsgOutCounter(), 1L);
- }
-
- @Test
- public void testGetBytesOutCounter() {
- stats.bytesOutCounter = 1L;
- consumer.updateStats(stats);
- assertEquals(consumer.getBytesOutCounter(), 1L);
- }
-}