You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/03/07 10:55:51 UTC
[incubator-celeborn] branch main updated: [CELEBORN-382] Call checkDiskFullAndSplit in the handlePushData method to avoid repeated definitions (#1313)
This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 84795bc63 [CELEBORN-382] Call checkDiskFullAndSplit in the handlePushData method to avoid repeated definitions (#1313)
84795bc63 is described below
commit 84795bc63bce06a2d5c10269cf3e3d57086538f3
Author: jiaoqingbo <11...@qq.com>
AuthorDate: Tue Mar 7 18:55:46 2023 +0800
[CELEBORN-382] Call checkDiskFullAndSplit in the handlePushData method to avoid repeated definitions (#1313)
---
.../service/deploy/worker/PushDataHandler.scala | 20 +++-----------------
1 file changed, 3 insertions(+), 17 deletions(-)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 273cce6e9..56d21beb9 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -224,23 +224,9 @@ class PushDataHandler extends BaseMessageHandler with Logging {
callbackWithTimer.onFailure(new CelebornIOException(cause))
return
}
- val diskFull =
- if (fileWriter.flusher.isInstanceOf[LocalFlusher]) {
- workerInfo.diskInfos
- .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
- .actualUsableSpace < diskReserveSize
- } else {
- false
- }
- if ((diskFull && fileWriter.getFileInfo.getFileLength > partitionSplitMinimumSize) ||
- (isMaster && fileWriter.getFileInfo.getFileLength > fileWriter.getSplitThreshold())) {
- if (fileWriter.getSplitMode == PartitionSplitMode.SOFT) {
- softSplit.set(true)
- } else {
- callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
- return
- }
- }
+
+ if (checkDiskFullAndSplit(fileWriter, isMaster, softSplit, callbackWithTimer)) return
+
fileWriter.incrementPendingWrites()
// for master, send data to slave