You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@celeborn.apache.org by GitBox <gi...@apache.org> on 2022/11/27 15:09:52 UTC

[GitHub] [incubator-celeborn] zhongqiangczq opened a new pull request, #1013: [CELEBORN-71] supports mappartition write

zhongqiangczq opened a new pull request, #1013:
URL: https://github.com/apache/incubator-celeborn/pull/1013

   # [BUG]/[FEATURE] title
   
   ### What changes were proposed in this pull request?
   
   
   ### Why are the changes needed?
   
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   ### What are the items that need reviewer attention?
   
   
   ### Related issues.
   
   
   ### Related pull requests.
   
   
   ### How was this patch tested?
   
   
   /cc @related-reviewer
   @waitinfuture @RexXiong @FMX 
   /assign @main-reviewer
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1013: [CELEBORN-71] pushdatahandler supports mappartition write: handshake/regionstart/regionfinish

Posted by GitBox <gi...@apache.org>.
zhongqiangczq commented on code in PR #1013:
URL: https://github.com/apache/incubator-celeborn/pull/1013#discussion_r1039097009


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -526,4 +530,278 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       message.body().release()
     }
   }
+
+  private def handleRpcRequest(client: TransportClient, rpcRequest: RpcRequest): Unit = {
+    val msg = Message.decode(rpcRequest.body().nioByteBuffer())
+    val requestId = rpcRequest.requestId
+    val (mode, shuffleKey, partitionUniqueId, isCheckSplit) = msg match {
+      case p: PushDataHandShake => (p.mode, p.shuffleKey, p.partitionUniqueId, false)
+      case rs: RegionStart => (rs.mode, rs.shuffleKey, rs.partitionUniqueId, true)
+      case rf: RegionFinish => (rf.mode, rf.shuffleKey, rf.partitionUniqueId, false)
+    }
+    handleCore(
+      client,
+      rpcRequest,
+      requestId,
+      () =>
+        handleRpcRequestCore(
+          mode,
+          msg,
+          shuffleKey,
+          partitionUniqueId,
+          requestId,
+          isCheckSplit,
+          new SimpleRpcResponseCallback(
+            msg.`type`(),
+            client,
+            requestId,
+            shuffleKey,
+            partitionUniqueId)))
+
+  }
+
+  private def handleRpcRequestCore(
+      mode: Byte,
+      message: Message,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      requestId: Long,
+      isCheckSplit: Boolean,
+      callback: RpcResponseCallback): Unit = {
+    val isMaster = mode == PartitionLocation.Mode.MASTER
+    val messageType = message.`type`()
+    log.info(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId")
+    val (workerSourceMaster, workerSourceSlave) =
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          (WorkerSource.MasterPushDataHandshakeTime, WorkerSource.SlavePushDataHandshakeTime)
+        case Type.REGION_START =>
+          (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
+        case Type.REGION_FINISH =>
+          (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
+      }
+
+    val location = isMaster match {
+      case true => partitionLocationInfo.getMasterLocation(shuffleKey, partitionUniqueId)
+      case false => partitionLocationInfo.getSlaveLocation(shuffleKey, partitionUniqueId)
+    }
+    workerSource.startTimer(if (isMaster) workerSourceMaster else workerSourceSlave, s"$requestId")
+    val wrappedCallback =
+      new WrappedRpcResponseCallback(
+        messageType,
+        isMaster,
+        requestId,
+        null,
+        location,
+        if (isMaster) workerSourceMaster else workerSourceSlave,
+        callback)
+
+    val isReturn = checkLocationNull(
+      messageType,
+      shuffleKey,
+      partitionUniqueId,
+      null,
+      location,
+      callback,
+      wrappedCallback)
+    if (isReturn) return
+
+    val (isReturnWriter, fileWriter) =
+      getFileWriterAndCheck(messageType, location, isMaster, callback)
+    if (isReturnWriter) return
+
+    if (isCheckSplit) {
+      val isReturnDisk = checkDiskFullAndSplit(fileWriter, isMaster, null, callback)
+      if (isReturnDisk) return
+    }
+
+    try {
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE => {
+          fileWriter.pushDataHandShake(message.asInstanceOf[PushDataHandShake].numPartitions)
+        }
+        case Type.REGION_START => {
+          fileWriter.regionStart(
+            message.asInstanceOf[RegionStart].currentRegionIndex,
+            message.asInstanceOf[RegionStart].isBroadcast)
+        }
+        case Type.REGION_FINISH => {
+          fileWriter.regionFinish()
+        }
+      }
+      // for master, send data to slave
+      if (location.getPeer != null && isMaster) {
+        // to do replica
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      } else {
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      }
+    } catch {
+      case t: Throwable =>
+        callback.onFailure(new Exception(s"$messageType failed", t))
+    }
+  }
+
+  class WrappedRpcResponseCallback(
+      messageType: Message.Type,
+      isMaster: Boolean,
+      requestId: Long,
+      softSplit: AtomicBoolean,
+      location: PartitionLocation,
+      workerSourceTime: String,
+      callback: RpcResponseCallback)
+    extends RpcResponseCallback {
+    override def onSuccess(response: ByteBuffer): Unit = {
+      if (isMaster) {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")
+        if (response.remaining() > 0) {
+          val resp = ByteBuffer.allocate(response.remaining())
+          resp.put(response)
+          resp.flip()
+          callback.onSuccess(resp)
+        } else if (softSplit != null && softSplit.get()) {
+          callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.SOFT_SPLIT.getValue)))
+        } else {
+          callback.onSuccess(response)
+        }
+      } else {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")
+        callback.onSuccess(response)
+      }
+    }
+
+    override def onFailure(e: Throwable): Unit = {
+      if (location != null) {
+        logError(s"[handle$messageType.onFailure] partitionLocation: $location")
+      }
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          workerSource.incCounter(WorkerSource.PushDataHandshakeFailCount)
+          callback.onFailure(new Exception(
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_SLAVE.getMessage,
+            e))
+        case Type.REGION_START =>
+          workerSource.incCounter(WorkerSource.RegionStartFailCount)
+          callback.onFailure(new Exception(StatusCode.REGION_START_FAIL_SLAVE.getMessage, e))
+        case Type.REGION_FINISH =>
+          workerSource.incCounter(WorkerSource.RegionFinishFailCount)
+          callback.onFailure(new Exception(StatusCode.REGION_FINISH_FAIL_SLAVE.getMessage, e))
+        case _ =>
+          workerSource.incCounter(WorkerSource.PushDataFailCount)
+          callback.onFailure(new Exception(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage, e))
+      }
+    }
+  }
+
+  private def checkLocationNull(
+      messageType: Message.Type,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      body: ByteBuf,
+      location: PartitionLocation,
+      callback: RpcResponseCallback,
+      wrappedCallback: RpcResponseCallback): Boolean = {
+    if (location == null) {
+      val (mapId, attemptId) = getMapAttempt(body, shuffleKey, partitionUniqueId)
+      if (shuffleMapperAttempts.containsKey(shuffleKey) &&
+        -1 != shuffleMapperAttempts.get(shuffleKey).get(mapId)) {
+        // partition data has already been committed
+        logInfo(s"Receive push data from speculative task(shuffle $shuffleKey, map $mapId, " +
+          s" attempt $attemptId), but this mapper has already been ended.")
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.STAGE_ENDED.getValue)))
+      } else {
+        val msg = s"Partition location wasn't found for task(shuffle $shuffleKey, map $mapId, " +
+          s"attempt $attemptId, uniqueId $partitionUniqueId)."
+        logWarning(s"[handle$messageType] $msg")
+        messageType match {
+          case Type.PUSH_MERGED_DATA => callback.onFailure(new Exception(msg))
+          case _ => callback.onFailure(
+              new Exception(StatusCode.PUSH_DATA_FAIL_PARTITION_NOT_FOUND.getMessage()))
+        }
+      }
+      return true
+    }
+    false
+  }
+
+  private def checkFileWriterException(
+      messageType: Message.Type,
+      isMaster: Boolean,
+      fileWriter: FileWriter,
+      callback: RpcResponseCallback): Unit = {
+    logWarning(
+      s"[handle$messageType] fileWriter $fileWriter has Exception ${fileWriter.getException}")
+
+    val (messageMaster, messageSlave) =
+      messageType match {
+        case Type.PUSH_DATA | Type.PUSH_DATA_HAND_SHAKE =>
+          (
+            StatusCode.PUSH_DATA_FAIL_MASTER.getMessage(),
+            StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage())
+        case Type.PUSH_DATA_HAND_SHAKE => (
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_MASTER.getMessage,
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_SLAVE.getMessage)
+        case Type.REGION_START => (
+            StatusCode.REGION_START_FAIL_MASTER.getMessage,
+            StatusCode.REGION_START_FAIL_SLAVE.getMessage)
+        case Type.REGION_FINISH => (
+            StatusCode.REGION_FINISH_FAIL_MASTER.getMessage,
+            StatusCode.REGION_FINISH_FAIL_SLAVE.getMessage)
+      }
+    callback.onFailure(new Exception(
+      if (isMaster) messageMaster else messageSlave,
+      fileWriter.getException))
+  }
+
+  private def getFileWriterAndCheck(
+      messageType: Message.Type,
+      location: PartitionLocation,
+      isMaster: Boolean,
+      callback: RpcResponseCallback): (Boolean, FileWriter) = {
+    val fileWriter = location.asInstanceOf[WorkingPartition].getFileWriter
+    val exception = fileWriter.getException
+    if (exception != null) {
+      checkFileWriterException(messageType, isMaster, fileWriter, callback)
+      return (true, fileWriter)
+    }
+    (false, fileWriter)
+  }
+
+  private def checkDiskFull(fileWriter: FileWriter): Boolean = {
+    val diskFull = workerInfo.diskInfos
+      .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
+      .actualUsableSpace < diskReserveSize
+    diskFull
+  }
+
+  private def checkDiskFullAndSplit(
+      fileWriter: FileWriter,
+      isMaster: Boolean,
+      softSplit: AtomicBoolean,
+      callback: RpcResponseCallback): Boolean = {
+    val diskFull = checkDiskFull(fileWriter)
+    if ((diskFull && fileWriter.getFileInfo.getFileLength > partitionSplitMinimumSize) ||
+      (isMaster && fileWriter.getFileInfo.getFileLength > fileWriter.getSplitThreshold())) {
+      if (softSplit != null && fileWriter.getSplitMode == PartitionSplitMode.SOFT) {
+        softSplit.set(true)
+      } else {
+        callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
+        return true
+      }
+    }
+    false
+  }
+
+  private def getMapAttempt(
+      body: ByteBuf,

Review Comment:
   Yes, the refactoring will be done after this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1013: [CELEBORN-71] pushdatahandler supports mappartition write: handshake/regionstart/regionfinish

Posted by GitBox <gi...@apache.org>.
FMX commented on code in PR #1013:
URL: https://github.com/apache/incubator-celeborn/pull/1013#discussion_r1039117243


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -526,4 +530,277 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       message.body().release()
     }
   }
