You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@celeborn.apache.org by zh...@apache.org on 2023/03/07 10:55:51 UTC

[incubator-celeborn] branch main updated: [CELEBORN-382] Call checkDiskFullAndSplit in the handlePushData method to avoid repeated definitions (#1313)

This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 84795bc63 [CELEBORN-382] Call checkDiskFullAndSplit in the handlePushData method to avoid repeated definitions (#1313)
84795bc63 is described below

commit 84795bc63bce06a2d5c10269cf3e3d57086538f3
Author: jiaoqingbo <11...@qq.com>
AuthorDate: Tue Mar 7 18:55:46 2023 +0800

    [CELEBORN-382] Call checkDiskFullAndSplit in the handlePushData method to avoid repeated definitions (#1313)
---
 .../service/deploy/worker/PushDataHandler.scala      | 20 +++-----------------
 1 file changed, 3 insertions(+), 17 deletions(-)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 273cce6e9..56d21beb9 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -224,23 +224,9 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       callbackWithTimer.onFailure(new CelebornIOException(cause))
       return
     }
-    val diskFull =
-      if (fileWriter.flusher.isInstanceOf[LocalFlusher]) {
-        workerInfo.diskInfos
-          .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
-          .actualUsableSpace < diskReserveSize
-      } else {
-        false
-      }
-    if ((diskFull && fileWriter.getFileInfo.getFileLength > partitionSplitMinimumSize) ||
-      (isMaster && fileWriter.getFileInfo.getFileLength > fileWriter.getSplitThreshold())) {
-      if (fileWriter.getSplitMode == PartitionSplitMode.SOFT) {
-        softSplit.set(true)
-      } else {
-        callbackWithTimer.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
-        return
-      }
-    }
+
+    if (checkDiskFullAndSplit(fileWriter, isMaster, softSplit, callbackWithTimer)) return
+
     fileWriter.incrementPendingWrites()
 
     // for master, send data to slave