You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by re...@apache.org on 2023/03/29 02:23:22 UTC

[incubator-celeborn] 07/42: [CELEBORN-400] Add RPC metrics for OpenStream (#1326)

This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 9c5820ab0cc2f27b1022235c7c4c12ab64ea6412
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Fri Mar 10 21:22:05 2023 +0800

    [CELEBORN-400] Add RPC metrics for OpenStream (#1326)
---
 .../org/apache/celeborn/common/metrics/source/RPCSource.scala      | 6 +++++-
 .../org/apache/celeborn/service/deploy/worker/FetchHandler.scala   | 7 ++++---
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/RPCSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/RPCSource.scala
index 0f0fb8a6f..946d6b7de 100644
--- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/RPCSource.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/RPCSource.scala
@@ -18,7 +18,7 @@
 package org.apache.celeborn.common.metrics.source
 
 import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.network.protocol.{ChunkFetchRequest, PushData, PushMergedData}
+import org.apache.celeborn.common.network.protocol.{ChunkFetchRequest, OpenStream, PushData, PushMergedData}
 import org.apache.celeborn.common.protocol.{PbRegisterWorker, PbUnregisterShuffle}
 import org.apache.celeborn.common.protocol.message.ControlMessages._
 
@@ -38,6 +38,7 @@ class RPCSource(conf: CelebornConf, role: String) extends AbstractSource(conf, r
   addCounter(RPCPushDataSize)
   addCounter(RPCPushMergedDataNum)
   addCounter(RPCPushMergedDataSize)
+  addCounter(RPCOpenStreamNum)
   addCounter(RPCChunkFetchRequestNum)
 
   // Master RPC
@@ -72,6 +73,8 @@ class RPCSource(conf: CelebornConf, role: String) extends AbstractSource(conf, r
         incCounter(RPCPushMergedDataSize, messageLen)
       case _: ChunkFetchRequest =>
         incCounter(RPCChunkFetchRequestNum)
+      case _: OpenStream =>
+        incCounter(RPCOpenStreamNum)
       case _: HeartbeatFromApplication =>
         incCounter(RPCHeartbeatFromApplicationNum)
       case _: HeartbeatFromWorker =>
@@ -109,6 +112,7 @@ object RPCSource {
   val RPCPushDataSize = "RPCPushDataSize"
   val RPCPushMergedDataNum = "RPCPushMergedDataNum"
   val RPCPushMergedDataSize = "RPCPushMergedDataSize"
+  val RPCOpenStreamNum = "RPCOpenStreamNum"
   val RPCChunkFetchRequestNum = "RPCChunkFetchRequestNum"
 
   // Master RPC
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 787ee7d96..864be6f44 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -84,15 +84,16 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg
         rpcSource.updateMessageMetrics(r, 0)
         handleChunkFetchRequest(client, r)
       case r: RpcRequest =>
-        handleOpenStream(client, r)
+        val msg = Message.decode(r.body().nioByteBuffer())
+        rpcSource.updateMessageMetrics(msg, 0)
+        handleOpenStream(client, r, msg)
       case unknown: RequestMessage =>
         throw new IllegalArgumentException(s"Unknown message type id: ${unknown.`type`.id}")
     }
   }
 
   // here are BackLogAnnouncement,OpenStream and OpenStreamWithCredit RPCs to handle
-  def handleOpenStream(client: TransportClient, request: RpcRequest): Unit = {
-    val msg = Message.decode(request.body().nioByteBuffer())
+  def handleOpenStream(client: TransportClient, request: RpcRequest, msg: Message): Unit = {
     val (shuffleKey, fileName) =
       if (msg.`type`() == Type.OPEN_STREAM) {
         val openStream = msg.asInstanceOf[OpenStream]