You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@celeborn.apache.org by GitBox <gi...@apache.org> on 2022/11/15 01:31:48 UTC

[GitHub] [incubator-celeborn] RexXiong commented on a diff in pull request #966: [REFACTOR] PushDataHandler code refactor

RexXiong commented on code in PR #966:
URL: https://github.com/apache/incubator-celeborn/pull/966#discussion_r1022226679


##########
worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala:
##########
@@ -71,65 +71,36 @@ class PushDataHandler extends BaseMessageHandler with Logging {
   override def receive(client: TransportClient, msg: RequestMessage): Unit =
     msg match {
       case pushData: PushData =>
-        try {
-          rpcSource.updateMessageMetrics(pushData, pushData.body().size())
-          handlePushData(
-            pushData,
-            new RpcResponseCallback {
-              override def onSuccess(response: ByteBuffer): Unit = {
-                client.getChannel.writeAndFlush(new RpcResponse(
-                  pushData.requestId,
-                  new NioManagedBuffer(response)))
-              }
-
-              override def onFailure(e: Throwable): Unit = {
-                logError(
-                  "[processPushData] Process pushData onFailure! ShuffleKey: "
-                    + pushData.shuffleKey + ", partitionUniqueId: " + pushData.partitionUniqueId,
-                  e)
-                client.getChannel.writeAndFlush(new RpcFailure(pushData.requestId, e.getMessage))
-              }
-            })
-        } catch {
-          case e: Exception =>
-            logError(s"Error while handlePushData $pushData", e)
-            client.getChannel.writeAndFlush(new RpcFailure(
-              pushData.requestId,
-              Throwables.getStackTraceAsString(e)))
-        } finally {
-          pushData.body().release()
-        }
-      case pushMergedData: PushMergedData =>
-        try {
-          rpcSource.updateMessageMetrics(pushMergedData, pushMergedData.body().size())
-          handlePushMergedData(
-            pushMergedData,
-            new RpcResponseCallback {
-              override def onSuccess(response: ByteBuffer): Unit = {
-                client.getChannel.writeAndFlush(new RpcResponse(
-                  pushMergedData.requestId,
-                  new NioManagedBuffer(response)))
-              }
-
-              override def onFailure(e: Throwable): Unit = {
-                logError(
-                  "[processPushMergedData] Process PushMergedData onFailure! ShuffleKey: " +
-                    pushMergedData.shuffleKey +
-                    ", partitionUniqueId: " + pushMergedData.partitionUniqueIds.mkString(","),
-                  e)
-                client.getChannel.writeAndFlush(
-                  new RpcFailure(pushMergedData.requestId, e.getMessage()))
-              }
-            })
-        } catch {
-          case e: Exception =>
-            logError(s"Error while handlePushMergedData $pushMergedData", e);
-            client.getChannel.writeAndFlush(new RpcFailure(
-              pushMergedData.requestId,
-              Throwables.getStackTraceAsString(e)));
-        } finally {
-          pushMergedData.body().release()
-        }
+        handleCore(
+          client,
+          "PushData",

Review Comment:
   How about using RequestMessage.type instead of hardcoding the action as "PushData"/"PushMergedData"?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@celeborn.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org