You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@celeborn.apache.org by et...@apache.org on 2024/01/24 02:46:04 UTC

(incubator-celeborn) branch main updated: [CELEBORN-1246] Introduce OpenStreamSuccessCount, FetchChunkSuccessCount and WriteDataSuccessCount metric to expose the count of opening stream, fetching chunk and writing data successfully

This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 4709251bb [CELEBORN-1246] Introduce OpenStreamSuccessCount, FetchChunkSuccessCount and WriteDataSuccessCount metric to expose the count of opening stream, fetching chunk and writing data successfully
4709251bb is described below

commit 4709251bb4d2e61d166ea82269d6bd6d591890a2
Author: Angerszhuuuu <an...@gmail.com>
AuthorDate: Wed Jan 24 10:44:28 2024 +0800

    [CELEBORN-1246] Introduce OpenStreamSuccessCount, FetchChunkSuccessCount and WriteDataSuccessCount metric to expose the count of opening stream, fetching chunk and writing data successfully
    
    ### What changes were proposed in this pull request?
    
    Introduce the `OpenStreamSuccessCount`, `FetchChunkSuccessCount` and `WriteDataSuccessCount` metrics to expose the number of successful stream opens, chunk fetches and data writes on the current worker.
    
    ### Why are the changes needed?
    
    The failure ratio of opening streams, fetching chunks and writing data is important for assessing Celeborn performance and the health of the cluster. Computing it requires the counts of successful stream opens, chunk fetches and data writes, which the worker currently does not expose.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No.
    
    Closes #2252 from AngersZhuuuu/CELEBORN-1246.
    
    Authored-by: Angerszhuuuu <an...@gmail.com>
    Signed-off-by: mingji <fe...@alibaba-inc.com>
---
 docs/monitoring.md                                                  | 3 +++
 .../org/apache/celeborn/service/deploy/worker/FetchHandler.scala    | 2 ++
 .../org/apache/celeborn/service/deploy/worker/PushDataHandler.scala | 2 ++
 .../org/apache/celeborn/service/deploy/worker/WorkerSource.scala    | 6 ++++++
 4 files changed, 13 insertions(+)

diff --git a/docs/monitoring.md b/docs/monitoring.md
index ec9ac8e11..96d744ff9 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -143,13 +143,16 @@ These metrics are exposed by Celeborn worker.
         - The time for a worker to fetch a chunk which is 8MB by default from a reduced partition. 
     - ActiveChunkStreamCount
         - Active stream count for reduce partition reading streams.
+    - OpenStreamSuccessCount
     - OpenStreamFailCount
+    - FetchChunkSuccessCount
     - FetchChunkFailCount
     - PrimaryPushDataTime
         - The time for a worker to handle a pushData RPC sent from a celeborn client.
     - ReplicaPushDataTime
         - The time for a worker to handle a pushData RPC sent from a celeborn worker by replicating.
     - WriteDataHardSplitCount
+    - WriteDataSuccessCount
     - WriteDataFailCount
     - ReplicateDataFailCount
     - ReplicateDataWriteFailCount
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
index edc0bea60..006a1400d 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala
@@ -286,6 +286,7 @@ class FetchHandler(
             endIndex,
             fileInfo)
       }
+      workerSource.incCounter(WorkerSource.OPEN_STREAM_SUCCESS_COUNT)
     } catch {
       case e: IOException =>
         workerSource.incCounter(WorkerSource.OPEN_STREAM_FAIL_COUNT)
@@ -414,6 +415,7 @@ class FetchHandler(
                 s"Sending ChunkFetchSuccess operation failed, chunk $streamChunkSlice",
                 future.cause())
             }
+            workerSource.incCounter(WorkerSource.FETCH_CHUNK_SUCCESS_COUNT)
             chunkStreamManager.chunkSent(streamChunkSlice.streamId)
             if (fetchTimeMetric != null) {
               fetchTimeMetric.update(System.nanoTime() - fetchBeginTime)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 37243671a..a0b0d6b95 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -1254,6 +1254,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
             } else {
               StatusCode.PUSH_DATA_WRITE_FAIL_REPLICA
             }
+          workerSource.incCounter(WorkerSource.WRITE_DATA_FAIL_COUNT)
           writePromise.failure(new CelebornIOException(cause))
           fileWriter.decrementPendingWrites()
       }
@@ -1283,6 +1284,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
         writeData(fileWriters.head, body, shuffleKey)
     }
     if (!writePromise.isCompleted) {
+      workerSource.incCounter(WorkerSource.WRITE_DATA_SUCCESS_COUNT)
       writePromise.success()
     }
   }
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
index c85b6c3d9..eb037afc7 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala
@@ -26,9 +26,12 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste
 
   import WorkerSource._
   // add counters
+  addCounter(OPEN_STREAM_SUCCESS_COUNT)
   addCounter(OPEN_STREAM_FAIL_COUNT)
+  addCounter(FETCH_CHUNK_SUCCESS_COUNT)
   addCounter(FETCH_CHUNK_FAIL_COUNT)
   addCounter(WRITE_DATA_HARD_SPLIT_COUNT)
+  addCounter(WRITE_DATA_SUCCESS_COUNT)
   addCounter(WRITE_DATA_FAIL_COUNT)
   addCounter(REPLICATE_DATA_FAIL_COUNT)
   addCounter(REPLICATE_DATA_WRITE_FAIL_COUNT)
@@ -79,13 +82,16 @@ object WorkerSource {
   val OPEN_STREAM_TIME = "OpenStreamTime"
   val FETCH_CHUNK_TIME = "FetchChunkTime"
   val ACTIVE_CHUNK_STREAM_COUNT = "ActiveChunkStreamCount"
+  val OPEN_STREAM_SUCCESS_COUNT = "OpenStreamSuccessCount"
   val OPEN_STREAM_FAIL_COUNT = "OpenStreamFailCount"
+  val FETCH_CHUNK_SUCCESS_COUNT = "FetchChunkSuccessCount"
   val FETCH_CHUNK_FAIL_COUNT = "FetchChunkFailCount"
 
   // push data
   val PRIMARY_PUSH_DATA_TIME = "PrimaryPushDataTime"
   val REPLICA_PUSH_DATA_TIME = "ReplicaPushDataTime"
   val WRITE_DATA_HARD_SPLIT_COUNT = "WriteDataHardSplitCount"
+  val WRITE_DATA_SUCCESS_COUNT = "WriteDataSuccessCount"
   val WRITE_DATA_FAIL_COUNT = "WriteDataFailCount"
   val REPLICATE_DATA_FAIL_COUNT = "ReplicateDataFailCount"
   val REPLICATE_DATA_WRITE_FAIL_COUNT = "ReplicateDataWriteFailCount"