You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2021/12/02 07:52:41 UTC

[rocketmq] branch develop updated: optimizing: For a big set, replace sequential iteration addition with a parallel stream when calculating results. (#3540)

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5d5d5f5  optimizing: For a big set, replace sequential iteration addition with a parallel stream when calculating results. (#3540)
5d5d5f5 is described below

commit 5d5d5f5f4c43d8cb2a45aee8cd0ad758902d348b
Author: 彭小漪 <64...@qq.com>
AuthorDate: Thu Dec 2 15:52:30 2021 +0800

    optimizing: For a big set, replace sequential iteration addition with a parallel stream when calculating results. (#3540)
    
    refactor: Renaming variables lockXXX to xxxLock may be better.
---
 .../apache/rocketmq/store/StoreStatsService.java   | 58 +++++++++++-----------
 1 file changed, 29 insertions(+), 29 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java b/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
index 395f5e3..e8e7e04 100644
--- a/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
@@ -62,13 +62,13 @@ public class StoreStatsService extends ServiceThread {
     private volatile long putMessageEntireTimeMax = 0;
     private volatile long getMessageEntireTimeMax = 0;
     // for putMessageEntireTimeMax
-    private ReentrantLock lockPut = new ReentrantLock();
+    private ReentrantLock putLock = new ReentrantLock();
     // for getMessageEntireTimeMax
-    private ReentrantLock lockGet = new ReentrantLock();
+    private ReentrantLock getLock = new ReentrantLock();
 
     private volatile long dispatchMaxBuffer = 0;
 
-    private ReentrantLock lockSampling = new ReentrantLock();
+    private ReentrantLock samplingLock = new ReentrantLock();
     private long lastPrintTimestamp = System.currentTimeMillis();
 
     public StoreStatsService() {
@@ -138,10 +138,10 @@ public class StoreStatsService extends ServiceThread {
         }
 
         if (value > this.putMessageEntireTimeMax) {
-            this.lockPut.lock();
+            this.putLock.lock();
             this.putMessageEntireTimeMax =
                 value > this.putMessageEntireTimeMax ? value : this.putMessageEntireTimeMax;
-            this.lockPut.unlock();
+            this.putLock.unlock();
         }
     }
 
@@ -151,10 +151,10 @@ public class StoreStatsService extends ServiceThread {
 
     public void setGetMessageEntireTimeMax(long value) {
         if (value > this.getMessageEntireTimeMax) {
-            this.lockGet.lock();
+            this.getLock.lock();
             this.getMessageEntireTimeMax =
                 value > this.getMessageEntireTimeMax ? value : this.getMessageEntireTimeMax;
-            this.lockGet.unlock();
+            this.getLock.unlock();
         }
     }
 
@@ -193,11 +193,11 @@ public class StoreStatsService extends ServiceThread {
     }
 
     public long getPutMessageTimesTotal() {
-        long rs = 0;
-        for (LongAdder data : putMessageTopicTimesTotal.values()) {
-            rs += data.longValue();
-        }
-        return rs;
+        Map<String, LongAdder> map = putMessageTopicTimesTotal;
+        return map.values()
+                .parallelStream()
+                .mapToLong(LongAdder::longValue)
+                .sum();
     }
 
     private String getFormatRuntime() {
@@ -217,11 +217,11 @@ public class StoreStatsService extends ServiceThread {
     }
 
     public long getPutMessageSizeTotal() {
-        long rs = 0;
-        for (LongAdder data : putMessageTopicSizeTotal.values()) {
-            rs += data.longValue();
-        }
-        return rs;
+        Map<String, LongAdder> map = putMessageTopicSizeTotal;
+        return map.values()
+                .parallelStream()
+                .mapToLong(LongAdder::longValue)
+                .sum();
     }
 
     private String getPutMessageDistributeTimeStringInfo(Long total) {
@@ -315,7 +315,7 @@ public class StoreStatsService extends ServiceThread {
 
     private String getPutTps(int time) {
         String result = "";
-        this.lockSampling.lock();
+        this.samplingLock.lock();
         try {
             CallSnapshot last = this.putTimesList.getLast();
 
@@ -325,14 +325,14 @@ public class StoreStatsService extends ServiceThread {
             }
 
         } finally {
-            this.lockSampling.unlock();
+            this.samplingLock.unlock();
         }
         return result;
     }
 
     private String getGetFoundTps(int time) {
         String result = "";
-        this.lockSampling.lock();
+        this.samplingLock.lock();
         try {
             CallSnapshot last = this.getTimesFoundList.getLast();
 
@@ -342,7 +342,7 @@ public class StoreStatsService extends ServiceThread {
                 result += CallSnapshot.getTPS(lastBefore, last);
             }
         } finally {
-            this.lockSampling.unlock();
+            this.samplingLock.unlock();
         }
 
         return result;
@@ -350,7 +350,7 @@ public class StoreStatsService extends ServiceThread {
 
     private String getGetMissTps(int time) {
         String result = "";
-        this.lockSampling.lock();
+        this.samplingLock.lock();
         try {
             CallSnapshot last = this.getTimesMissList.getLast();
 
@@ -361,14 +361,14 @@ public class StoreStatsService extends ServiceThread {
             }
 
         } finally {
-            this.lockSampling.unlock();
+            this.samplingLock.unlock();
         }
 
         return result;
     }
 
     private String getGetTotalTps(int time) {
-        this.lockSampling.lock();
+        this.samplingLock.lock();
         double found = 0;
         double miss = 0;
         try {
@@ -392,7 +392,7 @@ public class StoreStatsService extends ServiceThread {
             }
 
         } finally {
-            this.lockSampling.unlock();
+            this.samplingLock.unlock();
         }
 
         return Double.toString(found + miss);
@@ -400,7 +400,7 @@ public class StoreStatsService extends ServiceThread {
 
     private String getGetTransferedTps(int time) {
         String result = "";
-        this.lockSampling.lock();
+        this.samplingLock.lock();
         try {
             CallSnapshot last = this.transferedMsgCountList.getLast();
 
@@ -411,7 +411,7 @@ public class StoreStatsService extends ServiceThread {
             }
 
         } finally {
-            this.lockSampling.unlock();
+            this.samplingLock.unlock();
         }
 
         return result;
@@ -469,7 +469,7 @@ public class StoreStatsService extends ServiceThread {
     }
 
     private void sampling() {
-        this.lockSampling.lock();
+        this.samplingLock.lock();
         try {
             this.putTimesList.add(new CallSnapshot(System.currentTimeMillis(), getPutMessageTimesTotal()));
             if (this.putTimesList.size() > (MAX_RECORDS_OF_SAMPLING + 1)) {
@@ -495,7 +495,7 @@ public class StoreStatsService extends ServiceThread {
             }
 
         } finally {
-            this.lockSampling.unlock();
+            this.samplingLock.unlock();
         }
     }