You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/09/08 12:50:55 UTC
[rocketmq] branch develop updated: [ISSUE #4999] Optimize ConcurrentMap#computeIfAbsent performance on jdk1.8 a workaround (#5008)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new ed9c775ee [ISSUE #4999] Optimize ConcurrentMap#computeIfAbsent performance on jdk1.8 a workaround (#5008)
ed9c775ee is described below
commit ed9c775ee371eb6cbca9915f98f44f63cccc7f8b
Author: mxsm <lj...@gmail.com>
AuthorDate: Thu Sep 8 20:50:32 2022 +0800
[ISSUE #4999] Optimize ConcurrentMap#computeIfAbsent performance on jdk1.8 a workaround (#5008)
---
.../common/utils/ConcurrentHashMapUtils.java | 21 ++++---
.../common/utils/ConcurrentHashMapUtilsTest.java | 39 +++++++++++++
.../namesrv/routeinfo/RouteInfoManager.java | 65 +++++++++++-----------
.../rocketmq/proxy/common/ReceiptHandleGroup.java | 4 +-
.../proxy/processor/ReceiptHandleProcessor.java | 3 +-
.../proxy/service/channel/ChannelManager.java | 4 +-
.../store/ha/autoswitch/AutoSwitchHAService.java | 3 +-
.../rocketmq/store/queue/QueueOffsetAssigner.java | 8 +--
8 files changed, 95 insertions(+), 52 deletions(-)
diff --git a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ConcurrentHashMapUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java
similarity index 77%
rename from srvutil/src/main/java/org/apache/rocketmq/srvutil/ConcurrentHashMapUtil.java
rename to common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java
index cc98eb5bc..b994276c9 100644
--- a/srvutil/src/main/java/org/apache/rocketmq/srvutil/ConcurrentHashMapUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtils.java
@@ -14,23 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.srvutil;
+package org.apache.rocketmq.common.utils;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
-public class ConcurrentHashMapUtil {
+public abstract class ConcurrentHashMapUtils {
+
private static final boolean IS_JDK8;
static {
- // Java 8 or lower: 1.6.0_23, 1.7.0, 1.7.0_80, 1.8.0_211
- // Java 9 or higher: 9.0.1, 11.0.4, 12, 12.0.1
+ // Java 8 reports java.version as "1.8.x" (e.g. 1.8.0_211)
+ // Java 9+ reports the major version first (e.g. 9.0.1, 11.0.4, 17)
IS_JDK8 = System.getProperty("java.version").startsWith("1.8.");
}
- private ConcurrentHashMapUtil() {
- }
-
/**
* A temporary workaround for the Java 8-specific performance issue JDK-8161372.<br> Use implementation of
* ConcurrentMap.computeIfAbsent instead.
@@ -39,10 +37,11 @@ public class ConcurrentHashMapUtil {
*/
public static <K, V> V computeIfAbsent(ConcurrentMap<K, V> map, K key, Function<? super K, ? extends V> func) {
if (IS_JDK8) {
- V v, newValue;
- return ((v = map.get(key)) == null &&
- (newValue = func.apply(key)) != null &&
- (v = map.putIfAbsent(key, newValue)) == null) ? newValue : v;
+ V v = map.get(key);
+ if (null == v) {
+ v = map.computeIfAbsent(key, func);
+ }
+ return v;
} else {
return map.computeIfAbsent(key, func);
}
diff --git a/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java b/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java
new file mode 100644
index 000000000..89a4b0cda
--- /dev/null
+++ b/common/src/test/java/org/apache/rocketmq/common/utils/ConcurrentHashMapUtilsTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.utils;
+
+import java.util.concurrent.ConcurrentHashMap;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class ConcurrentHashMapUtilsTest {
+
+ @Test
+ public void computeIfAbsent() {
+
+ ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();
+ map.put("123", "1111");
+ String value = ConcurrentHashMapUtils.computeIfAbsent(map, "123", k -> "234");
+ assertEquals("1111", value);
+ String value1 = ConcurrentHashMapUtils.computeIfAbsent(map, "1232", k -> "2342");
+ assertEquals("2342", value1);
+ String value2 = ConcurrentHashMapUtils.computeIfAbsent(map, "123", k -> "2342");
+ assertEquals("1111", value2);
+ }
+}
\ No newline at end of file
diff --git a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
index ace963ca9..91ec4ca1f 100644
--- a/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
+++ b/namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
@@ -47,6 +47,7 @@ import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.body.TopicConfigAndMappingSerializeWrapper;
import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
@@ -201,16 +202,16 @@ public class RouteInfoManager {
}
public RegisterBrokerResult registerBroker(
- final String clusterName,
- final String brokerAddr,
- final String brokerName,
- final long brokerId,
- final String haServerAddr,
- final String zoneName,
- final Long timeoutMillis,
- final TopicConfigSerializeWrapper topicConfigWrapper,
- final List<String> filterServerList,
- final Channel channel) {
+ final String clusterName,
+ final String brokerAddr,
+ final String brokerName,
+ final long brokerId,
+ final String haServerAddr,
+ final String zoneName,
+ final Long timeoutMillis,
+ final TopicConfigSerializeWrapper topicConfigWrapper,
+ final List<String> filterServerList,
+ final Channel channel) {
return registerBroker(clusterName, brokerAddr, brokerName, brokerId, haServerAddr, zoneName, timeoutMillis, false, topicConfigWrapper, filterServerList, channel);
}
@@ -231,7 +232,7 @@ public class RouteInfoManager {
this.lock.writeLock().lockInterruptibly();
//init or update the cluster info
- Set<String> brokerNames = this.clusterAddrTable.computeIfAbsent(clusterName, k -> new HashSet<>());
+ Set<String> brokerNames = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Set<String>>) this.clusterAddrTable, clusterName, k -> new HashSet<>());
brokerNames.add(brokerName);
boolean registerFirst = false;
@@ -273,8 +274,8 @@ public class RouteInfoManager {
long newStateVersion = topicConfigWrapper.getDataVersion().getStateVersion();
if (oldStateVersion > newStateVersion) {
log.warn("Registered Broker conflicts with the existed one, just ignore.: Cluster:{}, BrokerName:{}, BrokerId:{}, " +
- "Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
- clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
+ "Old BrokerAddr:{}, Old Version:{}, New BrokerAddr:{}, New Version:{}.",
+ clusterName, brokerName, brokerId, oldBrokerAddr, oldStateVersion, brokerAddr, newStateVersion);
//Remove the rejected brokerAddr from brokerLiveTable.
brokerLiveTable.remove(new BrokerAddrInfo(clusterName, brokerAddr));
return result;
@@ -284,7 +285,7 @@ public class RouteInfoManager {
if (!brokerAddrsMap.containsKey(brokerId) && topicConfigWrapper.getTopicConfigTable().size() == 1) {
log.warn("Can't register topicConfigWrapper={} because broker[{}]={} has not registered.",
- topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
+ topicConfigWrapper.getTopicConfigTable(), brokerId, brokerAddr);
return null;
}
@@ -293,17 +294,17 @@ public class RouteInfoManager {
boolean isMaster = MixAll.MASTER_ID == brokerId;
boolean isPrimeSlave = !isOldVersionBroker && !isMaster
- && brokerId == Collections.min(brokerAddrsMap.keySet());
+ && brokerId == Collections.min(brokerAddrsMap.keySet());
if (null != topicConfigWrapper && (isMaster || isPrimeSlave)) {
ConcurrentMap<String, TopicConfig> tcTable =
- topicConfigWrapper.getTopicConfigTable();
+ topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
if (registerFirst || this.isTopicConfigChanged(clusterName, brokerAddr,
- topicConfigWrapper.getDataVersion(), brokerName,
- entry.getValue().getTopicName())) {
+ topicConfigWrapper.getDataVersion(), brokerName,
+ entry.getValue().getTopicName())) {
final TopicConfig topicConfig = entry.getValue();
if (isPrimeSlave) {
// Wipe write perm for prime slave
@@ -331,12 +332,12 @@ public class RouteInfoManager {
BrokerAddrInfo brokerAddrInfo = new BrokerAddrInfo(clusterName, brokerAddr);
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddrInfo,
- new BrokerLiveInfo(
- System.currentTimeMillis(),
- timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
- topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
- channel,
- haServerAddr));
+ new BrokerLiveInfo(
+ System.currentTimeMillis(),
+ timeoutMillis == null ? DEFAULT_BROKER_CHANNEL_EXPIRED_TIME : timeoutMillis,
+ topicConfigWrapper == null ? new DataVersion() : topicConfigWrapper.getDataVersion(),
+ channel,
+ haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAService: {}", brokerAddrInfo, haServerAddr);
}
@@ -363,7 +364,7 @@ public class RouteInfoManager {
if (isMinBrokerIdChanged && namesrvConfig.isNotifyMinBrokerIdChanged()) {
notifyMinBrokerIdChanged(brokerAddrsMap, null,
- this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
+ this.brokerLiveTable.get(brokerAddrInfo).getHaServerAddr());
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
@@ -679,9 +680,9 @@ public class RouteInfoManager {
continue;
}
BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(),
- brokerData.getBrokerName(),
- (HashMap<Long, String>) brokerData.getBrokerAddrs().clone(),
- brokerData.isEnableActingMaster(), brokerData.getZoneName());
+ brokerData.getBrokerName(),
+ (HashMap<Long, String>) brokerData.getBrokerAddrs().clone(),
+ brokerData.isEnableActingMaster(), brokerData.getZoneName());
brokerDataList.add(brokerDataClone);
foundBrokerData = true;
@@ -1023,7 +1024,7 @@ public class RouteInfoManager {
String topic = topicEntry.getKey();
Map<String, QueueData> queueDatas = topicEntry.getValue();
if (queueDatas != null && queueDatas.size() > 0
- && TopicSysFlag.hasUnitFlag(queueDatas.values().iterator().next().getTopicSysFlag())) {
+ && TopicSysFlag.hasUnitFlag(queueDatas.values().iterator().next().getTopicSysFlag())) {
topicList.getTopicList().add(topic);
}
}
@@ -1044,7 +1045,7 @@ public class RouteInfoManager {
String topic = topicEntry.getKey();
Map<String, QueueData> queueDatas = topicEntry.getValue();
if (queueDatas != null && queueDatas.size() > 0
- && TopicSysFlag.hasUnitSubFlag(queueDatas.values().iterator().next().getTopicSysFlag())) {
+ && TopicSysFlag.hasUnitSubFlag(queueDatas.values().iterator().next().getTopicSysFlag())) {
topicList.getTopicList().add(topic);
}
}
@@ -1065,8 +1066,8 @@ public class RouteInfoManager {
String topic = topicEntry.getKey();
Map<String, QueueData> queueDatas = topicEntry.getValue();
if (queueDatas != null && queueDatas.size() > 0
- && !TopicSysFlag.hasUnitFlag(queueDatas.values().iterator().next().getTopicSysFlag())
- && TopicSysFlag.hasUnitSubFlag(queueDatas.values().iterator().next().getTopicSysFlag())) {
+ && !TopicSysFlag.hasUnitFlag(queueDatas.values().iterator().next().getTopicSysFlag())
+ && TopicSysFlag.hasUnitSubFlag(queueDatas.values().iterator().next().getTopicSysFlag())) {
topicList.getTopicList().add(topic);
}
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
index d2f447273..07d32445f 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/common/ReceiptHandleGroup.java
@@ -26,6 +26,7 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.proxy.config.ConfigurationManager;
public class ReceiptHandleGroup {
@@ -74,7 +75,8 @@ public class ReceiptHandleGroup {
public void put(String msgID, String handle, MessageReceiptHandle value) {
long timeout = ConfigurationManager.getProxyConfig().getLockTimeoutMsInHandleGroup();
- Map<String, HandleData> handleMap = receiptHandleMap.computeIfAbsent(msgID, msgIDKey -> new ConcurrentHashMap<>());
+ Map<String, HandleData> handleMap = ConcurrentHashMapUtils.computeIfAbsent((ConcurrentHashMap<String, Map<String, HandleData>>) this.receiptHandleMap,
+ msgID, msgIDKey -> new ConcurrentHashMap<>());
handleMap.compute(handle, (handleKey, handleData) -> {
if (handleData == null || handleData.needRemove) {
return new HandleData(value);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
index 9bb8b7f9f..15c4385fd 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessor.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.common.consumer.ReceiptHandle;
import org.apache.rocketmq.common.subscription.RetryPolicy;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.proxy.common.AbstractStartAndShutdown;
import org.apache.rocketmq.proxy.common.MessageReceiptHandle;
import org.apache.rocketmq.proxy.common.ProxyContext;
@@ -239,7 +240,7 @@ public class ReceiptHandleProcessor extends AbstractStartAndShutdown {
if (key == null) {
return;
}
- receiptHandleGroupMap.computeIfAbsent(key,
+ ConcurrentHashMapUtils.computeIfAbsent(this.receiptHandleGroupMap, key,
k -> new ReceiptHandleGroup()).put(msgID, receiptHandle, messageReceiptHandle);
}
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/ChannelManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/ChannelManager.java
index d730d9118..283cd823d 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/ChannelManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/channel/ChannelManager.java
@@ -23,6 +23,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,8 +38,7 @@ public class ChannelManager {
log.warn("ClientId is unexpected null or empty");
return createChannelInner(context);
}
-
- SimpleChannel channel = clientIdChannelMap.computeIfAbsent(clientId, k -> createChannelInner(context));
+ SimpleChannel channel = ConcurrentHashMapUtils.computeIfAbsent(this.clientIdChannelMap,clientId, k -> createChannelInner(context));
channel.updateLastAccessTime();
return channel;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
index 74de4d691..49794c28a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAService.java
@@ -33,6 +33,7 @@ import org.apache.rocketmq.common.EpochEntry;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.body.HARuntimeInfo;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -237,7 +238,7 @@ public class AutoSwitchHAService extends DefaultHAService {
}
public void updateConnectionLastCaughtUpTime(final String slaveAddress, final long lastCaughtUpTimeMs) {
- long prevTime = this.connectionCaughtUpTimeTable.computeIfAbsent(slaveAddress, k -> 0L);
+ Long prevTime = ConcurrentHashMapUtils.computeIfAbsent(this.connectionCaughtUpTimeTable, slaveAddress, k -> 0L);
this.connectionCaughtUpTimeTable.put(slaveAddress, Math.max(prevTime, lastCaughtUpTimeMs));
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
index 55614ccba..5e87bbc03 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
@@ -20,12 +20,12 @@ package org.apache.rocketmq.store.queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
* QueueOffsetAssigner is a component for assigning offsets for queues.
- *
*/
public class QueueOffsetAssigner {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
@@ -35,7 +35,7 @@ public class QueueOffsetAssigner {
private ConcurrentMap<String/* topic-queueid */, Long/* offset */> lmqTopicQueueTable = new ConcurrentHashMap<>(1024);
public long assignQueueOffset(String topicQueueKey, short messageNum) {
- long queueOffset = this.topicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L);
+ Long queueOffset = ConcurrentHashMapUtils.computeIfAbsent(this.topicQueueTable, topicQueueKey, k -> 0L);
this.topicQueueTable.put(topicQueueKey, queueOffset + messageNum);
return queueOffset;
}
@@ -45,13 +45,13 @@ public class QueueOffsetAssigner {
}
public long assignBatchQueueOffset(String topicQueueKey, short messageNum) {
- Long topicOffset = this.batchTopicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L);
+ Long topicOffset = ConcurrentHashMapUtils.computeIfAbsent(this.batchTopicQueueTable, topicQueueKey, k -> 0L);
this.batchTopicQueueTable.put(topicQueueKey, topicOffset + messageNum);
return topicOffset;
}
public long assignLmqOffset(String topicQueueKey, short messageNum) {
- long topicOffset = this.lmqTopicQueueTable.computeIfAbsent(topicQueueKey, k -> 0L);
+ Long topicOffset = ConcurrentHashMapUtils.computeIfAbsent(this.lmqTopicQueueTable, topicQueueKey, k -> 0L);
this.lmqTopicQueueTable.put(topicQueueKey, topicOffset + messageNum);
return topicOffset;
}