You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/02/01 09:32:29 UTC
[rocketmq] branch develop updated: [ISSUE #2622] Change variable name 'lockTreeMap' to 'treeMapLock' (#2624)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e297221 [ISSUE #2622] Change variable name 'lockTreeMap' to 'treeMapLock' (#2624)
e297221 is described below
commit e297221d5066bfeaeb0201f48f5beeb20b638b4b
Author: 赵延 <10...@qq.com>
AuthorDate: Mon Feb 1 17:32:02 2021 +0800
[ISSUE #2622] Change variable name 'lockTreeMap' to 'treeMapLock' (#2624)
---
.../apache/rocketmq/broker/out/BrokerOuterAPI.java | 2 +-
.../rocketmq/broker/topic/TopicConfigManager.java | 14 +++---
.../consumer/ConsumeMessageOrderlyService.java | 4 +-
.../client/impl/consumer/ProcessQueue.java | 56 +++++++++++-----------
.../client/impl/consumer/RebalancePushImpl.java | 4 +-
.../remoting/netty/NettyRemotingClient.java | 6 +--
6 files changed, 43 insertions(+), 43 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 8b01ef5..de7f3fc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -146,7 +146,7 @@ public class BrokerOuterAPI {
@Override
public void run() {
try {
- RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
+ RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
if (result != null) {
registerBrokerResultList.add(result);
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 86f6065..99ba1da 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -45,7 +45,7 @@ public class TopicConfigManager extends ConfigManager {
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18;
- private transient final Lock lockTopicConfigTable = new ReentrantLock();
+ private transient final Lock topicConfigTableLock = new ReentrantLock();
private final ConcurrentMap<String, TopicConfig> topicConfigTable =
new ConcurrentHashMap<String, TopicConfig>(1024);
@@ -159,7 +159,7 @@ public class TopicConfigManager extends ConfigManager {
boolean createNew = false;
try {
- if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null)
@@ -213,7 +213,7 @@ public class TopicConfigManager extends ConfigManager {
this.persist();
}
} finally {
- this.lockTopicConfigTable.unlock();
+ this.topicConfigTableLock.unlock();
}
}
} catch (InterruptedException e) {
@@ -239,7 +239,7 @@ public class TopicConfigManager extends ConfigManager {
boolean createNew = false;
try {
- if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null)
@@ -257,7 +257,7 @@ public class TopicConfigManager extends ConfigManager {
this.dataVersion.nextVersion();
this.persist();
} finally {
- this.lockTopicConfigTable.unlock();
+ this.topicConfigTableLock.unlock();
}
}
} catch (InterruptedException e) {
@@ -279,7 +279,7 @@ public class TopicConfigManager extends ConfigManager {
boolean createNew = false;
try {
- if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
topicConfig = this.topicConfigTable.get(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC);
if (topicConfig != null)
@@ -297,7 +297,7 @@ public class TopicConfigManager extends ConfigManager {
this.dataVersion.nextVersion();
this.persist();
} finally {
- this.lockTopicConfigTable.unlock();
+ this.topicConfigTableLock.unlock();
}
}
} catch (InterruptedException e) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index a171098..130effa 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -478,7 +478,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
- this.processQueue.getLockConsume().lock();
+ this.processQueue.getConsumeLock().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
@@ -494,7 +494,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
messageQueue);
hasException = true;
} finally {
- this.processQueue.getLockConsume().unlock();
+ this.processQueue.getConsumeLock().unlock();
}
if (null == status
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 4b9ea62..21798d8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -44,11 +44,11 @@ public class ProcessQueue {
public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
private final InternalLogger log = ClientLogger.getLog();
- private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
+ private final ReadWriteLock treeMapLock = new ReentrantReadWriteLock();
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
private final AtomicLong msgCount = new AtomicLong();
private final AtomicLong msgSize = new AtomicLong();
- private final Lock lockConsume = new ReentrantLock();
+ private final Lock consumeLock = new ReentrantLock();
/**
* A subset of msgTreeMap, will only be used when orderly consume
*/
@@ -83,7 +83,7 @@ public class ProcessQueue {
for (int i = 0; i < loop; i++) {
MessageExt msg = null;
try {
- this.lockTreeMap.readLock().lockInterruptibly();
+ this.treeMapLock.readLock().lockInterruptibly();
try {
if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
msg = msgTreeMap.firstEntry().getValue();
@@ -92,7 +92,7 @@ public class ProcessQueue {
break;
}
} finally {
- this.lockTreeMap.readLock().unlock();
+ this.treeMapLock.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getExpiredMsg exception", e);
@@ -103,7 +103,7 @@ public class ProcessQueue {
pushConsumer.sendMessageBack(msg, 3);
log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
try {
- this.lockTreeMap.writeLock().lockInterruptibly();
+ this.treeMapLock.writeLock().lockInterruptibly();
try {
if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
try {
@@ -113,7 +113,7 @@ public class ProcessQueue {
}
}
} finally {
- this.lockTreeMap.writeLock().unlock();
+ this.treeMapLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("getExpiredMsg exception", e);
@@ -127,7 +127,7 @@ public class ProcessQueue {
public boolean putMessage(final List<MessageExt> msgs) {
boolean dispatchToConsume = false;
try {
- this.lockTreeMap.writeLock().lockInterruptibly();
+ this.treeMapLock.writeLock().lockInterruptibly();
try {
int validMsgCnt = 0;
for (MessageExt msg : msgs) {
@@ -156,7 +156,7 @@ public class ProcessQueue {
}
}
} finally {
- this.lockTreeMap.writeLock().unlock();
+ this.treeMapLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("putMessage exception", e);
@@ -167,13 +167,13 @@ public class ProcessQueue {
public long getMaxSpan() {
try {
- this.lockTreeMap.readLock().lockInterruptibly();
+ this.treeMapLock.readLock().lockInterruptibly();
try {
if (!this.msgTreeMap.isEmpty()) {
return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
}
} finally {
- this.lockTreeMap.readLock().unlock();
+ this.treeMapLock.readLock().unlock();
}
} catch (InterruptedException e) {
log.error("getMaxSpan exception", e);
@@ -186,7 +186,7 @@ public class ProcessQueue {
long result = -1;
final long now = System.currentTimeMillis();
try {
- this.lockTreeMap.writeLock().lockInterruptibly();
+ this.treeMapLock.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!msgTreeMap.isEmpty()) {
@@ -206,7 +206,7 @@ public class ProcessQueue {
}
}
} finally {
- this.lockTreeMap.writeLock().unlock();
+ this.treeMapLock.writeLock().unlock();
}
} catch (Throwable t) {
log.error("removeMessage exception", t);
@@ -245,12 +245,12 @@ public class ProcessQueue {
public void rollback() {
try {
- this.lockTreeMap.writeLock().lockInterruptibly();
+ this.treeMapLock.writeLock().lockInterruptibly();
try {
this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);
this.consumingMsgOrderlyTreeMap.clear();
} finally {
- this.lockTreeMap.writeLock().unlock();
+ this.treeMapLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("rollback exception", e);
@@ -259,7 +259,7 @@ public class ProcessQueue {
public long commit() {
try {
- this.lockTreeMap.writeLock().lockInterruptibly();
+ this.treeMapLock.writeLock().lockInterruptibly();
try {
Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
@@ -271,7 +271,7 @@ public class ProcessQueue {
return offset + 1;
}
} finally {
- this.lockTreeMap.writeLock().unlock();
+ this.treeMapLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("commit exception", e);
@@ -282,14 +282,14 @@ public class ProcessQueue {
public void makeMessageToConsumeAgain(List<MessageExt> msgs) {
try {
- this.lockTreeMap.writeLock().lockInterruptibly();
+ this.treeMapLock.writeLock().lockInterruptibly();
try {
for (MessageExt msg : msgs) {
this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
this.msgTreeMap.put(msg.getQueueOffset(), msg);
}
} finally {
- this.lockTreeMap.writeLock().unlock();
+ this.treeMapLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("makeMessageToCosumeAgain exception", e);
@@ -300,7 +300,7 @@ public class ProcessQueue {
List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
final long now = System.currentTimeMillis();
try {
- this.lockTreeMap.writeLock().lockInterruptibly();
+ this.treeMapLock.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!this.msgTreeMap.isEmpty()) {
@@ -319,7 +319,7 @@ public class ProcessQueue {
consuming = false;
}
} finally {
- this.lockTreeMap.writeLock().unlock();
+ this.treeMapLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("take Messages exception", e);
@@ -330,11 +330,11 @@ public class ProcessQueue {
public boolean hasTempMessage() {
try {
- this.lockTreeMap.readLock().lockInterruptibly();
+ this.treeMapLock.readLock().lockInterruptibly();
try {
return !this.msgTreeMap.isEmpty();
} finally {
- this.lockTreeMap.readLock().unlock();
+ this.treeMapLock.readLock().unlock();
}
} catch (InterruptedException e) {
}
@@ -344,7 +344,7 @@ public class ProcessQueue {
public void clear() {
try {
- this.lockTreeMap.writeLock().lockInterruptibly();
+ this.treeMapLock.writeLock().lockInterruptibly();
try {
this.msgTreeMap.clear();
this.consumingMsgOrderlyTreeMap.clear();
@@ -352,7 +352,7 @@ public class ProcessQueue {
this.msgSize.set(0);
this.queueOffsetMax = 0L;
} finally {
- this.lockTreeMap.writeLock().unlock();
+ this.treeMapLock.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("rollback exception", e);
@@ -367,8 +367,8 @@ public class ProcessQueue {
this.lastLockTimestamp = lastLockTimestamp;
}
- public Lock getLockConsume() {
- return lockConsume;
+ public Lock getConsumeLock() {
+ return consumeLock;
}
public long getLastPullTimestamp() {
@@ -397,7 +397,7 @@ public class ProcessQueue {
public void fillProcessQueueInfo(final ProcessQueueInfo info) {
try {
- this.lockTreeMap.readLock().lockInterruptibly();
+ this.treeMapLock.readLock().lockInterruptibly();
if (!this.msgTreeMap.isEmpty()) {
info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
@@ -421,7 +421,7 @@ public class ProcessQueue {
info.setLastConsumeTimestamp(this.lastConsumeTimestamp);
} catch (Exception e) {
} finally {
- this.lockTreeMap.readLock().unlock();
+ this.treeMapLock.readLock().unlock();
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index e5166f3..9582391 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -88,11 +88,11 @@ public class RebalancePushImpl extends RebalanceImpl {
if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
&& MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
try {
- if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
+ if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
try {
return this.unlockDelay(mq, pq);
} finally {
- pq.getLockConsume().unlock();
+ pq.getConsumeLock().unlock();
}
} else {
log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index d836c8e..5ba6cfa 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -84,7 +84,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
- private final Lock lockNamesrvChannel = new ReentrantLock();
+ private final Lock namesrvChannelLock = new ReentrantLock();
private final ExecutorService publicExecutor;
@@ -418,7 +418,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
final List<String> addrList = this.namesrvAddrList.get();
- if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+ if (this.namesrvChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
addr = this.namesrvAddrChoosed.get();
if (addr != null) {
@@ -445,7 +445,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
throw new RemotingConnectException(addrList.toString());
}
} finally {
- this.lockNamesrvChannel.unlock();
+ this.namesrvChannelLock.unlock();
}
} else {
log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);