You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/02/01 09:32:29 UTC

[rocketmq] branch develop updated: [ISSUE #2622] Change variable name 'lockTreeMap' to 'treeMapLock' (#2624)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e297221  [ISSUE #2622] Change variable name 'lockTreeMap' to 'treeMapLock' (#2624)
e297221 is described below

commit e297221d5066bfeaeb0201f48f5beeb20b638b4b
Author: 赵延 <10...@qq.com>
AuthorDate: Mon Feb 1 17:32:02 2021 +0800

    [ISSUE #2622] Change variable name 'lockTreeMap' to 'treeMapLock' (#2624)
---
 .../apache/rocketmq/broker/out/BrokerOuterAPI.java |  2 +-
 .../rocketmq/broker/topic/TopicConfigManager.java  | 14 +++---
 .../consumer/ConsumeMessageOrderlyService.java     |  4 +-
 .../client/impl/consumer/ProcessQueue.java         | 56 +++++++++++-----------
 .../client/impl/consumer/RebalancePushImpl.java    |  4 +-
 .../remoting/netty/NettyRemotingClient.java        |  6 +--
 6 files changed, 43 insertions(+), 43 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
index 8b01ef5..de7f3fc 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
@@ -146,7 +146,7 @@ public class BrokerOuterAPI {
                     @Override
                     public void run() {
                         try {
-                            RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
+                            RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                             if (result != null) {
                                 registerBrokerResultList.add(result);
                             }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 86f6065..99ba1da 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -45,7 +45,7 @@ public class TopicConfigManager extends ConfigManager {
     private static final long LOCK_TIMEOUT_MILLIS = 3000;
     private static final int SCHEDULE_TOPIC_QUEUE_NUM = 18;
 
-    private transient final Lock lockTopicConfigTable = new ReentrantLock();
+    private transient final Lock topicConfigTableLock = new ReentrantLock();
 
     private final ConcurrentMap<String, TopicConfig> topicConfigTable =
         new ConcurrentHashMap<String, TopicConfig>(1024);
@@ -159,7 +159,7 @@ public class TopicConfigManager extends ConfigManager {
         boolean createNew = false;
 
         try {
-            if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+            if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                 try {
                     topicConfig = this.topicConfigTable.get(topic);
                     if (topicConfig != null)
@@ -213,7 +213,7 @@ public class TopicConfigManager extends ConfigManager {
                         this.persist();
                     }
                 } finally {
-                    this.lockTopicConfigTable.unlock();
+                    this.topicConfigTableLock.unlock();
                 }
             }
         } catch (InterruptedException e) {
@@ -239,7 +239,7 @@ public class TopicConfigManager extends ConfigManager {
         boolean createNew = false;
 
         try {
-            if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+            if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                 try {
                     topicConfig = this.topicConfigTable.get(topic);
                     if (topicConfig != null)
@@ -257,7 +257,7 @@ public class TopicConfigManager extends ConfigManager {
                     this.dataVersion.nextVersion();
                     this.persist();
                 } finally {
-                    this.lockTopicConfigTable.unlock();
+                    this.topicConfigTableLock.unlock();
                 }
             }
         } catch (InterruptedException e) {
@@ -279,7 +279,7 @@ public class TopicConfigManager extends ConfigManager {
         boolean createNew = false;
 
         try {
-            if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+            if (this.topicConfigTableLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                 try {
                     topicConfig = this.topicConfigTable.get(TopicValidator.RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC);
                     if (topicConfig != null)
@@ -297,7 +297,7 @@ public class TopicConfigManager extends ConfigManager {
                     this.dataVersion.nextVersion();
                     this.persist();
                 } finally {
-                    this.lockTopicConfigTable.unlock();
+                    this.topicConfigTableLock.unlock();
                 }
             }
         } catch (InterruptedException e) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index a171098..130effa 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -478,7 +478,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
                             ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
                             boolean hasException = false;
                             try {
-                                this.processQueue.getLockConsume().lock();
+                                this.processQueue.getConsumeLock().lock();
                                 if (this.processQueue.isDropped()) {
                                     log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
                                         this.messageQueue);
@@ -494,7 +494,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
                                     messageQueue);
                                 hasException = true;
                             } finally {
-                                this.processQueue.getLockConsume().unlock();
+                                this.processQueue.getConsumeLock().unlock();
                             }
 
                             if (null == status
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
index 4b9ea62..21798d8 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
@@ -44,11 +44,11 @@ public class ProcessQueue {
     public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
     private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
     private final InternalLogger log = ClientLogger.getLog();
-    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
+    private final ReadWriteLock treeMapLock = new ReentrantReadWriteLock();
     private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
     private final AtomicLong msgCount = new AtomicLong();
     private final AtomicLong msgSize = new AtomicLong();
-    private final Lock lockConsume = new ReentrantLock();
+    private final Lock consumeLock = new ReentrantLock();
     /**
      * A subset of msgTreeMap, will only be used when orderly consume
      */
@@ -83,7 +83,7 @@ public class ProcessQueue {
         for (int i = 0; i < loop; i++) {
             MessageExt msg = null;
             try {
-                this.lockTreeMap.readLock().lockInterruptibly();
+                this.treeMapLock.readLock().lockInterruptibly();
                 try {
                     if (!msgTreeMap.isEmpty() && System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msgTreeMap.firstEntry().getValue())) > pushConsumer.getConsumeTimeout() * 60 * 1000) {
                         msg = msgTreeMap.firstEntry().getValue();
@@ -92,7 +92,7 @@ public class ProcessQueue {
                         break;
                     }
                 } finally {
-                    this.lockTreeMap.readLock().unlock();
+                    this.treeMapLock.readLock().unlock();
                 }
             } catch (InterruptedException e) {
                 log.error("getExpiredMsg exception", e);
@@ -103,7 +103,7 @@ public class ProcessQueue {
                 pushConsumer.sendMessageBack(msg, 3);
                 log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
                 try {
-                    this.lockTreeMap.writeLock().lockInterruptibly();
+                    this.treeMapLock.writeLock().lockInterruptibly();
                     try {
                         if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
                             try {
@@ -113,7 +113,7 @@ public class ProcessQueue {
                             }
                         }
                     } finally {
-                        this.lockTreeMap.writeLock().unlock();
+                        this.treeMapLock.writeLock().unlock();
                     }
                 } catch (InterruptedException e) {
                     log.error("getExpiredMsg exception", e);
@@ -127,7 +127,7 @@ public class ProcessQueue {
     public boolean putMessage(final List<MessageExt> msgs) {
         boolean dispatchToConsume = false;
         try {
-            this.lockTreeMap.writeLock().lockInterruptibly();
+            this.treeMapLock.writeLock().lockInterruptibly();
             try {
                 int validMsgCnt = 0;
                 for (MessageExt msg : msgs) {
@@ -156,7 +156,7 @@ public class ProcessQueue {
                     }
                 }
             } finally {
-                this.lockTreeMap.writeLock().unlock();
+                this.treeMapLock.writeLock().unlock();
             }
         } catch (InterruptedException e) {
             log.error("putMessage exception", e);
@@ -167,13 +167,13 @@ public class ProcessQueue {
 
     public long getMaxSpan() {
         try {
-            this.lockTreeMap.readLock().lockInterruptibly();
+            this.treeMapLock.readLock().lockInterruptibly();
             try {
                 if (!this.msgTreeMap.isEmpty()) {
                     return this.msgTreeMap.lastKey() - this.msgTreeMap.firstKey();
                 }
             } finally {
-                this.lockTreeMap.readLock().unlock();
+                this.treeMapLock.readLock().unlock();
             }
         } catch (InterruptedException e) {
             log.error("getMaxSpan exception", e);
@@ -186,7 +186,7 @@ public class ProcessQueue {
         long result = -1;
         final long now = System.currentTimeMillis();
         try {
-            this.lockTreeMap.writeLock().lockInterruptibly();
+            this.treeMapLock.writeLock().lockInterruptibly();
             this.lastConsumeTimestamp = now;
             try {
                 if (!msgTreeMap.isEmpty()) {
@@ -206,7 +206,7 @@ public class ProcessQueue {
                     }
                 }
             } finally {
-                this.lockTreeMap.writeLock().unlock();
+                this.treeMapLock.writeLock().unlock();
             }
         } catch (Throwable t) {
             log.error("removeMessage exception", t);
@@ -245,12 +245,12 @@ public class ProcessQueue {
 
     public void rollback() {
         try {
-            this.lockTreeMap.writeLock().lockInterruptibly();
+            this.treeMapLock.writeLock().lockInterruptibly();
             try {
                 this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap);
                 this.consumingMsgOrderlyTreeMap.clear();
             } finally {
-                this.lockTreeMap.writeLock().unlock();
+                this.treeMapLock.writeLock().unlock();
             }
         } catch (InterruptedException e) {
             log.error("rollback exception", e);
@@ -259,7 +259,7 @@ public class ProcessQueue {
 
     public long commit() {
         try {
-            this.lockTreeMap.writeLock().lockInterruptibly();
+            this.treeMapLock.writeLock().lockInterruptibly();
             try {
                 Long offset = this.consumingMsgOrderlyTreeMap.lastKey();
                 msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size());
@@ -271,7 +271,7 @@ public class ProcessQueue {
                     return offset + 1;
                 }
             } finally {
-                this.lockTreeMap.writeLock().unlock();
+                this.treeMapLock.writeLock().unlock();
             }
         } catch (InterruptedException e) {
             log.error("commit exception", e);
@@ -282,14 +282,14 @@ public class ProcessQueue {
 
     public void makeMessageToConsumeAgain(List<MessageExt> msgs) {
         try {
-            this.lockTreeMap.writeLock().lockInterruptibly();
+            this.treeMapLock.writeLock().lockInterruptibly();
             try {
                 for (MessageExt msg : msgs) {
                     this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset());
                     this.msgTreeMap.put(msg.getQueueOffset(), msg);
                 }
             } finally {
-                this.lockTreeMap.writeLock().unlock();
+                this.treeMapLock.writeLock().unlock();
             }
         } catch (InterruptedException e) {
             log.error("makeMessageToCosumeAgain exception", e);
@@ -300,7 +300,7 @@ public class ProcessQueue {
         List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
         final long now = System.currentTimeMillis();
         try {
-            this.lockTreeMap.writeLock().lockInterruptibly();
+            this.treeMapLock.writeLock().lockInterruptibly();
             this.lastConsumeTimestamp = now;
             try {
                 if (!this.msgTreeMap.isEmpty()) {
@@ -319,7 +319,7 @@ public class ProcessQueue {
                     consuming = false;
                 }
             } finally {
-                this.lockTreeMap.writeLock().unlock();
+                this.treeMapLock.writeLock().unlock();
             }
         } catch (InterruptedException e) {
             log.error("take Messages exception", e);
@@ -330,11 +330,11 @@ public class ProcessQueue {
 
     public boolean hasTempMessage() {
         try {
-            this.lockTreeMap.readLock().lockInterruptibly();
+            this.treeMapLock.readLock().lockInterruptibly();
             try {
                 return !this.msgTreeMap.isEmpty();
             } finally {
-                this.lockTreeMap.readLock().unlock();
+                this.treeMapLock.readLock().unlock();
             }
         } catch (InterruptedException e) {
         }
@@ -344,7 +344,7 @@ public class ProcessQueue {
 
     public void clear() {
         try {
-            this.lockTreeMap.writeLock().lockInterruptibly();
+            this.treeMapLock.writeLock().lockInterruptibly();
             try {
                 this.msgTreeMap.clear();
                 this.consumingMsgOrderlyTreeMap.clear();
@@ -352,7 +352,7 @@ public class ProcessQueue {
                 this.msgSize.set(0);
                 this.queueOffsetMax = 0L;
             } finally {
-                this.lockTreeMap.writeLock().unlock();
+                this.treeMapLock.writeLock().unlock();
             }
         } catch (InterruptedException e) {
             log.error("rollback exception", e);
@@ -367,8 +367,8 @@ public class ProcessQueue {
         this.lastLockTimestamp = lastLockTimestamp;
     }
 
-    public Lock getLockConsume() {
-        return lockConsume;
+    public Lock getConsumeLock() {
+        return consumeLock;
     }
 
     public long getLastPullTimestamp() {
@@ -397,7 +397,7 @@ public class ProcessQueue {
 
     public void fillProcessQueueInfo(final ProcessQueueInfo info) {
         try {
-            this.lockTreeMap.readLock().lockInterruptibly();
+            this.treeMapLock.readLock().lockInterruptibly();
 
             if (!this.msgTreeMap.isEmpty()) {
                 info.setCachedMsgMinOffset(this.msgTreeMap.firstKey());
@@ -421,7 +421,7 @@ public class ProcessQueue {
             info.setLastConsumeTimestamp(this.lastConsumeTimestamp);
         } catch (Exception e) {
         } finally {
-            this.lockTreeMap.readLock().unlock();
+            this.treeMapLock.readLock().unlock();
         }
     }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index e5166f3..9582391 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -88,11 +88,11 @@ public class RebalancePushImpl extends RebalanceImpl {
         if (this.defaultMQPushConsumerImpl.isConsumeOrderly()
             && MessageModel.CLUSTERING.equals(this.defaultMQPushConsumerImpl.messageModel())) {
             try {
-                if (pq.getLockConsume().tryLock(1000, TimeUnit.MILLISECONDS)) {
+                if (pq.getConsumeLock().tryLock(1000, TimeUnit.MILLISECONDS)) {
                     try {
                         return this.unlockDelay(mq, pq);
                     } finally {
-                        pq.getLockConsume().unlock();
+                        pq.getConsumeLock().unlock();
                     }
                 } else {
                     log.warn("[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}",
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index d836c8e..5ba6cfa 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -84,7 +84,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
     private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
     private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
     private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
-    private final Lock lockNamesrvChannel = new ReentrantLock();
+    private final Lock namesrvChannelLock = new ReentrantLock();
 
     private final ExecutorService publicExecutor;
 
@@ -418,7 +418,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         }
 
         final List<String> addrList = this.namesrvAddrList.get();
-        if (this.lockNamesrvChannel.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+        if (this.namesrvChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
             try {
                 addr = this.namesrvAddrChoosed.get();
                 if (addr != null) {
@@ -445,7 +445,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
                     throw new RemotingConnectException(addrList.toString());
                 }
             } finally {
-                this.lockNamesrvChannel.unlock();
+                this.namesrvChannelLock.unlock();
             }
         } else {
             log.warn("getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);