You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/03/06 11:58:55 UTC

[incubator-uniffle] branch master updated: [MINOR] Use multithreading to detect multiple disks (#687)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new ffa50b97 [MINOR] Use multithreading to detect multiple disks (#687)
ffa50b97 is described below

commit ffa50b979ea6ae11b423646cc0c2381af29e4cf8
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Mon Mar 6 19:58:48 2023 +0800

    [MINOR] Use multithreading to detect multiple disks (#687)
    
    ### What changes were proposed in this pull request?
    When disk detection is enabled, run the read/write health check on each local disk concurrently using multiple threads instead of checking the disks one after another.
    
    ### Why are the changes needed?
    Although a single detection pass is already fairly short, checking the disks in parallel reduces the overall detection time by about 30%.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing unit tests.
---
 .../apache/uniffle/server/LocalStorageChecker.java | 43 +++++++++++++---------
 1 file changed, 26 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
index 368db950..8c2f5b62 100644
--- a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
+++ b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
@@ -22,6 +22,9 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -69,31 +72,37 @@ public class LocalStorageChecker extends Checker {
 
   @Override
   public boolean checkIsHealthy() {
-    int num = 0;
-    long totalSpace = 0L;
-    long usedSpace = 0L;
-    int corruptedDirs = 0;
-
-    for (StorageInfo storageInfo : storageInfos) {
+    AtomicInteger num = new AtomicInteger(0);
+    AtomicLong totalSpace = new AtomicLong(0L);
+    AtomicLong usedSpace = new AtomicLong(0L);
+    AtomicInteger corruptedDirs = new AtomicInteger(0);
+    CountDownLatch cdl = new CountDownLatch(storageInfos.size());
+    storageInfos.parallelStream().forEach(storageInfo -> {
       if (!storageInfo.checkStorageReadAndWrite()) {
         storageInfo.markCorrupted();
-        corruptedDirs++;
-        continue;
+        corruptedDirs.incrementAndGet();
+        cdl.countDown();
+        return;
       }
 
-      totalSpace += getTotalSpace(storageInfo.storageDir);
-      usedSpace += getUsedSpace(storageInfo.storageDir);
+      totalSpace.addAndGet(getTotalSpace(storageInfo.storageDir));
+      usedSpace.addAndGet(getUsedSpace(storageInfo.storageDir));
 
       if (storageInfo.checkIsSpaceEnough()) {
-        num++;
+        num.incrementAndGet();
       }
+      cdl.countDown();
+    });
+    try {
+      cdl.await();
+    } catch (InterruptedException e) {
+      LOG.error("Failed to check local storage!");
     }
-
-    ShuffleServerMetrics.gaugeLocalStorageTotalSpace.set(totalSpace);
-    ShuffleServerMetrics.gaugeLocalStorageUsedSpace.set(usedSpace);
+    ShuffleServerMetrics.gaugeLocalStorageTotalSpace.set(totalSpace.get());
+    ShuffleServerMetrics.gaugeLocalStorageUsedSpace.set(usedSpace.get());
     ShuffleServerMetrics.gaugeLocalStorageTotalDirsNum.set(storageInfos.size());
-    ShuffleServerMetrics.gaugeLocalStorageCorruptedDirsNum.set(corruptedDirs);
-    ShuffleServerMetrics.gaugeLocalStorageUsedSpaceRatio.set(usedSpace * 1.0 / totalSpace);
+    ShuffleServerMetrics.gaugeLocalStorageCorruptedDirsNum.set(corruptedDirs.get());
+    ShuffleServerMetrics.gaugeLocalStorageUsedSpaceRatio.set(usedSpace.get() * 1.0 / totalSpace.get());
 
     if (storageInfos.isEmpty()) {
       if (isHealthy) {
@@ -103,7 +112,7 @@ public class LocalStorageChecker extends Checker {
       return false;
     }
 
-    double availablePercentage = num * 100.0 / storageInfos.size();
+    double availablePercentage = num.get() * 100.0 / storageInfos.size();
     if (Double.compare(availablePercentage, minStorageHealthyPercentage) >= 0) {
       if (!isHealthy) {
         LOG.info("shuffle server become healthy");