You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/03/06 11:58:55 UTC
[incubator-uniffle] branch master updated: [MINOR] Use multithreading to detect multiple disks (#687)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ffa50b97 [MINOR] Use multithreading to detect multiple disks (#687)
ffa50b97 is described below
commit ffa50b979ea6ae11b423646cc0c2381af29e4cf8
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Mon Mar 6 19:58:48 2023 +0800
[MINOR] Use multithreading to detect multiple disks (#687)
### What changes were proposed in this pull request?
When disk detection is enabled, use multiple threads so that the read/write check of each disk runs in parallel.
### Why are the changes needed?
Although the detection time of a single disk is relatively short, checking the disks in parallel speeds up the overall check by about 30%.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests (UTs).
---
.../apache/uniffle/server/LocalStorageChecker.java | 43 +++++++++++++---------
1 file changed, 26 insertions(+), 17 deletions(-)
diff --git a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
index 368db950..8c2f5b62 100644
--- a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
+++ b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
@@ -22,6 +22,9 @@ import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -69,31 +72,37 @@ public class LocalStorageChecker extends Checker {
@Override
public boolean checkIsHealthy() {
- int num = 0;
- long totalSpace = 0L;
- long usedSpace = 0L;
- int corruptedDirs = 0;
-
- for (StorageInfo storageInfo : storageInfos) {
+ AtomicInteger num = new AtomicInteger(0);
+ AtomicLong totalSpace = new AtomicLong(0L);
+ AtomicLong usedSpace = new AtomicLong(0L);
+ AtomicInteger corruptedDirs = new AtomicInteger(0);
+ CountDownLatch cdl = new CountDownLatch(storageInfos.size());
+ storageInfos.parallelStream().forEach(storageInfo -> {
if (!storageInfo.checkStorageReadAndWrite()) {
storageInfo.markCorrupted();
- corruptedDirs++;
- continue;
+ corruptedDirs.incrementAndGet();
+ cdl.countDown();
+ return;
}
- totalSpace += getTotalSpace(storageInfo.storageDir);
- usedSpace += getUsedSpace(storageInfo.storageDir);
+ totalSpace.addAndGet(getTotalSpace(storageInfo.storageDir));
+ usedSpace.addAndGet(getUsedSpace(storageInfo.storageDir));
if (storageInfo.checkIsSpaceEnough()) {
- num++;
+ num.incrementAndGet();
}
+ cdl.countDown();
+ });
+ try {
+ cdl.await();
+ } catch (InterruptedException e) {
+ LOG.error("Failed to check local storage!");
}
-
- ShuffleServerMetrics.gaugeLocalStorageTotalSpace.set(totalSpace);
- ShuffleServerMetrics.gaugeLocalStorageUsedSpace.set(usedSpace);
+ ShuffleServerMetrics.gaugeLocalStorageTotalSpace.set(totalSpace.get());
+ ShuffleServerMetrics.gaugeLocalStorageUsedSpace.set(usedSpace.get());
ShuffleServerMetrics.gaugeLocalStorageTotalDirsNum.set(storageInfos.size());
- ShuffleServerMetrics.gaugeLocalStorageCorruptedDirsNum.set(corruptedDirs);
- ShuffleServerMetrics.gaugeLocalStorageUsedSpaceRatio.set(usedSpace * 1.0 / totalSpace);
+ ShuffleServerMetrics.gaugeLocalStorageCorruptedDirsNum.set(corruptedDirs.get());
+ ShuffleServerMetrics.gaugeLocalStorageUsedSpaceRatio.set(usedSpace.get() * 1.0 / totalSpace.get());
if (storageInfos.isEmpty()) {
if (isHealthy) {
@@ -103,7 +112,7 @@ public class LocalStorageChecker extends Checker {
return false;
}
- double availablePercentage = num * 100.0 / storageInfos.size();
+ double availablePercentage = num.get() * 100.0 / storageInfos.size();
if (Double.compare(availablePercentage, minStorageHealthyPercentage) >= 0) {
if (!isHealthy) {
LOG.info("shuffle server become healthy");