+
+  private def handleRpcRequest(client: TransportClient, rpcRequest: RpcRequest): Unit = {
+    val msg = Message.decode(rpcRequest.body().nioByteBuffer())
+    val requestId = rpcRequest.requestId
+    val (mode, shuffleKey, partitionUniqueId, isCheckSplit) = msg match {
+      case p: PushDataHandShake => (p.mode, p.shuffleKey, p.partitionUniqueId, false)
+      case rs: RegionStart => (rs.mode, rs.shuffleKey, rs.partitionUniqueId, true)
+      case rf: RegionFinish => (rf.mode, rf.shuffleKey, rf.partitionUniqueId, false)
+    }
+    handleCore(
+      client,
+      rpcRequest,
+      requestId,
+      () =>
+        handleRpcRequestCore(
+          mode,
+          msg,
+          shuffleKey,
+          partitionUniqueId,
+          requestId,
+          isCheckSplit,
+          new SimpleRpcResponseCallback(
+            msg.`type`(),
+            client,
+            requestId,
+            shuffleKey,
+            partitionUniqueId)))
+
+  }
+
+  private def handleRpcRequestCore(
+      mode: Byte,
+      message: Message,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      requestId: Long,
+      isCheckSplit: Boolean,
+      callback: RpcResponseCallback): Unit = {
+    val isMaster = mode == PartitionLocation.Mode.MASTER
+    val messageType = message.`type`()
+    log.info(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId")
+    val (workerSourceMaster, workerSourceSlave) =
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          (WorkerSource.MasterPushDataHandshakeTime, WorkerSource.SlavePushDataHandshakeTime)
+        case Type.REGION_START =>
+          (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
+        case Type.REGION_FINISH =>
+          (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
+      }
+
+    val location = isMaster match {
+      case true => partitionLocationInfo.getMasterLocation(shuffleKey, partitionUniqueId)
+      case false => partitionLocationInfo.getSlaveLocation(shuffleKey, partitionUniqueId)
+    }
+    workerSource.startTimer(if (isMaster) workerSourceMaster else workerSourceSlave, s"$requestId")
+    val wrappedCallback =
+      new WrappedRpcResponseCallback(
+        messageType,
+        isMaster,
+        requestId,
+        null,
+        location,
+        if (isMaster) workerSourceMaster else workerSourceSlave,
+        callback)
+
+    val isReturn = checkLocationNull(
+      messageType,
+      shuffleKey,
+      partitionUniqueId,
+      null,
+      location,
+      callback,
+      wrappedCallback)
+    if (isReturn) return
+
+    val (isReturnWriter, fileWriter) =
+      getFileWriterAndCheck(messageType, location, isMaster, callback)
+    if (isReturnWriter) return
+
+    if (isCheckSplit) {

Review Comment:
   Maybe this would be clearer?



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -526,4 +530,277 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       message.body().release()
     }
   }
+
+  private def handleRpcRequest(client: TransportClient, rpcRequest: RpcRequest): Unit = {
+    val msg = Message.decode(rpcRequest.body().nioByteBuffer())
+    val requestId = rpcRequest.requestId
+    val (mode, shuffleKey, partitionUniqueId, isCheckSplit) = msg match {
+      case p: PushDataHandShake => (p.mode, p.shuffleKey, p.partitionUniqueId, false)
+      case rs: RegionStart => (rs.mode, rs.shuffleKey, rs.partitionUniqueId, true)
+      case rf: RegionFinish => (rf.mode, rf.shuffleKey, rf.partitionUniqueId, false)
+    }
+    handleCore(
+      client,
+      rpcRequest,
+      requestId,
+      () =>
+        handleRpcRequestCore(
+          mode,
+          msg,
+          shuffleKey,
+          partitionUniqueId,
+          requestId,
+          isCheckSplit,
+          new SimpleRpcResponseCallback(
+            msg.`type`(),
+            client,
+            requestId,
+            shuffleKey,
+            partitionUniqueId)))
+
+  }
+
+  private def handleRpcRequestCore(
+      mode: Byte,
+      message: Message,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      requestId: Long,
+      isCheckSplit: Boolean,
+      callback: RpcResponseCallback): Unit = {
+    val isMaster = mode == PartitionLocation.Mode.MASTER
+    val messageType = message.`type`()
+    log.info(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId")
+    val (workerSourceMaster, workerSourceSlave) =
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          (WorkerSource.MasterPushDataHandshakeTime, WorkerSource.SlavePushDataHandshakeTime)
+        case Type.REGION_START =>
+          (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
+        case Type.REGION_FINISH =>
+          (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
+      }
+
+    val location = isMaster match {
+      case true => partitionLocationInfo.getMasterLocation(shuffleKey, partitionUniqueId)
+      case false => partitionLocationInfo.getSlaveLocation(shuffleKey, partitionUniqueId)
+    }
+    workerSource.startTimer(if (isMaster) workerSourceMaster else workerSourceSlave, s"$requestId")
+    val wrappedCallback =
+      new WrappedRpcResponseCallback(
+        messageType,
+        isMaster,
+        requestId,
+        null,
+        location,
+        if (isMaster) workerSourceMaster else workerSourceSlave,
+        callback)
+
+    val isReturn = checkLocationNull(

Review Comment:
   Maybe "if (checkLocationNull(xxx)) { return }" would be clearer?



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -526,4 +530,277 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       message.body().release()
     }
   }
+
+  private def handleRpcRequest(client: TransportClient, rpcRequest: RpcRequest): Unit = {
+    val msg = Message.decode(rpcRequest.body().nioByteBuffer())
+    val requestId = rpcRequest.requestId
+    val (mode, shuffleKey, partitionUniqueId, isCheckSplit) = msg match {
+      case p: PushDataHandShake => (p.mode, p.shuffleKey, p.partitionUniqueId, false)
+      case rs: RegionStart => (rs.mode, rs.shuffleKey, rs.partitionUniqueId, true)
+      case rf: RegionFinish => (rf.mode, rf.shuffleKey, rf.partitionUniqueId, false)
+    }
+    handleCore(
+      client,
+      rpcRequest,
+      requestId,
+      () =>
+        handleRpcRequestCore(
+          mode,
+          msg,
+          shuffleKey,
+          partitionUniqueId,
+          requestId,
+          isCheckSplit,
+          new SimpleRpcResponseCallback(
+            msg.`type`(),
+            client,
+            requestId,
+            shuffleKey,
+            partitionUniqueId)))
+
+  }
+
+  private def handleRpcRequestCore(
+      mode: Byte,
+      message: Message,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      requestId: Long,
+      isCheckSplit: Boolean,
+      callback: RpcResponseCallback): Unit = {
+    val isMaster = mode == PartitionLocation.Mode.MASTER
+    val messageType = message.`type`()
+    log.info(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId")
+    val (workerSourceMaster, workerSourceSlave) =
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          (WorkerSource.MasterPushDataHandshakeTime, WorkerSource.SlavePushDataHandshakeTime)
+        case Type.REGION_START =>
+          (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
+        case Type.REGION_FINISH =>
+          (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
+      }
+
+    val location = isMaster match {
+      case true => partitionLocationInfo.getMasterLocation(shuffleKey, partitionUniqueId)
+      case false => partitionLocationInfo.getSlaveLocation(shuffleKey, partitionUniqueId)
+    }
+    workerSource.startTimer(if (isMaster) workerSourceMaster else workerSourceSlave, s"$requestId")
+    val wrappedCallback =
+      new WrappedRpcResponseCallback(
+        messageType,
+        isMaster,
+        requestId,
+        null,
+        location,
+        if (isMaster) workerSourceMaster else workerSourceSlave,
+        callback)
+
+    val isReturn = checkLocationNull(
+      messageType,
+      shuffleKey,
+      partitionUniqueId,
+      null,
+      location,
+      callback,
+      wrappedCallback)
+    if (isReturn) return
+
+    val (isReturnWriter, fileWriter) =
+      getFileWriterAndCheck(messageType, location, isMaster, callback)
+    if (isReturnWriter) return
+
+    if (isCheckSplit) {

Review Comment:
   ```suggestion
       if (isCheckSplit && checkDiskFullAndSplit(fileWriter, isMaster, null, callback)) {
       return;
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1013: [CELEBORN-71] pushdatahandler supports mappartition write: handshake/regionstart/regionfinish

Posted by GitBox <gi...@apache.org>.
FMX commented on code in PR #1013:
URL: https://github.com/apache/incubator-celeborn/pull/1013#discussion_r1037996782


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -526,4 +530,278 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       message.body().release()
     }
   }
+
+  private def handleRpcRequest(client: TransportClient, rpcRequest: RpcRequest): Unit = {
+    val msg = Message.decode(rpcRequest.body().nioByteBuffer())
+    val requestId = rpcRequest.requestId
+    val (mode, shuffleKey, partitionUniqueId, isCheckSplit) = msg match {
+      case p: PushDataHandShake => (p.mode, p.shuffleKey, p.partitionUniqueId, false)
+      case rs: RegionStart => (rs.mode, rs.shuffleKey, rs.partitionUniqueId, true)
+      case rf: RegionFinish => (rf.mode, rf.shuffleKey, rf.partitionUniqueId, false)
+    }
+    handleCore(
+      client,
+      rpcRequest,
+      requestId,
+      () =>
+        handleRpcRequestCore(
+          mode,
+          msg,
+          shuffleKey,
+          partitionUniqueId,
+          requestId,
+          isCheckSplit,
+          new SimpleRpcResponseCallback(
+            msg.`type`(),
+            client,
+            requestId,
+            shuffleKey,
+            partitionUniqueId)))
+
+  }
+
+  private def handleRpcRequestCore(
+      mode: Byte,
+      message: Message,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      requestId: Long,
+      isCheckSplit: Boolean,
+      callback: RpcResponseCallback): Unit = {
+    val isMaster = mode == PartitionLocation.Mode.MASTER
+    val messageType = message.`type`()
+    log.info(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId")
+    val (workerSourceMaster, workerSourceSlave) =
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          (WorkerSource.MasterPushDataHandshakeTime, WorkerSource.SlavePushDataHandshakeTime)
+        case Type.REGION_START =>
+          (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
+        case Type.REGION_FINISH =>
+          (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
+      }
+
+    val location = isMaster match {
+      case true => partitionLocationInfo.getMasterLocation(shuffleKey, partitionUniqueId)
+      case false => partitionLocationInfo.getSlaveLocation(shuffleKey, partitionUniqueId)
+    }
+    workerSource.startTimer(if (isMaster) workerSourceMaster else workerSourceSlave, s"$requestId")
+    val wrappedCallback =
+      new WrappedRpcResponseCallback(
+        messageType,
+        isMaster,
+        requestId,
+        null,
+        location,
+        if (isMaster) workerSourceMaster else workerSourceSlave,
+        callback)
+
+    val isReturn = checkLocationNull(
+      messageType,
+      shuffleKey,
+      partitionUniqueId,
+      null,
+      location,
+      callback,
+      wrappedCallback)
+    if (isReturn) return
+
+    val (isReturnWriter, fileWriter) =
+      getFileWriterAndCheck(messageType, location, isMaster, callback)
+    if (isReturnWriter) return
+
+    if (isCheckSplit) {
+      val isReturnDisk = checkDiskFullAndSplit(fileWriter, isMaster, null, callback)
+      if (isReturnDisk) return
+    }
+
+    try {
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE => {
+          fileWriter.pushDataHandShake(message.asInstanceOf[PushDataHandShake].numPartitions)
+        }
+        case Type.REGION_START => {
+          fileWriter.regionStart(
+            message.asInstanceOf[RegionStart].currentRegionIndex,
+            message.asInstanceOf[RegionStart].isBroadcast)
+        }
+        case Type.REGION_FINISH => {
+          fileWriter.regionFinish()
+        }
+      }
+      // for master, send data to slave
+      if (location.getPeer != null && isMaster) {
+        // to do replica
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      } else {
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      }
+    } catch {
+      case t: Throwable =>
+        callback.onFailure(new Exception(s"$messageType failed", t))
+    }
+  }
+
+  class WrappedRpcResponseCallback(
+      messageType: Message.Type,
+      isMaster: Boolean,
+      requestId: Long,
+      softSplit: AtomicBoolean,
+      location: PartitionLocation,
+      workerSourceTime: String,
+      callback: RpcResponseCallback)
+    extends RpcResponseCallback {
+    override def onSuccess(response: ByteBuffer): Unit = {
+      if (isMaster) {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")

Review Comment:
   The stopTimer method always needs to be called, so it can be moved out of the if block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1013: [CELEBORN-71] pushdatahandler supports mappartition write: handshake/regionstart/regionfinish

Posted by GitBox <gi...@apache.org>.
zhongqiangczq commented on code in PR #1013:
URL: https://github.com/apache/incubator-celeborn/pull/1013#discussion_r1039091651


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -526,4 +530,278 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       message.body().release()
     }
   }
+
+  private def handleRpcRequest(client: TransportClient, rpcRequest: RpcRequest): Unit = {
+    val msg = Message.decode(rpcRequest.body().nioByteBuffer())
+    val requestId = rpcRequest.requestId
+    val (mode, shuffleKey, partitionUniqueId, isCheckSplit) = msg match {
+      case p: PushDataHandShake => (p.mode, p.shuffleKey, p.partitionUniqueId, false)
+      case rs: RegionStart => (rs.mode, rs.shuffleKey, rs.partitionUniqueId, true)
+      case rf: RegionFinish => (rf.mode, rf.shuffleKey, rf.partitionUniqueId, false)
+    }
+    handleCore(
+      client,
+      rpcRequest,
+      requestId,
+      () =>
+        handleRpcRequestCore(
+          mode,
+          msg,
+          shuffleKey,
+          partitionUniqueId,
+          requestId,
+          isCheckSplit,
+          new SimpleRpcResponseCallback(
+            msg.`type`(),
+            client,
+            requestId,
+            shuffleKey,
+            partitionUniqueId)))
+
+  }
+
+  private def handleRpcRequestCore(
+      mode: Byte,
+      message: Message,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      requestId: Long,
+      isCheckSplit: Boolean,
+      callback: RpcResponseCallback): Unit = {
+    val isMaster = mode == PartitionLocation.Mode.MASTER
+    val messageType = message.`type`()
+    log.info(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId")
+    val (workerSourceMaster, workerSourceSlave) =
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          (WorkerSource.MasterPushDataHandshakeTime, WorkerSource.SlavePushDataHandshakeTime)
+        case Type.REGION_START =>
+          (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
+        case Type.REGION_FINISH =>
+          (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
+      }
+
+    val location = isMaster match {
+      case true => partitionLocationInfo.getMasterLocation(shuffleKey, partitionUniqueId)
+      case false => partitionLocationInfo.getSlaveLocation(shuffleKey, partitionUniqueId)
+    }
+    workerSource.startTimer(if (isMaster) workerSourceMaster else workerSourceSlave, s"$requestId")

Review Comment:
   Yes. checkLocationNull handles the location == null case, and it relies on wrappedCallback to send the response.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1013: [CELEBORN-71] pushdatahandler supports mappartition write: handshake/regionstart/regionfinish

Posted by GitBox <gi...@apache.org>.
FMX commented on code in PR #1013:
URL: https://github.com/apache/incubator-celeborn/pull/1013#discussion_r1037958180


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -526,4 +530,278 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       message.body().release()
     }
   }
+
+  private def handleRpcRequest(client: TransportClient, rpcRequest: RpcRequest): Unit = {
+    val msg = Message.decode(rpcRequest.body().nioByteBuffer())
+    val requestId = rpcRequest.requestId
+    val (mode, shuffleKey, partitionUniqueId, isCheckSplit) = msg match {
+      case p: PushDataHandShake => (p.mode, p.shuffleKey, p.partitionUniqueId, false)
+      case rs: RegionStart => (rs.mode, rs.shuffleKey, rs.partitionUniqueId, true)
+      case rf: RegionFinish => (rf.mode, rf.shuffleKey, rf.partitionUniqueId, false)
+    }
+    handleCore(
+      client,
+      rpcRequest,
+      requestId,
+      () =>
+        handleRpcRequestCore(
+          mode,
+          msg,
+          shuffleKey,
+          partitionUniqueId,
+          requestId,
+          isCheckSplit,
+          new SimpleRpcResponseCallback(
+            msg.`type`(),
+            client,
+            requestId,
+            shuffleKey,
+            partitionUniqueId)))
+
+  }
+
+  private def handleRpcRequestCore(
+      mode: Byte,
+      message: Message,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      requestId: Long,
+      isCheckSplit: Boolean,
+      callback: RpcResponseCallback): Unit = {
+    val isMaster = mode == PartitionLocation.Mode.MASTER
+    val messageType = message.`type`()
+    log.info(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId")
+    val (workerSourceMaster, workerSourceSlave) =
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          (WorkerSource.MasterPushDataHandshakeTime, WorkerSource.SlavePushDataHandshakeTime)
+        case Type.REGION_START =>
+          (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
+        case Type.REGION_FINISH =>
+          (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
+      }
+
+    val location = isMaster match {
+      case true => partitionLocationInfo.getMasterLocation(shuffleKey, partitionUniqueId)
+      case false => partitionLocationInfo.getSlaveLocation(shuffleKey, partitionUniqueId)
+    }
+    workerSource.startTimer(if (isMaster) workerSourceMaster else workerSourceSlave, s"$requestId")

Review Comment:
   If a null location means this RPC should not be processed, why not exit this method here?



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -526,4 +530,278 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       message.body().release()
     }
   }
+
+  private def handleRpcRequest(client: TransportClient, rpcRequest: RpcRequest): Unit = {
+    val msg = Message.decode(rpcRequest.body().nioByteBuffer())
+    val requestId = rpcRequest.requestId
+    val (mode, shuffleKey, partitionUniqueId, isCheckSplit) = msg match {
+      case p: PushDataHandShake => (p.mode, p.shuffleKey, p.partitionUniqueId, false)
+      case rs: RegionStart => (rs.mode, rs.shuffleKey, rs.partitionUniqueId, true)
+      case rf: RegionFinish => (rf.mode, rf.shuffleKey, rf.partitionUniqueId, false)
+    }
+    handleCore(
+      client,
+      rpcRequest,
+      requestId,
+      () =>
+        handleRpcRequestCore(
+          mode,
+          msg,
+          shuffleKey,
+          partitionUniqueId,
+          requestId,
+          isCheckSplit,
+          new SimpleRpcResponseCallback(
+            msg.`type`(),
+            client,
+            requestId,
+            shuffleKey,
+            partitionUniqueId)))
+
+  }
+
+  private def handleRpcRequestCore(
+      mode: Byte,
+      message: Message,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      requestId: Long,
+      isCheckSplit: Boolean,
+      callback: RpcResponseCallback): Unit = {
+    val isMaster = mode == PartitionLocation.Mode.MASTER
+    val messageType = message.`type`()
+    log.info(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId")
+    val (workerSourceMaster, workerSourceSlave) =
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          (WorkerSource.MasterPushDataHandshakeTime, WorkerSource.SlavePushDataHandshakeTime)
+        case Type.REGION_START =>
+          (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
+        case Type.REGION_FINISH =>
+          (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
+      }
+
+    val location = isMaster match {
+      case true => partitionLocationInfo.getMasterLocation(shuffleKey, partitionUniqueId)
+      case false => partitionLocationInfo.getSlaveLocation(shuffleKey, partitionUniqueId)
+    }
+    workerSource.startTimer(if (isMaster) workerSourceMaster else workerSourceSlave, s"$requestId")
+    val wrappedCallback =
+      new WrappedRpcResponseCallback(
+        messageType,
+        isMaster,
+        requestId,
+        null,
+        location,
+        if (isMaster) workerSourceMaster else workerSourceSlave,
+        callback)
+
+    val isReturn = checkLocationNull(
+      messageType,
+      shuffleKey,
+      partitionUniqueId,
+      null,
+      location,
+      callback,
+      wrappedCallback)
+    if (isReturn) return
+
+    val (isReturnWriter, fileWriter) =
+      getFileWriterAndCheck(messageType, location, isMaster, callback)
+    if (isReturnWriter) return
+
+    if (isCheckSplit) {
+      val isReturnDisk = checkDiskFullAndSplit(fileWriter, isMaster, null, callback)
+      if (isReturnDisk) return
+    }
+
+    try {
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE => {
+          fileWriter.pushDataHandShake(message.asInstanceOf[PushDataHandShake].numPartitions)
+        }
+        case Type.REGION_START => {
+          fileWriter.regionStart(
+            message.asInstanceOf[RegionStart].currentRegionIndex,
+            message.asInstanceOf[RegionStart].isBroadcast)
+        }
+        case Type.REGION_FINISH => {
+          fileWriter.regionFinish()
+        }
+      }
+      // for master, send data to slave
+      if (location.getPeer != null && isMaster) {
+        // to do replica
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      } else {
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      }
+    } catch {
+      case t: Throwable =>
+        callback.onFailure(new Exception(s"$messageType failed", t))
+    }
+  }
+
+  class WrappedRpcResponseCallback(
+      messageType: Message.Type,
+      isMaster: Boolean,
+      requestId: Long,
+      softSplit: AtomicBoolean,
+      location: PartitionLocation,
+      workerSourceTime: String,
+      callback: RpcResponseCallback)
+    extends RpcResponseCallback {
+    override def onSuccess(response: ByteBuffer): Unit = {
+      if (isMaster) {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")
+        if (response.remaining() > 0) {
+          val resp = ByteBuffer.allocate(response.remaining())
+          resp.put(response)
+          resp.flip()
+          callback.onSuccess(resp)
+        } else if (softSplit != null && softSplit.get()) {
+          callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.SOFT_SPLIT.getValue)))
+        } else {
+          callback.onSuccess(response)
+        }
+      } else {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")
+        callback.onSuccess(response)
+      }
+    }
+
+    override def onFailure(e: Throwable): Unit = {
+      if (location != null) {
+        logError(s"[handle$messageType.onFailure] partitionLocation: $location")
+      }
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          workerSource.incCounter(WorkerSource.PushDataHandshakeFailCount)
+          callback.onFailure(new Exception(
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_SLAVE.getMessage,
+            e))
+        case Type.REGION_START =>
+          workerSource.incCounter(WorkerSource.RegionStartFailCount)
+          callback.onFailure(new Exception(StatusCode.REGION_START_FAIL_SLAVE.getMessage, e))
+        case Type.REGION_FINISH =>
+          workerSource.incCounter(WorkerSource.RegionFinishFailCount)
+          callback.onFailure(new Exception(StatusCode.REGION_FINISH_FAIL_SLAVE.getMessage, e))
+        case _ =>
+          workerSource.incCounter(WorkerSource.PushDataFailCount)
+          callback.onFailure(new Exception(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage, e))
+      }
+    }
+  }
+
+  private def checkLocationNull(
+      messageType: Message.Type,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      body: ByteBuf,
+      location: PartitionLocation,
+      callback: RpcResponseCallback,
+      wrappedCallback: RpcResponseCallback): Boolean = {
+    if (location == null) {
+      val (mapId, attemptId) = getMapAttempt(body, shuffleKey, partitionUniqueId)
+      if (shuffleMapperAttempts.containsKey(shuffleKey) &&
+        -1 != shuffleMapperAttempts.get(shuffleKey).get(mapId)) {
+        // partition data has already been committed
+        logInfo(s"Receive push data from speculative task(shuffle $shuffleKey, map $mapId, " +
+          s" attempt $attemptId), but this mapper has already been ended.")
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.STAGE_ENDED.getValue)))
+      } else {
+        val msg = s"Partition location wasn't found for task(shuffle $shuffleKey, map $mapId, " +
+          s"attempt $attemptId, uniqueId $partitionUniqueId)."
+        logWarning(s"[handle$messageType] $msg")
+        messageType match {
+          case Type.PUSH_MERGED_DATA => callback.onFailure(new Exception(msg))
+          case _ => callback.onFailure(
+              new Exception(StatusCode.PUSH_DATA_FAIL_PARTITION_NOT_FOUND.getMessage()))
+        }
+      }
+      return true
+    }
+    false
+  }
+
+  private def checkFileWriterException(
+      messageType: Message.Type,
+      isMaster: Boolean,
+      fileWriter: FileWriter,
+      callback: RpcResponseCallback): Unit = {
+    logWarning(
+      s"[handle$messageType] fileWriter $fileWriter has Exception ${fileWriter.getException}")
+
+    val (messageMaster, messageSlave) =
+      messageType match {
+        case Type.PUSH_DATA | Type.PUSH_DATA_HAND_SHAKE =>
+          (
+            StatusCode.PUSH_DATA_FAIL_MASTER.getMessage(),
+            StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage())
+        case Type.PUSH_DATA_HAND_SHAKE => (
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_MASTER.getMessage,
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_SLAVE.getMessage)
+        case Type.REGION_START => (
+            StatusCode.REGION_START_FAIL_MASTER.getMessage,
+            StatusCode.REGION_START_FAIL_SLAVE.getMessage)
+        case Type.REGION_FINISH => (
+            StatusCode.REGION_FINISH_FAIL_MASTER.getMessage,
+            StatusCode.REGION_FINISH_FAIL_SLAVE.getMessage)
+      }
+    callback.onFailure(new Exception(
+      if (isMaster) messageMaster else messageSlave,
+      fileWriter.getException))
+  }
+
+  private def getFileWriterAndCheck(
+      messageType: Message.Type,
+      location: PartitionLocation,
+      isMaster: Boolean,
+      callback: RpcResponseCallback): (Boolean, FileWriter) = {
+    val fileWriter = location.asInstanceOf[WorkingPartition].getFileWriter
+    val exception = fileWriter.getException
+    if (exception != null) {
+      checkFileWriterException(messageType, isMaster, fileWriter, callback)
+      return (true, fileWriter)
+    }
+    (false, fileWriter)
+  }
+
+  private def checkDiskFull(fileWriter: FileWriter): Boolean = {
+    val diskFull = workerInfo.diskInfos
+      .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
+      .actualUsableSpace < diskReserveSize

Review Comment:
   There is a logic error here: `actualUsableSpace > diskReserveSize` means this disk is writable.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -526,4 +530,278 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       message.body().release()
     }
   }
+
+  private def handleRpcRequest(client: TransportClient, rpcRequest: RpcRequest): Unit = {
+    val msg = Message.decode(rpcRequest.body().nioByteBuffer())
+    val requestId = rpcRequest.requestId
+    val (mode, shuffleKey, partitionUniqueId, isCheckSplit) = msg match {
+      case p: PushDataHandShake => (p.mode, p.shuffleKey, p.partitionUniqueId, false)
+      case rs: RegionStart => (rs.mode, rs.shuffleKey, rs.partitionUniqueId, true)
+      case rf: RegionFinish => (rf.mode, rf.shuffleKey, rf.partitionUniqueId, false)
+    }
+    handleCore(
+      client,
+      rpcRequest,
+      requestId,
+      () =>
+        handleRpcRequestCore(
+          mode,
+          msg,
+          shuffleKey,
+          partitionUniqueId,
+          requestId,
+          isCheckSplit,
+          new SimpleRpcResponseCallback(
+            msg.`type`(),
+            client,
+            requestId,
+            shuffleKey,
+            partitionUniqueId)))
+
+  }
+
+  private def handleRpcRequestCore(
+      mode: Byte,
+      message: Message,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      requestId: Long,
+      isCheckSplit: Boolean,
+      callback: RpcResponseCallback): Unit = {
+    val isMaster = mode == PartitionLocation.Mode.MASTER
+    val messageType = message.`type`()
+    log.info(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId")
+    val (workerSourceMaster, workerSourceSlave) =
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          (WorkerSource.MasterPushDataHandshakeTime, WorkerSource.SlavePushDataHandshakeTime)
+        case Type.REGION_START =>
+          (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
+        case Type.REGION_FINISH =>
+          (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
+      }
+
+    val location = isMaster match {
+      case true => partitionLocationInfo.getMasterLocation(shuffleKey, partitionUniqueId)
+      case false => partitionLocationInfo.getSlaveLocation(shuffleKey, partitionUniqueId)
+    }
+    workerSource.startTimer(if (isMaster) workerSourceMaster else workerSourceSlave, s"$requestId")
+    val wrappedCallback =
+      new WrappedRpcResponseCallback(
+        messageType,
+        isMaster,
+        requestId,
+        null,
+        location,
+        if (isMaster) workerSourceMaster else workerSourceSlave,
+        callback)
+
+    val isReturn = checkLocationNull(
+      messageType,
+      shuffleKey,
+      partitionUniqueId,
+      null,
+      location,
+      callback,
+      wrappedCallback)
+    if (isReturn) return
+
+    val (isReturnWriter, fileWriter) =
+      getFileWriterAndCheck(messageType, location, isMaster, callback)
+    if (isReturnWriter) return
+
+    if (isCheckSplit) {
+      val isReturnDisk = checkDiskFullAndSplit(fileWriter, isMaster, null, callback)
+      if (isReturnDisk) return
+    }
+
+    try {
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE => {
+          fileWriter.pushDataHandShake(message.asInstanceOf[PushDataHandShake].numPartitions)
+        }
+        case Type.REGION_START => {
+          fileWriter.regionStart(
+            message.asInstanceOf[RegionStart].currentRegionIndex,
+            message.asInstanceOf[RegionStart].isBroadcast)
+        }
+        case Type.REGION_FINISH => {
+          fileWriter.regionFinish()
+        }
+      }
+      // for master, send data to slave
+      if (location.getPeer != null && isMaster) {
+        // to do replica
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      } else {
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      }
+    } catch {
+      case t: Throwable =>
+        callback.onFailure(new Exception(s"$messageType failed", t))
+    }
+  }
+
+  class WrappedRpcResponseCallback(
+      messageType: Message.Type,
+      isMaster: Boolean,
+      requestId: Long,
+      softSplit: AtomicBoolean,
+      location: PartitionLocation,
+      workerSourceTime: String,
+      callback: RpcResponseCallback)
+    extends RpcResponseCallback {
+    override def onSuccess(response: ByteBuffer): Unit = {
+      if (isMaster) {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")
+        if (response.remaining() > 0) {
+          val resp = ByteBuffer.allocate(response.remaining())
+          resp.put(response)
+          resp.flip()
+          callback.onSuccess(resp)
+        } else if (softSplit != null && softSplit.get()) {
+          callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.SOFT_SPLIT.getValue)))
+        } else {
+          callback.onSuccess(response)
+        }
+      } else {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")
+        callback.onSuccess(response)
+      }
+    }
+
+    override def onFailure(e: Throwable): Unit = {
+      if (location != null) {
+        logError(s"[handle$messageType.onFailure] partitionLocation: $location")
+      }
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          workerSource.incCounter(WorkerSource.PushDataHandshakeFailCount)
+          callback.onFailure(new Exception(
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_SLAVE.getMessage,
+            e))
+        case Type.REGION_START =>
+          workerSource.incCounter(WorkerSource.RegionStartFailCount)
+          callback.onFailure(new Exception(StatusCode.REGION_START_FAIL_SLAVE.getMessage, e))
+        case Type.REGION_FINISH =>
+          workerSource.incCounter(WorkerSource.RegionFinishFailCount)
+          callback.onFailure(new Exception(StatusCode.REGION_FINISH_FAIL_SLAVE.getMessage, e))
+        case _ =>
+          workerSource.incCounter(WorkerSource.PushDataFailCount)
+          callback.onFailure(new Exception(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage, e))
+      }
+    }
+  }
+
+  private def checkLocationNull(
+      messageType: Message.Type,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      body: ByteBuf,
+      location: PartitionLocation,
+      callback: RpcResponseCallback,
+      wrappedCallback: RpcResponseCallback): Boolean = {
+    if (location == null) {
+      val (mapId, attemptId) = getMapAttempt(body, shuffleKey, partitionUniqueId)
+      if (shuffleMapperAttempts.containsKey(shuffleKey) &&
+        -1 != shuffleMapperAttempts.get(shuffleKey).get(mapId)) {
+        // partition data has already been committed
+        logInfo(s"Receive push data from speculative task(shuffle $shuffleKey, map $mapId, " +
+          s" attempt $attemptId), but this mapper has already been ended.")
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.STAGE_ENDED.getValue)))
+      } else {
+        val msg = s"Partition location wasn't found for task(shuffle $shuffleKey, map $mapId, " +
+          s"attempt $attemptId, uniqueId $partitionUniqueId)."
+        logWarning(s"[handle$messageType] $msg")
+        messageType match {
+          case Type.PUSH_MERGED_DATA => callback.onFailure(new Exception(msg))
+          case _ => callback.onFailure(
+              new Exception(StatusCode.PUSH_DATA_FAIL_PARTITION_NOT_FOUND.getMessage()))
+        }
+      }
+      return true
+    }
+    false
+  }
+
+  private def checkFileWriterException(
+      messageType: Message.Type,
+      isMaster: Boolean,
+      fileWriter: FileWriter,
+      callback: RpcResponseCallback): Unit = {
+    logWarning(
+      s"[handle$messageType] fileWriter $fileWriter has Exception ${fileWriter.getException}")
+
+    val (messageMaster, messageSlave) =
+      messageType match {
+        case Type.PUSH_DATA | Type.PUSH_DATA_HAND_SHAKE =>
+          (
+            StatusCode.PUSH_DATA_FAIL_MASTER.getMessage(),
+            StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage())
+        case Type.PUSH_DATA_HAND_SHAKE => (
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_MASTER.getMessage,
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_SLAVE.getMessage)
+        case Type.REGION_START => (
+            StatusCode.REGION_START_FAIL_MASTER.getMessage,
+            StatusCode.REGION_START_FAIL_SLAVE.getMessage)
+        case Type.REGION_FINISH => (
+            StatusCode.REGION_FINISH_FAIL_MASTER.getMessage,
+            StatusCode.REGION_FINISH_FAIL_SLAVE.getMessage)
+      }
+    callback.onFailure(new Exception(
+      if (isMaster) messageMaster else messageSlave,
+      fileWriter.getException))
+  }
+
+  private def getFileWriterAndCheck(
+      messageType: Message.Type,
+      location: PartitionLocation,
+      isMaster: Boolean,
+      callback: RpcResponseCallback): (Boolean, FileWriter) = {
+    val fileWriter = location.asInstanceOf[WorkingPartition].getFileWriter
+    val exception = fileWriter.getException
+    if (exception != null) {
+      checkFileWriterException(messageType, isMaster, fileWriter, callback)
+      return (true, fileWriter)
+    }
+    (false, fileWriter)
+  }
+
+  private def checkDiskFull(fileWriter: FileWriter): Boolean = {
+    val diskFull = workerInfo.diskInfos
+      .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
+      .actualUsableSpace < diskReserveSize
+    diskFull
+  }
+
+  private def checkDiskFullAndSplit(
+      fileWriter: FileWriter,
+      isMaster: Boolean,
+      softSplit: AtomicBoolean,
+      callback: RpcResponseCallback): Boolean = {
+    val diskFull = checkDiskFull(fileWriter)
+    if ((diskFull && fileWriter.getFileInfo.getFileLength > partitionSplitMinimumSize) ||
+      (isMaster && fileWriter.getFileInfo.getFileLength > fileWriter.getSplitThreshold())) {
+      if (softSplit != null && fileWriter.getSplitMode == PartitionSplitMode.SOFT) {
+        softSplit.set(true)
+      } else {
+        callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.HARD_SPLIT.getValue)))
+        return true
+      }
+    }
+    false
+  }
+
+  private def getMapAttempt(
+      body: ByteBuf,

Review Comment:
   The `body` variable is always null at present. Will there be a follow-up refactoring pull request to merge the map-partition and reduce-partition push logic?



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -526,4 +530,278 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       message.body().release()
     }
   }
+
+  private def handleRpcRequest(client: TransportClient, rpcRequest: RpcRequest): Unit = {
+    val msg = Message.decode(rpcRequest.body().nioByteBuffer())
+    val requestId = rpcRequest.requestId
+    val (mode, shuffleKey, partitionUniqueId, isCheckSplit) = msg match {
+      case p: PushDataHandShake => (p.mode, p.shuffleKey, p.partitionUniqueId, false)
+      case rs: RegionStart => (rs.mode, rs.shuffleKey, rs.partitionUniqueId, true)
+      case rf: RegionFinish => (rf.mode, rf.shuffleKey, rf.partitionUniqueId, false)
+    }
+    handleCore(
+      client,
+      rpcRequest,
+      requestId,
+      () =>
+        handleRpcRequestCore(
+          mode,
+          msg,
+          shuffleKey,
+          partitionUniqueId,
+          requestId,
+          isCheckSplit,
+          new SimpleRpcResponseCallback(
+            msg.`type`(),
+            client,
+            requestId,
+            shuffleKey,
+            partitionUniqueId)))
+
+  }
+
+  private def handleRpcRequestCore(
+      mode: Byte,
+      message: Message,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      requestId: Long,
+      isCheckSplit: Boolean,
+      callback: RpcResponseCallback): Unit = {
+    val isMaster = mode == PartitionLocation.Mode.MASTER
+    val messageType = message.`type`()
+    log.info(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId")
+    val (workerSourceMaster, workerSourceSlave) =
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          (WorkerSource.MasterPushDataHandshakeTime, WorkerSource.SlavePushDataHandshakeTime)
+        case Type.REGION_START =>
+          (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
+        case Type.REGION_FINISH =>
+          (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
+      }
+
+    val location = isMaster match {
+      case true => partitionLocationInfo.getMasterLocation(shuffleKey, partitionUniqueId)
+      case false => partitionLocationInfo.getSlaveLocation(shuffleKey, partitionUniqueId)
+    }
+    workerSource.startTimer(if (isMaster) workerSourceMaster else workerSourceSlave, s"$requestId")
+    val wrappedCallback =
+      new WrappedRpcResponseCallback(
+        messageType,
+        isMaster,
+        requestId,
+        null,
+        location,
+        if (isMaster) workerSourceMaster else workerSourceSlave,
+        callback)
+
+    val isReturn = checkLocationNull(
+      messageType,
+      shuffleKey,
+      partitionUniqueId,
+      null,
+      location,
+      callback,
+      wrappedCallback)
+    if (isReturn) return
+
+    val (isReturnWriter, fileWriter) =
+      getFileWriterAndCheck(messageType, location, isMaster, callback)
+    if (isReturnWriter) return
+
+    if (isCheckSplit) {
+      val isReturnDisk = checkDiskFullAndSplit(fileWriter, isMaster, null, callback)
+      if (isReturnDisk) return
+    }
+
+    try {
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE => {
+          fileWriter.pushDataHandShake(message.asInstanceOf[PushDataHandShake].numPartitions)
+        }
+        case Type.REGION_START => {
+          fileWriter.regionStart(
+            message.asInstanceOf[RegionStart].currentRegionIndex,
+            message.asInstanceOf[RegionStart].isBroadcast)
+        }
+        case Type.REGION_FINISH => {
+          fileWriter.regionFinish()
+        }
+      }
+      // for master, send data to slave
+      if (location.getPeer != null && isMaster) {
+        // to do replica
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      } else {
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      }
+    } catch {
+      case t: Throwable =>
+        callback.onFailure(new Exception(s"$messageType failed", t))
+    }
+  }
+
+  class WrappedRpcResponseCallback(
+      messageType: Message.Type,
+      isMaster: Boolean,
+      requestId: Long,
+      softSplit: AtomicBoolean,
+      location: PartitionLocation,
+      workerSourceTime: String,
+      callback: RpcResponseCallback)
+    extends RpcResponseCallback {
+    override def onSuccess(response: ByteBuffer): Unit = {
+      if (isMaster) {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")
+        if (response.remaining() > 0) {
+          val resp = ByteBuffer.allocate(response.remaining())
+          resp.put(response)
+          resp.flip()
+          callback.onSuccess(resp)
+        } else if (softSplit != null && softSplit.get()) {
+          callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.SOFT_SPLIT.getValue)))
+        } else {
+          callback.onSuccess(response)
+        }
+      } else {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")
+        callback.onSuccess(response)
+      }
+    }
+
+    override def onFailure(e: Throwable): Unit = {
+      if (location != null) {
+        logError(s"[handle$messageType.onFailure] partitionLocation: $location")
+      }
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          workerSource.incCounter(WorkerSource.PushDataHandshakeFailCount)
+          callback.onFailure(new Exception(
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_SLAVE.getMessage,
+            e))
+        case Type.REGION_START =>
+          workerSource.incCounter(WorkerSource.RegionStartFailCount)
+          callback.onFailure(new Exception(StatusCode.REGION_START_FAIL_SLAVE.getMessage, e))
+        case Type.REGION_FINISH =>
+          workerSource.incCounter(WorkerSource.RegionFinishFailCount)
+          callback.onFailure(new Exception(StatusCode.REGION_FINISH_FAIL_SLAVE.getMessage, e))
+        case _ =>
+          workerSource.incCounter(WorkerSource.PushDataFailCount)
+          callback.onFailure(new Exception(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage, e))
+      }
+    }
+  }
+
+  private def checkLocationNull(
+      messageType: Message.Type,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      body: ByteBuf,

Review Comment:
   It looks like `body` is always null; maybe this parameter can be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] FMX merged pull request #1013: [CELEBORN-71] pushdatahandler supports mappartition write: handshake/regionstart/regionfinish

Posted by GitBox <gi...@apache.org>.
FMX merged PR #1013:
URL: https://github.com/apache/incubator-celeborn/pull/1013


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1013: [CELEBORN-71] pushdatahandler supports mappartition write: handshake/regionstart/regionfinish

Posted by GitBox <gi...@apache.org>.
zhongqiangczq commented on code in PR #1013:
URL: https://github.com/apache/incubator-celeborn/pull/1013#discussion_r1039092531


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -526,4 +530,278 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       message.body().release()
     }
   }
+
+  private def handleRpcRequest(client: TransportClient, rpcRequest: RpcRequest): Unit = {
+    val msg = Message.decode(rpcRequest.body().nioByteBuffer())
+    val requestId = rpcRequest.requestId
+    val (mode, shuffleKey, partitionUniqueId, isCheckSplit) = msg match {
+      case p: PushDataHandShake => (p.mode, p.shuffleKey, p.partitionUniqueId, false)
+      case rs: RegionStart => (rs.mode, rs.shuffleKey, rs.partitionUniqueId, true)
+      case rf: RegionFinish => (rf.mode, rf.shuffleKey, rf.partitionUniqueId, false)
+    }
+    handleCore(
+      client,
+      rpcRequest,
+      requestId,
+      () =>
+        handleRpcRequestCore(
+          mode,
+          msg,
+          shuffleKey,
+          partitionUniqueId,
+          requestId,
+          isCheckSplit,
+          new SimpleRpcResponseCallback(
+            msg.`type`(),
+            client,
+            requestId,
+            shuffleKey,
+            partitionUniqueId)))
+
+  }
+
+  private def handleRpcRequestCore(
+      mode: Byte,
+      message: Message,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      requestId: Long,
+      isCheckSplit: Boolean,
+      callback: RpcResponseCallback): Unit = {
+    val isMaster = mode == PartitionLocation.Mode.MASTER
+    val messageType = message.`type`()
+    log.info(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId")
+    val (workerSourceMaster, workerSourceSlave) =
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          (WorkerSource.MasterPushDataHandshakeTime, WorkerSource.SlavePushDataHandshakeTime)
+        case Type.REGION_START =>
+          (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
+        case Type.REGION_FINISH =>
+          (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
+      }
+
+    val location = isMaster match {
+      case true => partitionLocationInfo.getMasterLocation(shuffleKey, partitionUniqueId)
+      case false => partitionLocationInfo.getSlaveLocation(shuffleKey, partitionUniqueId)
+    }
+    workerSource.startTimer(if (isMaster) workerSourceMaster else workerSourceSlave, s"$requestId")
+    val wrappedCallback =
+      new WrappedRpcResponseCallback(
+        messageType,
+        isMaster,
+        requestId,
+        null,
+        location,
+        if (isMaster) workerSourceMaster else workerSourceSlave,
+        callback)
+
+    val isReturn = checkLocationNull(
+      messageType,
+      shuffleKey,
+      partitionUniqueId,
+      null,
+      location,
+      callback,
+      wrappedCallback)
+    if (isReturn) return
+
+    val (isReturnWriter, fileWriter) =
+      getFileWriterAndCheck(messageType, location, isMaster, callback)
+    if (isReturnWriter) return
+
+    if (isCheckSplit) {
+      val isReturnDisk = checkDiskFullAndSplit(fileWriter, isMaster, null, callback)
+      if (isReturnDisk) return
+    }
+
+    try {
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE => {
+          fileWriter.pushDataHandShake(message.asInstanceOf[PushDataHandShake].numPartitions)
+        }
+        case Type.REGION_START => {
+          fileWriter.regionStart(
+            message.asInstanceOf[RegionStart].currentRegionIndex,
+            message.asInstanceOf[RegionStart].isBroadcast)
+        }
+        case Type.REGION_FINISH => {
+          fileWriter.regionFinish()
+        }
+      }
+      // for master, send data to slave
+      if (location.getPeer != null && isMaster) {
+        // to do replica
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      } else {
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      }
+    } catch {
+      case t: Throwable =>
+        callback.onFailure(new Exception(s"$messageType failed", t))
+    }
+  }
+
+  class WrappedRpcResponseCallback(
+      messageType: Message.Type,
+      isMaster: Boolean,
+      requestId: Long,
+      softSplit: AtomicBoolean,
+      location: PartitionLocation,
+      workerSourceTime: String,
+      callback: RpcResponseCallback)
+    extends RpcResponseCallback {
+    override def onSuccess(response: ByteBuffer): Unit = {
+      if (isMaster) {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")
+        if (response.remaining() > 0) {
+          val resp = ByteBuffer.allocate(response.remaining())
+          resp.put(response)
+          resp.flip()
+          callback.onSuccess(resp)
+        } else if (softSplit != null && softSplit.get()) {
+          callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.SOFT_SPLIT.getValue)))
+        } else {
+          callback.onSuccess(response)
+        }
+      } else {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")
+        callback.onSuccess(response)
+      }
+    }
+
+    override def onFailure(e: Throwable): Unit = {
+      if (location != null) {
+        logError(s"[handle$messageType.onFailure] partitionLocation: $location")
+      }
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          workerSource.incCounter(WorkerSource.PushDataHandshakeFailCount)
+          callback.onFailure(new Exception(
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_SLAVE.getMessage,
+            e))
+        case Type.REGION_START =>
+          workerSource.incCounter(WorkerSource.RegionStartFailCount)
+          callback.onFailure(new Exception(StatusCode.REGION_START_FAIL_SLAVE.getMessage, e))
+        case Type.REGION_FINISH =>
+          workerSource.incCounter(WorkerSource.RegionFinishFailCount)
+          callback.onFailure(new Exception(StatusCode.REGION_FINISH_FAIL_SLAVE.getMessage, e))
+        case _ =>
+          workerSource.incCounter(WorkerSource.PushDataFailCount)
+          callback.onFailure(new Exception(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage, e))
+      }
+    }
+  }
+
+  private def checkLocationNull(
+      messageType: Message.Type,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      body: ByteBuf,

Review Comment:
   Yes, for the handshake/regionstart/regionfinish messages the body is always null, but for pushdata messages it will not be null. In the future, I will refactor the pushdata logic to use this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] zhongqiangczq commented on a diff in pull request #1013: [CELEBORN-71] pushdatahandler supports mappartition write: handshake/regionstart/regionfinish

Posted by GitBox <gi...@apache.org>.
zhongqiangczq commented on code in PR #1013:
URL: https://github.com/apache/incubator-celeborn/pull/1013#discussion_r1039108195


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -526,4 +530,278 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       message.body().release()
     }
   }
+
+  private def handleRpcRequest(client: TransportClient, rpcRequest: RpcRequest): Unit = {
+    val msg = Message.decode(rpcRequest.body().nioByteBuffer())
+    val requestId = rpcRequest.requestId
+    val (mode, shuffleKey, partitionUniqueId, isCheckSplit) = msg match {
+      case p: PushDataHandShake => (p.mode, p.shuffleKey, p.partitionUniqueId, false)
+      case rs: RegionStart => (rs.mode, rs.shuffleKey, rs.partitionUniqueId, true)
+      case rf: RegionFinish => (rf.mode, rf.shuffleKey, rf.partitionUniqueId, false)
+    }
+    handleCore(
+      client,
+      rpcRequest,
+      requestId,
+      () =>
+        handleRpcRequestCore(
+          mode,
+          msg,
+          shuffleKey,
+          partitionUniqueId,
+          requestId,
+          isCheckSplit,
+          new SimpleRpcResponseCallback(
+            msg.`type`(),
+            client,
+            requestId,
+            shuffleKey,
+            partitionUniqueId)))
+
+  }
+
+  private def handleRpcRequestCore(
+      mode: Byte,
+      message: Message,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      requestId: Long,
+      isCheckSplit: Boolean,
+      callback: RpcResponseCallback): Unit = {
+    val isMaster = mode == PartitionLocation.Mode.MASTER
+    val messageType = message.`type`()
+    log.info(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId")
+    val (workerSourceMaster, workerSourceSlave) =
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          (WorkerSource.MasterPushDataHandshakeTime, WorkerSource.SlavePushDataHandshakeTime)
+        case Type.REGION_START =>
+          (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
+        case Type.REGION_FINISH =>
+          (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
+      }
+
+    val location = isMaster match {
+      case true => partitionLocationInfo.getMasterLocation(shuffleKey, partitionUniqueId)
+      case false => partitionLocationInfo.getSlaveLocation(shuffleKey, partitionUniqueId)
+    }
+    workerSource.startTimer(if (isMaster) workerSourceMaster else workerSourceSlave, s"$requestId")
+    val wrappedCallback =
+      new WrappedRpcResponseCallback(
+        messageType,
+        isMaster,
+        requestId,
+        null,
+        location,
+        if (isMaster) workerSourceMaster else workerSourceSlave,
+        callback)
+
+    val isReturn = checkLocationNull(
+      messageType,
+      shuffleKey,
+      partitionUniqueId,
+      null,
+      location,
+      callback,
+      wrappedCallback)
+    if (isReturn) return
+
+    val (isReturnWriter, fileWriter) =
+      getFileWriterAndCheck(messageType, location, isMaster, callback)
+    if (isReturnWriter) return
+
+    if (isCheckSplit) {
+      val isReturnDisk = checkDiskFullAndSplit(fileWriter, isMaster, null, callback)
+      if (isReturnDisk) return
+    }
+
+    try {
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE => {
+          fileWriter.pushDataHandShake(message.asInstanceOf[PushDataHandShake].numPartitions)
+        }
+        case Type.REGION_START => {
+          fileWriter.regionStart(
+            message.asInstanceOf[RegionStart].currentRegionIndex,
+            message.asInstanceOf[RegionStart].isBroadcast)
+        }
+        case Type.REGION_FINISH => {
+          fileWriter.regionFinish()
+        }
+      }
+      // for master, send data to slave
+      if (location.getPeer != null && isMaster) {
+        // to do replica
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      } else {
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      }
+    } catch {
+      case t: Throwable =>
+        callback.onFailure(new Exception(s"$messageType failed", t))
+    }
+  }
+
+  class WrappedRpcResponseCallback(
+      messageType: Message.Type,
+      isMaster: Boolean,
+      requestId: Long,
+      softSplit: AtomicBoolean,
+      location: PartitionLocation,
+      workerSourceTime: String,
+      callback: RpcResponseCallback)
+    extends RpcResponseCallback {
+    override def onSuccess(response: ByteBuffer): Unit = {
+      if (isMaster) {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")

Review Comment:
   It has been removed outside.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1013: [CELEBORN-71] pushdatahandler supports mappartition write: handshake/regionstart/regionfinish

Posted by GitBox <gi...@apache.org>.
FMX commented on code in PR #1013:
URL: https://github.com/apache/incubator-celeborn/pull/1013#discussion_r1037974100


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -526,4 +530,278 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       message.body().release()
     }
   }
+
+  private def handleRpcRequest(client: TransportClient, rpcRequest: RpcRequest): Unit = {
+    val msg = Message.decode(rpcRequest.body().nioByteBuffer())
+    val requestId = rpcRequest.requestId
+    val (mode, shuffleKey, partitionUniqueId, isCheckSplit) = msg match {
+      case p: PushDataHandShake => (p.mode, p.shuffleKey, p.partitionUniqueId, false)
+      case rs: RegionStart => (rs.mode, rs.shuffleKey, rs.partitionUniqueId, true)
+      case rf: RegionFinish => (rf.mode, rf.shuffleKey, rf.partitionUniqueId, false)
+    }
+    handleCore(
+      client,
+      rpcRequest,
+      requestId,
+      () =>
+        handleRpcRequestCore(
+          mode,
+          msg,
+          shuffleKey,
+          partitionUniqueId,
+          requestId,
+          isCheckSplit,
+          new SimpleRpcResponseCallback(
+            msg.`type`(),
+            client,
+            requestId,
+            shuffleKey,
+            partitionUniqueId)))
+
+  }
+
+  private def handleRpcRequestCore(
+      mode: Byte,
+      message: Message,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      requestId: Long,
+      isCheckSplit: Boolean,
+      callback: RpcResponseCallback): Unit = {
+    val isMaster = mode == PartitionLocation.Mode.MASTER
+    val messageType = message.`type`()
+    log.info(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId")
+    val (workerSourceMaster, workerSourceSlave) =
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          (WorkerSource.MasterPushDataHandshakeTime, WorkerSource.SlavePushDataHandshakeTime)
+        case Type.REGION_START =>
+          (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
+        case Type.REGION_FINISH =>
+          (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
+      }
+
+    val location = isMaster match {
+      case true => partitionLocationInfo.getMasterLocation(shuffleKey, partitionUniqueId)
+      case false => partitionLocationInfo.getSlaveLocation(shuffleKey, partitionUniqueId)
+    }
+    workerSource.startTimer(if (isMaster) workerSourceMaster else workerSourceSlave, s"$requestId")
+    val wrappedCallback =
+      new WrappedRpcResponseCallback(
+        messageType,
+        isMaster,
+        requestId,
+        null,
+        location,
+        if (isMaster) workerSourceMaster else workerSourceSlave,
+        callback)
+
+    val isReturn = checkLocationNull(
+      messageType,
+      shuffleKey,
+      partitionUniqueId,
+      null,
+      location,
+      callback,
+      wrappedCallback)
+    if (isReturn) return
+
+    val (isReturnWriter, fileWriter) =
+      getFileWriterAndCheck(messageType, location, isMaster, callback)
+    if (isReturnWriter) return
+
+    if (isCheckSplit) {
+      val isReturnDisk = checkDiskFullAndSplit(fileWriter, isMaster, null, callback)
+      if (isReturnDisk) return
+    }
+
+    try {
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE => {
+          fileWriter.pushDataHandShake(message.asInstanceOf[PushDataHandShake].numPartitions)
+        }
+        case Type.REGION_START => {
+          fileWriter.regionStart(
+            message.asInstanceOf[RegionStart].currentRegionIndex,
+            message.asInstanceOf[RegionStart].isBroadcast)
+        }
+        case Type.REGION_FINISH => {
+          fileWriter.regionFinish()
+        }
+      }
+      // for master, send data to slave
+      if (location.getPeer != null && isMaster) {
+        // to do replica
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      } else {
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      }
+    } catch {
+      case t: Throwable =>
+        callback.onFailure(new Exception(s"$messageType failed", t))
+    }
+  }
+
+  class WrappedRpcResponseCallback(
+      messageType: Message.Type,
+      isMaster: Boolean,
+      requestId: Long,
+      softSplit: AtomicBoolean,
+      location: PartitionLocation,
+      workerSourceTime: String,
+      callback: RpcResponseCallback)
+    extends RpcResponseCallback {
+    override def onSuccess(response: ByteBuffer): Unit = {
+      if (isMaster) {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")
+        if (response.remaining() > 0) {
+          val resp = ByteBuffer.allocate(response.remaining())
+          resp.put(response)
+          resp.flip()
+          callback.onSuccess(resp)
+        } else if (softSplit != null && softSplit.get()) {
+          callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.SOFT_SPLIT.getValue)))
+        } else {
+          callback.onSuccess(response)
+        }
+      } else {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")
+        callback.onSuccess(response)
+      }
+    }
+
+    override def onFailure(e: Throwable): Unit = {
+      if (location != null) {
+        logError(s"[handle$messageType.onFailure] partitionLocation: $location")
+      }
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          workerSource.incCounter(WorkerSource.PushDataHandshakeFailCount)
+          callback.onFailure(new Exception(
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_SLAVE.getMessage,
+            e))
+        case Type.REGION_START =>
+          workerSource.incCounter(WorkerSource.RegionStartFailCount)
+          callback.onFailure(new Exception(StatusCode.REGION_START_FAIL_SLAVE.getMessage, e))
+        case Type.REGION_FINISH =>
+          workerSource.incCounter(WorkerSource.RegionFinishFailCount)
+          callback.onFailure(new Exception(StatusCode.REGION_FINISH_FAIL_SLAVE.getMessage, e))
+        case _ =>
+          workerSource.incCounter(WorkerSource.PushDataFailCount)
+          callback.onFailure(new Exception(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage, e))
+      }
+    }
+  }
+
+  private def checkLocationNull(
+      messageType: Message.Type,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      body: ByteBuf,
+      location: PartitionLocation,
+      callback: RpcResponseCallback,
+      wrappedCallback: RpcResponseCallback): Boolean = {
+    if (location == null) {
+      val (mapId, attemptId) = getMapAttempt(body, shuffleKey, partitionUniqueId)
+      if (shuffleMapperAttempts.containsKey(shuffleKey) &&
+        -1 != shuffleMapperAttempts.get(shuffleKey).get(mapId)) {
+        // partition data has already been committed
+        logInfo(s"Receive push data from speculative task(shuffle $shuffleKey, map $mapId, " +
+          s" attempt $attemptId), but this mapper has already been ended.")
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.STAGE_ENDED.getValue)))
+      } else {
+        val msg = s"Partition location wasn't found for task(shuffle $shuffleKey, map $mapId, " +
+          s"attempt $attemptId, uniqueId $partitionUniqueId)."
+        logWarning(s"[handle$messageType] $msg")
+        messageType match {
+          case Type.PUSH_MERGED_DATA => callback.onFailure(new Exception(msg))
+          case _ => callback.onFailure(
+              new Exception(StatusCode.PUSH_DATA_FAIL_PARTITION_NOT_FOUND.getMessage()))
+        }
+      }
+      return true
+    }
+    false
+  }
+
+  private def checkFileWriterException(
+      messageType: Message.Type,
+      isMaster: Boolean,
+      fileWriter: FileWriter,
+      callback: RpcResponseCallback): Unit = {
+    logWarning(
+      s"[handle$messageType] fileWriter $fileWriter has Exception ${fileWriter.getException}")
+
+    val (messageMaster, messageSlave) =
+      messageType match {
+        case Type.PUSH_DATA | Type.PUSH_DATA_HAND_SHAKE =>
+          (
+            StatusCode.PUSH_DATA_FAIL_MASTER.getMessage(),
+            StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage())
+        case Type.PUSH_DATA_HAND_SHAKE => (
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_MASTER.getMessage,
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_SLAVE.getMessage)
+        case Type.REGION_START => (
+            StatusCode.REGION_START_FAIL_MASTER.getMessage,
+            StatusCode.REGION_START_FAIL_SLAVE.getMessage)
+        case Type.REGION_FINISH => (
+            StatusCode.REGION_FINISH_FAIL_MASTER.getMessage,
+            StatusCode.REGION_FINISH_FAIL_SLAVE.getMessage)
+      }
+    callback.onFailure(new Exception(
+      if (isMaster) messageMaster else messageSlave,
+      fileWriter.getException))
+  }
+
+  private def getFileWriterAndCheck(
+      messageType: Message.Type,
+      location: PartitionLocation,
+      isMaster: Boolean,
+      callback: RpcResponseCallback): (Boolean, FileWriter) = {
+    val fileWriter = location.asInstanceOf[WorkingPartition].getFileWriter
+    val exception = fileWriter.getException
+    if (exception != null) {
+      checkFileWriterException(messageType, isMaster, fileWriter, callback)
+      return (true, fileWriter)
+    }
+    (false, fileWriter)
+  }
+
+  private def checkDiskFull(fileWriter: FileWriter): Boolean = {
+    val diskFull = workerInfo.diskInfos
+      .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
+      .actualUsableSpace < diskReserveSize

Review Comment:
   There is a logic error here: `actualUsableSpace > 0` means this disk is writable.



##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -526,4 +530,278 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       message.body().release()
     }
   }
+
+  private def handleRpcRequest(client: TransportClient, rpcRequest: RpcRequest): Unit = {
+    val msg = Message.decode(rpcRequest.body().nioByteBuffer())
+    val requestId = rpcRequest.requestId
+    val (mode, shuffleKey, partitionUniqueId, isCheckSplit) = msg match {
+      case p: PushDataHandShake => (p.mode, p.shuffleKey, p.partitionUniqueId, false)
+      case rs: RegionStart => (rs.mode, rs.shuffleKey, rs.partitionUniqueId, true)
+      case rf: RegionFinish => (rf.mode, rf.shuffleKey, rf.partitionUniqueId, false)
+    }
+    handleCore(
+      client,
+      rpcRequest,
+      requestId,
+      () =>
+        handleRpcRequestCore(
+          mode,
+          msg,
+          shuffleKey,
+          partitionUniqueId,
+          requestId,
+          isCheckSplit,
+          new SimpleRpcResponseCallback(
+            msg.`type`(),
+            client,
+            requestId,
+            shuffleKey,
+            partitionUniqueId)))
+
+  }
+
+  private def handleRpcRequestCore(
+      mode: Byte,
+      message: Message,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      requestId: Long,
+      isCheckSplit: Boolean,
+      callback: RpcResponseCallback): Unit = {
+    val isMaster = mode == PartitionLocation.Mode.MASTER
+    val messageType = message.`type`()
+    log.info(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId")
+    val (workerSourceMaster, workerSourceSlave) =
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          (WorkerSource.MasterPushDataHandshakeTime, WorkerSource.SlavePushDataHandshakeTime)
+        case Type.REGION_START =>
+          (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
+        case Type.REGION_FINISH =>
+          (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
+      }
+
+    val location = isMaster match {
+      case true => partitionLocationInfo.getMasterLocation(shuffleKey, partitionUniqueId)
+      case false => partitionLocationInfo.getSlaveLocation(shuffleKey, partitionUniqueId)
+    }
+    workerSource.startTimer(if (isMaster) workerSourceMaster else workerSourceSlave, s"$requestId")
+    val wrappedCallback =
+      new WrappedRpcResponseCallback(
+        messageType,
+        isMaster,
+        requestId,
+        null,
+        location,
+        if (isMaster) workerSourceMaster else workerSourceSlave,
+        callback)
+
+    val isReturn = checkLocationNull(
+      messageType,
+      shuffleKey,
+      partitionUniqueId,
+      null,
+      location,
+      callback,
+      wrappedCallback)
+    if (isReturn) return
+
+    val (isReturnWriter, fileWriter) =
+      getFileWriterAndCheck(messageType, location, isMaster, callback)
+    if (isReturnWriter) return
+
+    if (isCheckSplit) {
+      val isReturnDisk = checkDiskFullAndSplit(fileWriter, isMaster, null, callback)
+      if (isReturnDisk) return
+    }
+
+    try {
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE => {
+          fileWriter.pushDataHandShake(message.asInstanceOf[PushDataHandShake].numPartitions)
+        }
+        case Type.REGION_START => {
+          fileWriter.regionStart(
+            message.asInstanceOf[RegionStart].currentRegionIndex,
+            message.asInstanceOf[RegionStart].isBroadcast)
+        }
+        case Type.REGION_FINISH => {
+          fileWriter.regionFinish()
+        }
+      }
+      // for master, send data to slave
+      if (location.getPeer != null && isMaster) {
+        // to do replica
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      } else {
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+      }
+    } catch {
+      case t: Throwable =>
+        callback.onFailure(new Exception(s"$messageType failed", t))
+    }
+  }
+
+  class WrappedRpcResponseCallback(
+      messageType: Message.Type,
+      isMaster: Boolean,
+      requestId: Long,
+      softSplit: AtomicBoolean,
+      location: PartitionLocation,
+      workerSourceTime: String,
+      callback: RpcResponseCallback)
+    extends RpcResponseCallback {
+    override def onSuccess(response: ByteBuffer): Unit = {
+      if (isMaster) {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")
+        if (response.remaining() > 0) {
+          val resp = ByteBuffer.allocate(response.remaining())
+          resp.put(response)
+          resp.flip()
+          callback.onSuccess(resp)
+        } else if (softSplit != null && softSplit.get()) {
+          callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.SOFT_SPLIT.getValue)))
+        } else {
+          callback.onSuccess(response)
+        }
+      } else {
+        workerSource.stopTimer(workerSourceTime, s"$requestId")
+        callback.onSuccess(response)
+      }
+    }
+
+    override def onFailure(e: Throwable): Unit = {
+      if (location != null) {
+        logError(s"[handle$messageType.onFailure] partitionLocation: $location")
+      }
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          workerSource.incCounter(WorkerSource.PushDataHandshakeFailCount)
+          callback.onFailure(new Exception(
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_SLAVE.getMessage,
+            e))
+        case Type.REGION_START =>
+          workerSource.incCounter(WorkerSource.RegionStartFailCount)
+          callback.onFailure(new Exception(StatusCode.REGION_START_FAIL_SLAVE.getMessage, e))
+        case Type.REGION_FINISH =>
+          workerSource.incCounter(WorkerSource.RegionFinishFailCount)
+          callback.onFailure(new Exception(StatusCode.REGION_FINISH_FAIL_SLAVE.getMessage, e))
+        case _ =>
+          workerSource.incCounter(WorkerSource.PushDataFailCount)
+          callback.onFailure(new Exception(StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage, e))
+      }
+    }
+  }
+
+  private def checkLocationNull(
+      messageType: Message.Type,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      body: ByteBuf,
+      location: PartitionLocation,
+      callback: RpcResponseCallback,
+      wrappedCallback: RpcResponseCallback): Boolean = {
+    if (location == null) {
+      val (mapId, attemptId) = getMapAttempt(body, shuffleKey, partitionUniqueId)
+      if (shuffleMapperAttempts.containsKey(shuffleKey) &&
+        -1 != shuffleMapperAttempts.get(shuffleKey).get(mapId)) {
+        // partition data has already been committed
+        logInfo(s"Receive push data from speculative task(shuffle $shuffleKey, map $mapId, " +
+          s" attempt $attemptId), but this mapper has already been ended.")
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.STAGE_ENDED.getValue)))
+      } else {
+        val msg = s"Partition location wasn't found for task(shuffle $shuffleKey, map $mapId, " +
+          s"attempt $attemptId, uniqueId $partitionUniqueId)."
+        logWarning(s"[handle$messageType] $msg")
+        messageType match {
+          case Type.PUSH_MERGED_DATA => callback.onFailure(new Exception(msg))
+          case _ => callback.onFailure(
+              new Exception(StatusCode.PUSH_DATA_FAIL_PARTITION_NOT_FOUND.getMessage()))
+        }
+      }
+      return true
+    }
+    false
+  }
+
+  private def checkFileWriterException(
+      messageType: Message.Type,
+      isMaster: Boolean,
+      fileWriter: FileWriter,
+      callback: RpcResponseCallback): Unit = {
+    logWarning(
+      s"[handle$messageType] fileWriter $fileWriter has Exception ${fileWriter.getException}")
+
+    val (messageMaster, messageSlave) =
+      messageType match {
+        case Type.PUSH_DATA | Type.PUSH_DATA_HAND_SHAKE =>
+          (
+            StatusCode.PUSH_DATA_FAIL_MASTER.getMessage(),
+            StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage())
+        case Type.PUSH_DATA_HAND_SHAKE => (
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_MASTER.getMessage,
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_SLAVE.getMessage)
+        case Type.REGION_START => (
+            StatusCode.REGION_START_FAIL_MASTER.getMessage,
+            StatusCode.REGION_START_FAIL_SLAVE.getMessage)
+        case Type.REGION_FINISH => (
+            StatusCode.REGION_FINISH_FAIL_MASTER.getMessage,
+            StatusCode.REGION_FINISH_FAIL_SLAVE.getMessage)
+      }
+    callback.onFailure(new Exception(
+      if (isMaster) messageMaster else messageSlave,
+      fileWriter.getException))
+  }
+
+  private def getFileWriterAndCheck(
+      messageType: Message.Type,
+      location: PartitionLocation,
+      isMaster: Boolean,
+      callback: RpcResponseCallback): (Boolean, FileWriter) = {
+    val fileWriter = location.asInstanceOf[WorkingPartition].getFileWriter
+    val exception = fileWriter.getException
+    if (exception != null) {
+      checkFileWriterException(messageType, isMaster, fileWriter, callback)
+      return (true, fileWriter)
+    }
+    (false, fileWriter)
+  }
+
+  private def checkDiskFull(fileWriter: FileWriter): Boolean = {
+    val diskFull = workerInfo.diskInfos
+      .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
+      .actualUsableSpace < diskReserveSize

Review Comment:
   There is a logic error here: `actualUsableSpace > 0` means the disk is writable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-celeborn] FMX commented on a diff in pull request #1013: [CELEBORN-71] pushdatahandler supports mappartition write: handshake/regionstart/regionfinish

Posted by GitBox <gi...@apache.org>.
FMX commented on code in PR #1013:
URL: https://github.com/apache/incubator-celeborn/pull/1013#discussion_r1037974100


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -526,4 +530,278 @@ class PushDataHandler extends BaseMessageHandler with Logging {
       message.body().release()
     }
   }
+
+  /**
+   * Entry point for the map-partition control RPCs: PushDataHandShake, RegionStart and
+   * RegionFinish. Decodes the request body, extracts the routing fields the three messages
+   * share and hands the work to handleRpcRequestCore through handleCore.
+   */
+  private def handleRpcRequest(client: TransportClient, rpcRequest: RpcRequest): Unit = {
+    val decoded = Message.decode(rpcRequest.body().nioByteBuffer())
+    val reqId = rpcRequest.requestId
+    // Only RegionStart asks handleRpcRequestCore for the disk-full / split check.
+    val (mode, shuffleKey, partitionUniqueId, isCheckSplit) = decoded match {
+      case handShake: PushDataHandShake =>
+        (handShake.mode, handShake.shuffleKey, handShake.partitionUniqueId, false)
+      case regionStart: RegionStart =>
+        (regionStart.mode, regionStart.shuffleKey, regionStart.partitionUniqueId, true)
+      case regionFinish: RegionFinish =>
+        (regionFinish.mode, regionFinish.shuffleKey, regionFinish.partitionUniqueId, false)
+    }
+    // The response callback is only built once handleCore actually runs the request.
+    def processRequest(): Unit = {
+      val responder = new SimpleRpcResponseCallback(
+        decoded.`type`(),
+        client,
+        reqId,
+        shuffleKey,
+        partitionUniqueId)
+      handleRpcRequestCore(
+        mode,
+        decoded,
+        shuffleKey,
+        partitionUniqueId,
+        reqId,
+        isCheckSplit,
+        responder)
+    }
+    handleCore(client, rpcRequest, reqId, () => processRequest())
+  }
+
+  /**
+   * Applies a map-partition control message (handshake, region start or region finish) to the
+   * FileWriter of the addressed partition location and answers through `callback`.
+   *
+   * @param mode              raw partition-mode byte carried by the message
+   * @param message           the decoded control message
+   * @param shuffleKey        shuffle the partition belongs to
+   * @param partitionUniqueId unique id of the target partition location
+   * @param requestId         RPC request id, also used as the timer key
+   * @param isCheckSplit      whether to run checkDiskFullAndSplit before applying the message
+   * @param callback          channel used to answer the client
+   */
+  private def handleRpcRequestCore(
+      mode: Byte,
+      message: Message,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      requestId: Long,
+      isCheckSplit: Boolean,
+      callback: RpcResponseCallback): Unit = {
+    // `mode` arrives as a raw Byte: convert it before comparing with the Mode constant,
+    // otherwise `==` between a Byte and a Mode value is never true and every request
+    // would be treated as a slave request.
+    val isMaster = PartitionLocation.getMode(mode) == PartitionLocation.Mode.MASTER
+    val messageType = message.`type`()
+    logInfo(s"requestId:$requestId, pushdata rpc:$messageType, mode:$mode, " +
+      s"shuffleKey:$shuffleKey, partitionUniqueId:$partitionUniqueId")
+
+    // Each control message is timed under its own master/slave metric.
+    val (workerSourceMaster, workerSourceSlave) =
+      messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          (WorkerSource.MasterPushDataHandshakeTime, WorkerSource.SlavePushDataHandshakeTime)
+        case Type.REGION_START =>
+          (WorkerSource.MasterRegionStartTime, WorkerSource.SlaveRegionStartTime)
+        case Type.REGION_FINISH =>
+          (WorkerSource.MasterRegionFinishTime, WorkerSource.SlaveRegionFinishTime)
+      }
+    val timerName = if (isMaster) workerSourceMaster else workerSourceSlave
+
+    val location =
+      if (isMaster) partitionLocationInfo.getMasterLocation(shuffleKey, partitionUniqueId)
+      else partitionLocationInfo.getSlaveLocation(shuffleKey, partitionUniqueId)
+    // NOTE(review): the timer starts before validation; the early-return failure paths below
+    // answer through `callback` directly and never stop it -- confirm WorkerSource tolerates
+    // keys that are started but never stopped.
+    workerSource.startTimer(timerName, s"$requestId")
+    val wrappedCallback =
+      new WrappedRpcResponseCallback(
+        messageType,
+        isMaster,
+        requestId,
+        null,
+        location,
+        timerName,
+        callback)
+
+    if (checkLocationNull(
+        messageType,
+        shuffleKey,
+        partitionUniqueId,
+        null,
+        location,
+        callback,
+        wrappedCallback)) {
+      return
+    }
+
+    val (writerFailed, fileWriter) =
+      getFileWriterAndCheck(messageType, location, isMaster, callback)
+    if (writerFailed) return
+
+    // The caller opts into the disk-full / split check through isCheckSplit.
+    if (isCheckSplit && checkDiskFullAndSplit(fileWriter, isMaster, null, callback)) return
+
+    try {
+      message match {
+        case handShake: PushDataHandShake =>
+          fileWriter.pushDataHandShake(handShake.numPartitions)
+        case regionStart: RegionStart =>
+          fileWriter.regionStart(regionStart.currentRegionIndex, regionStart.isBroadcast)
+        case _: RegionFinish =>
+          fileWriter.regionFinish()
+      }
+      // TODO: when this is the master and a peer exists, replicate the message to the slave
+      // before acknowledging; until then master and slave both answer with an empty body.
+      wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte]()))
+    } catch {
+      case scala.util.control.NonFatal(t) =>
+        callback.onFailure(new Exception(s"$messageType failed", t))
+    }
+  }
+
+  /**
+   * Decorates an RpcResponseCallback: stops the request timer on success, rewrites empty
+   * master responses into SOFT_SPLIT when `softSplit` is set, and on failure bumps the
+   * matching WorkerSource fail counter and wraps the cause in a *_FAIL_SLAVE status message.
+   *
+   * @param softSplit        may be null; when non-null and true, an empty master response
+   *                         becomes a SOFT_SPLIT status byte
+   * @param location         only used for the failure log line; may be null
+   * @param workerSourceTime timer metric name stopped in onSuccess
+   */
+  class WrappedRpcResponseCallback(
+      messageType: Message.Type,
+      isMaster: Boolean,
+      requestId: Long,
+      softSplit: AtomicBoolean,
+      location: PartitionLocation,
+      workerSourceTime: String,
+      callback: RpcResponseCallback)
+    extends RpcResponseCallback {
+
+    override def onSuccess(response: ByteBuffer): Unit = {
+      // Master and slave both stop the timer before answering.
+      workerSource.stopTimer(workerSourceTime, s"$requestId")
+      if (!isMaster) {
+        callback.onSuccess(response)
+      } else if (response.remaining() > 0) {
+        // Forward a private copy of the remaining bytes.
+        val copied = ByteBuffer.allocate(response.remaining())
+        copied.put(response)
+        copied.flip()
+        callback.onSuccess(copied)
+      } else if (softSplit != null && softSplit.get()) {
+        callback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.SOFT_SPLIT.getValue)))
+      } else {
+        callback.onSuccess(response)
+      }
+    }
+
+    override def onFailure(e: Throwable): Unit = {
+      if (location != null) {
+        logError(s"[handle$messageType.onFailure] partitionLocation: $location")
+      }
+      val (failCounter, failMessage) = messageType match {
+        case Type.PUSH_DATA_HAND_SHAKE =>
+          (
+            WorkerSource.PushDataHandshakeFailCount,
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_SLAVE.getMessage)
+        case Type.REGION_START =>
+          (WorkerSource.RegionStartFailCount, StatusCode.REGION_START_FAIL_SLAVE.getMessage)
+        case Type.REGION_FINISH =>
+          (WorkerSource.RegionFinishFailCount, StatusCode.REGION_FINISH_FAIL_SLAVE.getMessage)
+        case _ =>
+          (WorkerSource.PushDataFailCount, StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage)
+      }
+      workerSource.incCounter(failCounter)
+      callback.onFailure(new Exception(failMessage, e))
+    }
+  }
+
+  /**
+   * Sends a response when `location` is null and tells the caller whether to stop.
+   *
+   * When the mapper addressed by the request is already recorded as ended (its entry in
+   * shuffleMapperAttempts is not -1), the request is acknowledged with STAGE_ENDED through
+   * `wrappedCallback`; otherwise `callback` fails with a "partition not found" error.
+   *
+   * @return true when `location` was null and a response has been sent, false otherwise
+   */
+  private def checkLocationNull(
+      messageType: Message.Type,
+      shuffleKey: String,
+      partitionUniqueId: String,
+      body: ByteBuf,
+      location: PartitionLocation,
+      callback: RpcResponseCallback,
+      wrappedCallback: RpcResponseCallback): Boolean = {
+    if (location != null) {
+      false
+    } else {
+      // NOTE(review): the RPC path passes body = null; confirm getMapAttempt tolerates it.
+      val (mapId, attemptId) = getMapAttempt(body, shuffleKey, partitionUniqueId)
+      // Single lookup instead of containsKey + get: avoids an NPE if the shuffle entry is
+      // removed (or maps to null) between the two calls.
+      val mapperAttempts = shuffleMapperAttempts.get(shuffleKey)
+      if (mapperAttempts != null && -1 != mapperAttempts.get(mapId)) {
+        // partition data has already been committed
+        logInfo(s"Receive push data from speculative task(shuffle $shuffleKey, map $mapId, " +
+          s"attempt $attemptId), but this mapper has already been ended.")
+        wrappedCallback.onSuccess(ByteBuffer.wrap(Array[Byte](StatusCode.STAGE_ENDED.getValue)))
+      } else {
+        val msg = s"Partition location wasn't found for task(shuffle $shuffleKey, map $mapId, " +
+          s"attempt $attemptId, uniqueId $partitionUniqueId)."
+        logWarning(s"[handle$messageType] $msg")
+        val error = messageType match {
+          case Type.PUSH_MERGED_DATA => new Exception(msg)
+          case _ => new Exception(StatusCode.PUSH_DATA_FAIL_PARTITION_NOT_FOUND.getMessage())
+        }
+        callback.onFailure(error)
+      }
+      true
+    }
+  }
+
+  /**
+   * Logs the exception already recorded on `fileWriter` and fails `callback` with the
+   * status-code message matching `messageType` and the receiver's role (master or slave),
+   * attaching the writer's exception as the cause.
+   */
+  private def checkFileWriterException(
+      messageType: Message.Type,
+      isMaster: Boolean,
+      fileWriter: FileWriter,
+      callback: RpcResponseCallback): Unit = {
+    logWarning(
+      s"[handle$messageType] fileWriter $fileWriter has Exception ${fileWriter.getException}")
+
+    val (messageMaster, messageSlave) =
+      messageType match {
+        // PUSH_DATA_HAND_SHAKE has its own status codes below; folding it into this case
+        // would make that case unreachable and report handshake failures as PUSH_DATA ones.
+        case Type.PUSH_DATA | Type.PUSH_MERGED_DATA =>
+          (
+            StatusCode.PUSH_DATA_FAIL_MASTER.getMessage(),
+            StatusCode.PUSH_DATA_FAIL_SLAVE.getMessage())
+        case Type.PUSH_DATA_HAND_SHAKE => (
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_MASTER.getMessage,
+            StatusCode.PUSH_DATA_HANDSHAKE_FAIL_SLAVE.getMessage)
+        case Type.REGION_START => (
+            StatusCode.REGION_START_FAIL_MASTER.getMessage,
+            StatusCode.REGION_START_FAIL_SLAVE.getMessage)
+        case Type.REGION_FINISH => (
+            StatusCode.REGION_FINISH_FAIL_MASTER.getMessage,
+            StatusCode.REGION_FINISH_FAIL_SLAVE.getMessage)
+      }
+    callback.onFailure(new Exception(
+      if (isMaster) messageMaster else messageSlave,
+      fileWriter.getException))
+  }
+
+  /**
+   * Looks up the FileWriter behind `location` and, when that writer already carries an
+   * exception, reports the failure through `callback` (via checkFileWriterException).
+   *
+   * @return (true, writer) when a failure was already sent and the caller must stop;
+   *         (false, writer) when the writer is healthy
+   */
+  private def getFileWriterAndCheck(
+      messageType: Message.Type,
+      location: PartitionLocation,
+      isMaster: Boolean,
+      callback: RpcResponseCallback): (Boolean, FileWriter) = {
+    val writer = location.asInstanceOf[WorkingPartition].getFileWriter
+    val failed = writer.getException != null
+    if (failed) {
+      checkFileWriterException(messageType, isMaster, writer, callback)
+    }
+    (failed, writer)
+  }
+
+  private def checkDiskFull(fileWriter: FileWriter): Boolean = {
+    val diskFull = workerInfo.diskInfos
+      .get(fileWriter.flusher.asInstanceOf[LocalFlusher].mountPoint)
+      .actualUsableSpace < diskReserveSize

Review Comment:
   Here is a logic error: `actualUsableSpace > 0` means the disk is writable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org