You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by re...@apache.org on 2023/03/29 02:23:22 UTC
[incubator-celeborn] 07/42: [CELEBORN-400] Add RPC metrics for OpenStream (#1326)
This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
commit 9c5820ab0cc2f27b1022235c7c4c12ab64ea6412
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Fri Mar 10 21:22:05 2023 +0800
[CELEBORN-400] Add RPC metrics for OpenStream (#1326)
---
.../org/apache/celeborn/common/metrics/source/RPCSource.scala | 6 +++++-
.../org/apache/celeborn/service/deploy/worker/FetchHandler.scala | 7 ++++---
2 files changed, 9 insertions(+), 4 deletions(-)
diff --git a/common/src/main/scala/org/apache/celeborn/common/metrics/source/RPCSource.scala b/common/src/main/scala/org/apache/celeborn/common/metrics/source/RPCSource.scala
index 0f0fb8a6f..946d6b7de 100644
--- a/common/src/main/scala/org/apache/celeborn/common/metrics/source/RPCSource.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/metrics/source/RPCSource.scala
@@ -18,7 +18,7 @@
package org.apache.celeborn.common.metrics.source
import org.apache.celeborn.common.CelebornConf
-import org.apache.celeborn.common.network.protocol.{ChunkFetchRequest, PushData, PushMergedData}
+import org.apache.celeborn.common.network.protocol.{ChunkFetchRequest, OpenStream, PushData, PushMergedData}
import org.apache.celeborn.common.protocol.{PbRegisterWorker, PbUnregisterShuffle}
import org.apache.celeborn.common.protocol.message.ControlMessages._
@@ -38,6 +38,7 @@ class RPCSource(conf: CelebornConf, role: String) extends AbstractSource(conf, r
addCounter(RPCPushDataSize)
addCounter(RPCPushMergedDataNum)
addCounter(RPCPushMergedDataSize)
+ addCounter(RPCOpenStreamNum)
addCounter(RPCChunkFetchRequestNum)
// Master RPC
@@ -72,6 +73,8 @@ class RPCSource(conf: CelebornConf, role: String) extends AbstractSource(conf, r
incCounter(RPCPushMergedDataSize, messageLen)
case _: ChunkFetchRequest =>
incCounter(RPCChunkFetchRequestNum)
+ case _: OpenStream =>
+ incCounter(RPCOpenStreamNum)
case _: HeartbeatFromApplication =>
incCounter(RPCHeartbeatFromApplicationNum)
case _: HeartbeatFromWorker =>
@@ -109,6 +112,7 @@ object RPCSource {
val RPCPushDataSize = "RPCPushDataSize"
val RPCPushMergedDataNum = "RPCPushMergedDataNum"
val RPCPushMergedDataSize = "RPCPushMergedDataSize"
+ val RPCOpenStreamNum = "RPCOpenStreamNum"
val RPCChunkFetchRequestNum = "RPCChunkFetchRequestNum"
// Master RPC
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index 787ee7d96..864be6f44 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -84,15 +84,16 @@ class FetchHandler(val conf: TransportConf) extends BaseMessageHandler with Logg
rpcSource.updateMessageMetrics(r, 0)
handleChunkFetchRequest(client, r)
case r: RpcRequest =>
- handleOpenStream(client, r)
+ val msg = Message.decode(r.body().nioByteBuffer())
+ rpcSource.updateMessageMetrics(msg, 0)
+ handleOpenStream(client, r, msg)
case unknown: RequestMessage =>
throw new IllegalArgumentException(s"Unknown message type id: ${unknown.`type`.id}")
}
}
// here are BackLogAnnouncement,OpenStream and OpenStreamWithCredit RPCs to handle
- def handleOpenStream(client: TransportClient, request: RpcRequest): Unit = {
- val msg = Message.decode(request.body().nioByteBuffer())
+ def handleOpenStream(client: TransportClient, request: RpcRequest, msg: Message): Unit = {
val (shuffleKey, fileName) =
if (msg.`type`() == Type.OPEN_STREAM) {
val openStream = msg.asInstanceOf[OpenStream]