You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/12/02 07:52:41 UTC
[rocketmq] branch develop updated: optimizing: For a big set, replace sequential iteration addition with a parallel stream when calculating results. (#3540)
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 5d5d5f5 optimizing: For a big set, replace sequential iteration addition with a parallel stream when calculating results. (#3540)
5d5d5f5 is described below
commit 5d5d5f5f4c43d8cb2a45aee8cd0ad758902d348b
Author: 彭小漪 <64...@qq.com>
AuthorDate: Thu Dec 2 15:52:30 2021 +0800
optimizing: For a big set, replace sequential iteration addition with a parallel stream when calculating results. (#3540)
refactor: Rename the lock fields from lockXxx to xxxLock (lockPut -> putLock, lockGet -> getLock, lockSampling -> samplingLock).
---
.../apache/rocketmq/store/StoreStatsService.java | 58 +++++++++++-----------
1 file changed, 29 insertions(+), 29 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java b/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
index 395f5e3..e8e7e04 100644
--- a/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
@@ -62,13 +62,13 @@ public class StoreStatsService extends ServiceThread {
private volatile long putMessageEntireTimeMax = 0;
private volatile long getMessageEntireTimeMax = 0;
// for putMessageEntireTimeMax
- private ReentrantLock lockPut = new ReentrantLock();
+ private ReentrantLock putLock = new ReentrantLock();
// for getMessageEntireTimeMax
- private ReentrantLock lockGet = new ReentrantLock();
+ private ReentrantLock getLock = new ReentrantLock();
private volatile long dispatchMaxBuffer = 0;
- private ReentrantLock lockSampling = new ReentrantLock();
+ private ReentrantLock samplingLock = new ReentrantLock();
private long lastPrintTimestamp = System.currentTimeMillis();
public StoreStatsService() {
@@ -138,10 +138,10 @@ public class StoreStatsService extends ServiceThread {
}
if (value > this.putMessageEntireTimeMax) {
- this.lockPut.lock();
+ this.putLock.lock();
this.putMessageEntireTimeMax =
value > this.putMessageEntireTimeMax ? value : this.putMessageEntireTimeMax;
- this.lockPut.unlock();
+ this.putLock.unlock();
}
}
@@ -151,10 +151,10 @@ public class StoreStatsService extends ServiceThread {
public void setGetMessageEntireTimeMax(long value) {
if (value > this.getMessageEntireTimeMax) {
- this.lockGet.lock();
+ this.getLock.lock();
this.getMessageEntireTimeMax =
value > this.getMessageEntireTimeMax ? value : this.getMessageEntireTimeMax;
- this.lockGet.unlock();
+ this.getLock.unlock();
}
}
@@ -193,11 +193,11 @@ public class StoreStatsService extends ServiceThread {
}
public long getPutMessageTimesTotal() {
- long rs = 0;
- for (LongAdder data : putMessageTopicTimesTotal.values()) {
- rs += data.longValue();
- }
- return rs;
+ Map<String, LongAdder> map = putMessageTopicTimesTotal;
+ return map.values()
+ .parallelStream()
+ .mapToLong(LongAdder::longValue)
+ .sum();
}
private String getFormatRuntime() {
@@ -217,11 +217,11 @@ public class StoreStatsService extends ServiceThread {
}
public long getPutMessageSizeTotal() {
- long rs = 0;
- for (LongAdder data : putMessageTopicSizeTotal.values()) {
- rs += data.longValue();
- }
- return rs;
+ Map<String, LongAdder> map = putMessageTopicSizeTotal;
+ return map.values()
+ .parallelStream()
+ .mapToLong(LongAdder::longValue)
+ .sum();
}
private String getPutMessageDistributeTimeStringInfo(Long total) {
@@ -315,7 +315,7 @@ public class StoreStatsService extends ServiceThread {
private String getPutTps(int time) {
String result = "";
- this.lockSampling.lock();
+ this.samplingLock.lock();
try {
CallSnapshot last = this.putTimesList.getLast();
@@ -325,14 +325,14 @@ public class StoreStatsService extends ServiceThread {
}
} finally {
- this.lockSampling.unlock();
+ this.samplingLock.unlock();
}
return result;
}
private String getGetFoundTps(int time) {
String result = "";
- this.lockSampling.lock();
+ this.samplingLock.lock();
try {
CallSnapshot last = this.getTimesFoundList.getLast();
@@ -342,7 +342,7 @@ public class StoreStatsService extends ServiceThread {
result += CallSnapshot.getTPS(lastBefore, last);
}
} finally {
- this.lockSampling.unlock();
+ this.samplingLock.unlock();
}
return result;
@@ -350,7 +350,7 @@ public class StoreStatsService extends ServiceThread {
private String getGetMissTps(int time) {
String result = "";
- this.lockSampling.lock();
+ this.samplingLock.lock();
try {
CallSnapshot last = this.getTimesMissList.getLast();
@@ -361,14 +361,14 @@ public class StoreStatsService extends ServiceThread {
}
} finally {
- this.lockSampling.unlock();
+ this.samplingLock.unlock();
}
return result;
}
private String getGetTotalTps(int time) {
- this.lockSampling.lock();
+ this.samplingLock.lock();
double found = 0;
double miss = 0;
try {
@@ -392,7 +392,7 @@ public class StoreStatsService extends ServiceThread {
}
} finally {
- this.lockSampling.unlock();
+ this.samplingLock.unlock();
}
return Double.toString(found + miss);
@@ -400,7 +400,7 @@ public class StoreStatsService extends ServiceThread {
private String getGetTransferedTps(int time) {
String result = "";
- this.lockSampling.lock();
+ this.samplingLock.lock();
try {
CallSnapshot last = this.transferedMsgCountList.getLast();
@@ -411,7 +411,7 @@ public class StoreStatsService extends ServiceThread {
}
} finally {
- this.lockSampling.unlock();
+ this.samplingLock.unlock();
}
return result;
@@ -469,7 +469,7 @@ public class StoreStatsService extends ServiceThread {
}
private void sampling() {
- this.lockSampling.lock();
+ this.samplingLock.lock();
try {
this.putTimesList.add(new CallSnapshot(System.currentTimeMillis(), getPutMessageTimesTotal()));
if (this.putTimesList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
@@ -495,7 +495,7 @@ public class StoreStatsService extends ServiceThread {
}
} finally {
- this.lockSampling.unlock();
+ this.samplingLock.unlock();
}
}