Posted to commits@celeborn.apache.org by et...@apache.org on 2022/11/15 06:35:00 UTC

[incubator-celeborn] branch branch-950 created (now ae918174)

This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a change to branch branch-950
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


      at ae918174 [ISSUE-950]Add metrics for top disk usage apps.

This branch includes the following new commits:

     new ae918174 [ISSUE-950]Add metrics for top disk usage apps.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-celeborn] 01/01: [ISSUE-950]Add metrics for top disk usage apps.

Posted by et...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-950
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit ae918174b18bb4e8cd36276a15c5b5c2b9df8802
Author: Ethan Feng <et...@apache.org>
AuthorDate: Thu Nov 10 15:46:30 2022 +0800

    [ISSUE-950]Add metrics for top disk usage apps.
---
 common/src/main/proto/TransportMessages.proto      |   2 +-
 .../scala/com/aliyun/emr/rss/common/RssConf.scala  |  12 ++
 .../aliyun/emr/rss/common/meta/WorkerInfo.scala    |   5 +-
 .../common/protocol/message/ControlMessages.scala  |  15 ++-
 .../master/clustermeta/AbstractMetaManager.java    |  14 +-
 .../master/clustermeta/IMetadataHandler.java       |   3 +-
 .../clustermeta/SingleMasterMetaManager.java       |   6 +-
 .../master/clustermeta/ha/HAMasterMetaManager.java |   4 +-
 .../deploy/master/clustermeta/ha/MetaHandler.java  |   5 +-
 .../deploy/master/metrics/AppDiskUsageMetric.scala | 143 +++++++++++++++++++++
 server-master/src/main/proto/Resource.proto        |   1 +
 .../emr/rss/service/deploy/master/Master.scala     |  25 ++--
 .../deploy/master/http/HttpRequestHandler.scala    |   2 +
 .../clustermeta/DefaultMetaSystemSuiteJ.java       |   6 +-
 .../ha/RatisMasterStatusSystemSuiteJ.java          |   7 +-
 .../deploy/master/AppDiskUsageMetricSuite.scala    | 112 ++++++++++++++++
 .../deploy/worker/LocalStorageManager.scala        |   7 +-
 .../emr/rss/service/deploy/worker/Worker.scala     |  17 ++-
 18 files changed, 343 insertions(+), 43 deletions(-)

diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto
index 7eb346a3..7571b250 100644
--- a/common/src/main/proto/TransportMessages.proto
+++ b/common/src/main/proto/TransportMessages.proto
@@ -106,7 +106,7 @@ message PbHeartbeatFromWorker {
   int32 fetchPort = 4;
   int32 replicatePort = 5;
   int32 numSlots = 6;
-  repeated string shuffleKeys = 7;
+  map<string,int64> shuffleDiskUsage = 7;
   string requestId = 8;
 }
 
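For reference, a minimal sketch of filling the new map field through the
generated protobuf builder (the putAllShuffleDiskUsage/getShuffleDiskUsageMap
accessors are the ones used in the Scala changes below; the key and value are
illustrative):

    val usage = new java.util.HashMap[String, java.lang.Long]()
    usage.put("app1-1", 1024L)  // shuffle key "<appId>-<shuffleId>" -> bytes on disk

    val pb = TransportMessages.PbHeartbeatFromWorker.newBuilder()
      .putAllShuffleDiskUsage(usage)  // replaces the old addAllShuffleKeys call
      .build()
    assert(pb.getShuffleDiskUsageMap.get("app1-1") == 1024L)
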
diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
index 91209338..c4c46c1f 100644
--- a/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
+++ b/common/src/main/scala/com/aliyun/emr/rss/common/RssConf.scala
@@ -835,6 +835,18 @@ object RssConf extends Logging {
     conf.getTimeAsMs("rss.rpc.cache.expire", "15s")
   }
 
+  def metricsAppTopDiskUsageCount(conf: RssConf): Int = {
+    conf.getInt("rss.metrics.app.topDiskUsage.count", 50)
+  }
+
+  def metricsAppTopDiskUsageWindowSize(conf: RssConf): Int = {
+    conf.getInt("rss.metrics.app.topDiskUsage.windowSize", 24)
+  }
+
+  def metricsAppTopDiskUsageInterval(conf: RssConf): Long = {
+    conf.getTimeAsSeconds("rss.metrics.app.topDiskUsage.interval", "1h")
+  }
+
   val WorkingDirName = "hadoop/rss-worker/shuffle_data"
 
   // If we want to use multi-raft group we can
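
The three new knobs can be set like any other RssConf entry; a minimal usage
sketch (the key names and defaults come from the accessors above, the chosen
values are illustrative):

    val conf = new RssConf()
    conf.set("rss.metrics.app.topDiskUsage.count", "20")       // top-N apps per snapshot, default 50
    conf.set("rss.metrics.app.topDiskUsage.windowSize", "12")  // snapshots retained, default 24
    conf.set("rss.metrics.app.topDiskUsage.interval", "30min") // snapshot rotation period, default 1h
    assert(RssConf.metricsAppTopDiskUsageCount(conf) == 20)
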
diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/meta/WorkerInfo.scala b/common/src/main/scala/com/aliyun/emr/rss/common/meta/WorkerInfo.scala
index 0d6cc632..9cfef88f 100644
--- a/common/src/main/scala/com/aliyun/emr/rss/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/com/aliyun/emr/rss/common/meta/WorkerInfo.scala
@@ -137,11 +137,12 @@ class WorkerInfo(
     host == other.host &&
       rpcPort == other.rpcPort &&
       pushPort == other.pushPort &&
-      fetchPort == other.fetchPort
+      fetchPort == other.fetchPort &&
+      replicatePort == other.replicatePort
   }
 
   override def hashCode(): Int = {
-    Objects.hashCode(host, rpcPort, pushPort, fetchPort)
+    Objects.hashCode(host, rpcPort, pushPort, fetchPort, replicatePort)
   }
 }
 
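Note that replicatePort now participates in both equals and hashCode. That
matters because WorkerInfo becomes the key of the new appDiskUsageDetails map
on the master; a small sketch of the changed contract (ports are
illustrative):

    val a = new WorkerInfo("host1", 111, 112, 113, 114)
    val b = new WorkerInfo("host1", 111, 112, 113, 999) // differs only in replicatePort
    assert(a != b)  // before this change the two compared equal
    // hashCode is derived from the same fields, keeping HashMap lookups consistent
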
diff --git a/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala b/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala
index a65a62fe..8cf2f518 100644
--- a/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala
+++ b/common/src/main/scala/com/aliyun/emr/rss/common/protocol/message/ControlMessages.scala
@@ -58,7 +58,7 @@ sealed trait Message extends Serializable{
         new TransportMessage(TransportMessages.MessageType.REGISTER_WORKER, payload)
 
       case HeartbeatFromWorker(host, rpcPort, pushPort, fetchPort, replicatePort, numSlots,
-      shuffleKeys, requestId) =>
+      shuffleDiskUsage, requestId) =>
         val payload = TransportMessages.PbHeartbeatFromWorker.newBuilder()
           .setHost(host)
           .setRpcPort(rpcPort)
@@ -66,7 +66,7 @@ sealed trait Message extends Serializable{
           .setFetchPort(fetchPort)
           .setNumSlots(numSlots)
           .setReplicatePort(replicatePort)
-          .addAllShuffleKeys(shuffleKeys)
+          .putAllShuffleDiskUsage(shuffleDiskUsage)
           .setRequestId(requestId)
           .build().toByteArray
         new TransportMessage(TransportMessages.MessageType.HEARTBEAT_FROM_WORKER, payload)
@@ -467,8 +467,9 @@ object ControlMessages extends Logging{
       fetchPort: Int,
       replicatePort : Int,
       numSlots: Int,
-      shuffleKeys: util.HashSet[String],
-    override var requestId: String = ZERO_UUID) extends MasterRequestMessage
+      shuffleDiskUsage: util.HashMap[String, java.lang.Long],
+      override var requestId: String = ZERO_UUID)
+    extends MasterRequestMessage
 
   case class HeartbeatResponse(
       expiredShuffleKeys: util.HashSet[String],
@@ -676,9 +677,9 @@ object ControlMessages extends Logging{
 
       case HEARTBEAT_FROM_WORKER =>
         val pbHeartbeatFromWorker = PbHeartbeatFromWorker.parseFrom(message.getPayload)
-        val shuffleKeys = new util.HashSet[String]()
-        if (pbHeartbeatFromWorker.getShuffleKeysCount > 0) {
-          shuffleKeys.addAll(pbHeartbeatFromWorker.getShuffleKeysList)
+        val shuffleKeys = new util.HashMap[String, java.lang.Long]()
+        if (!pbHeartbeatFromWorker.getShuffleDiskUsageMap.isEmpty) {
+          shuffleKeys.putAll(pbHeartbeatFromWorker.getShuffleDiskUsageMap)
         }
         HeartbeatFromWorker(pbHeartbeatFromWorker.getHost, pbHeartbeatFromWorker.getRpcPort,
           pbHeartbeatFromWorker.getPushPort, pbHeartbeatFromWorker.getFetchPort,
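
On the Scala side, the heartbeat now carries the usage map directly; a minimal
construction sketch against the case class above (requestId falls back to its
ZERO_UUID default; values are illustrative):

    val diskUsage = new java.util.HashMap[String, java.lang.Long]()
    diskUsage.put("app1-1", 2874371L)  // shuffle key -> bytes on disk
    val heartbeat = HeartbeatFromWorker("host1", 111, 112, 113, 114, 32, diskUsage)
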
diff --git a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/AbstractMetaManager.java b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/AbstractMetaManager.java
index c8f2a669..e022100d 100644
--- a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/AbstractMetaManager.java
+++ b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/AbstractMetaManager.java
@@ -18,10 +18,7 @@
 package com.aliyun.emr.rss.service.deploy.master.clustermeta;
 
 import java.io.*;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Optional;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 
 import io.netty.util.internal.ConcurrentSet;
@@ -47,6 +44,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
   public final ConcurrentSet<WorkerInfo> blacklist = new ConcurrentSet<>();
   // workerLost events
   public final ConcurrentSet<WorkerInfo> workerLostEvents = new ConcurrentSet<>();
+  public final Map<WorkerInfo, Map<String, Long>> appDiskUsageDetails = new ConcurrentHashMap<>();
 
   protected RpcEnv rpcEnv;
   protected RssConf conf;
@@ -128,10 +126,11 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
     // delete from blacklist
     blacklist.remove(worker);
     workerLostEvents.remove(worker);
+    appDiskUsageDetails.remove(worker);
   }
 
   public void updateWorkerHeartBeatMeta(String host, int rpcPort, int pushPort, int fetchPort,
-    int replicatePort, int numSlots, long time) {
+      int replicatePort, int numSlots, long time, Map<String, Long> shuffleDiskUsage) {
     WorkerInfo worker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort, numSlots,
       null);
     synchronized (workers) {
@@ -140,6 +139,7 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
         info.lastHeartbeat_$eq(time);
         info.setNumSlots(numSlots);
       });
+      appDiskUsageDetails.put(worker, shuffleDiskUsage);
     }
     if (numSlots == 0 && !blacklist.contains(worker)) {
       LOG.warn("Worker: {} num total slots is 0, add to blacklist", worker.toString());
@@ -287,4 +287,8 @@ public abstract class AbstractMetaManager implements IMetadataHandler {
       this.blacklist.addAll(failedWorkers);
     }
   }
+
+  public Map<WorkerInfo, Map<String, Long>> getAppDiskUsageDetailsSnapShot() {
+    return new HashMap<>(this.appDiskUsageDetails);
+  }
 }
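
getAppDiskUsageDetailsSnapShot copies only the outer map, which is enough
here: each heartbeat replaces a worker's inner map wholesale rather than
mutating it in place, so a snapshot keeps pointing at the maps it saw. A
sketch of that shallow-copy behavior (the WorkerInfo and values are
illustrative):

    val worker = new WorkerInfo("host1", 111, 112, 113, 114)
    val inner = new java.util.HashMap[String, java.lang.Long]()
    inner.put("app1-1", 100L)
    val details = new java.util.HashMap[WorkerInfo, java.util.Map[String, java.lang.Long]]()
    details.put(worker, inner)

    val snapshot = new java.util.HashMap(details)      // what the getter does
    details.put(worker, new java.util.HashMap())       // a later heartbeat replaces the entry
    assert(snapshot.get(worker).get("app1-1") == 100L) // the snapshot still sees the old map
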
diff --git a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/IMetadataHandler.java b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/IMetadataHandler.java
index 4f4e5cf6..d6cdd175 100644
--- a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/IMetadataHandler.java
+++ b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/IMetadataHandler.java
@@ -42,7 +42,8 @@ public interface IMetadataHandler {
     String requestId);
 
   void handleWorkerHeartBeat(String host, int rpcPort, int pushPort, int fetchPort,
-    int replicatePort, int numSlots, long time, String requestId);
+    int replicatePort, int numSlots, long time,
+    Map<String, Long> shuffleDiskUsage, String requestId);
 
   void handleRegisterWorker(String host, int rpcPort, int pushPort, int fetchPort,
     int replicatePort, int numSlots, String requestId);
diff --git a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/SingleMasterMetaManager.java b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/SingleMasterMetaManager.java
index e0e58f2e..10e29a15 100644
--- a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/SingleMasterMetaManager.java
+++ b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/SingleMasterMetaManager.java
@@ -78,8 +78,10 @@ public class SingleMasterMetaManager extends AbstractMetaManager {
 
   @Override
   public void handleWorkerHeartBeat(String host, int rpcPort, int pushPort, int fetchPort,
-    int replicatePort, int numSlots, long time, String requestId) {
-    updateWorkerHeartBeatMeta(host, rpcPort, pushPort, fetchPort, replicatePort, numSlots, time);
+      int replicatePort, int numSlots, long time,
+      Map<String, Long> shuffleDiskUsage, String requestId) {
+    updateWorkerHeartBeatMeta(host, rpcPort, pushPort, fetchPort, replicatePort,
+            numSlots, time, shuffleDiskUsage);
   }
 
   @Override
diff --git a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
index 6fd97a7b..fb3c0ef9 100644
--- a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
+++ b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java
@@ -169,7 +169,8 @@ public class HAMasterMetaManager extends AbstractMetaManager {
 
   @Override
   public void handleWorkerHeartBeat(String host, int rpcPort, int pushPort, int fetchPort,
-    int replicatePort, int numSlots, long time, String requestId) {
+    int replicatePort, int numSlots, long time, Map<String, Long> shuffleDiskUsage,
+    String requestId) {
     try {
       ratisServer.submitRequest(ResourceRequest.newBuilder()
               .setCmdType(Type.WorkerHeartBeat)
@@ -183,6 +184,7 @@ public class HAMasterMetaManager extends AbstractMetaManager {
                               .setReplicatePort(replicatePort)
                               .setNumSlots(numSlots)
                               .setTime(time)
+                              .putAllShuffleDiskUsage(shuffleDiskUsage)
                               .build())
               .build());
     } catch (ServiceException e) {
diff --git a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/MetaHandler.java b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/MetaHandler.java
index 3bf772df..fdf42763 100644
--- a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/MetaHandler.java
+++ b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/MetaHandler.java
@@ -20,6 +20,7 @@ package com.aliyun.emr.rss.service.deploy.master.clustermeta.ha;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
@@ -156,8 +157,10 @@ public class MetaHandler {
           LOG.debug("Handle worker heartbeat for {} {} {} {} {} {}",
                    host, rpcPort, pushPort, fetchPort, replicatePort, numSlots);
           time = request.getWorkerHeartBeatRequest().getTime();
+          Map<String, Long> diskUsageMap = request
+                  .getWorkerHeartBeatRequest().getShuffleDiskUsageMap();
           metaSystem.updateWorkerHeartBeatMeta(host, rpcPort, pushPort, fetchPort, replicatePort,
-            numSlots, time);
+            numSlots, time, diskUsageMap);
           break;
 
         case RegisterWorker:
diff --git a/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/metrics/AppDiskUsageMetric.scala b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/metrics/AppDiskUsageMetric.scala
new file mode 100644
index 00000000..d7734524
--- /dev/null
+++ b/server-master/src/main/java/com/aliyun/emr/rss/service/deploy/master/metrics/AppDiskUsageMetric.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.aliyun.emr.rss.service.deploy.master.metrics
+
+import java.util
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+import com.aliyun.emr.rss.common.RssConf
+import com.aliyun.emr.rss.common.internal.Logging
+import com.aliyun.emr.rss.common.meta.WorkerInfo
+import com.aliyun.emr.rss.common.util.{ThreadUtils, Utils}
+
+import java.time.LocalDateTime
+import scala.collection.JavaConverters.{iterableAsScalaIterableConverter, mapAsScalaMapConverter}
+
+case class AppDiskUsage(var appId: String, var usage: Long) {
+  override def toString: String = s"Application ${appId} used ${Utils.bytesToString(usage)} "
+}
+
+class AppDiskUsageSnapShot(val topItemCount: Int) extends Logging {
+  val topNItems = new Array[AppDiskUsage](topItemCount)
+  val startSnapShotTime = LocalDateTime.now()
+  var endSnapShotTime: LocalDateTime = _
+
+  def commit(): Unit = {
+    endSnapShotTime = LocalDateTime.now()
+  }
+
+  def updateAppDiskUsage(appId: String, usage: Long): Unit = {
+    val dropIndex = topNItems.indexWhere(item => item != null && item.appId == appId)
+    if (dropIndex != -1) {
+      drop(dropIndex)
+    }
+    val insertIndex = findInsertPosition(usage)
+    if (insertIndex != -1) {
+      shift(insertIndex)
+      topNItems(insertIndex) = AppDiskUsage(appId, usage)
+    }
+  }
+
+  def shift(index: Int): Unit = {
+    for (i <- topItemCount - 1 until index by -1) {
+      topNItems(i) = topNItems(i - 1)
+    }
+  }
+
+  def drop(index: Int): Unit = {
+    // shift the tail left, then clear the vacated last slot (avoids a stale duplicate)
+    for (i <- index until topItemCount - 1) { topNItems(i) = topNItems(i + 1) }
+    topNItems(topItemCount - 1) = null
+  }
+
+  def findInsertPosition(usage: Long): Int = {
+    if (topNItems(0) == null) {
+      return 0
+    }
+    for (i <- 0 until topItemCount) {
+      if (topNItems(i) == null || topNItems(i).usage < usage) {
+        return i
+      }
+    }
+    -1
+  }
+
+  override def toString = s"Snapshot " +
+    s"start ${startSnapShotTime} end ${endSnapShotTime}" +
+    s" ${topNItems.filter(_ != null).mkString(",")}"
+}
+
+class AppDiskUsageMetric(conf: RssConf) extends Logging {
+  val usageCount = RssConf.metricsAppTopDiskUsageCount(conf)
+  val snapshotCount = RssConf.metricsAppTopDiskUsageWindowSize(conf)
+  val interval = RssConf.metricsAppTopDiskUsageInterval(conf)
+  val snapShots = new Array[AppDiskUsageSnapShot](snapshotCount)
+  val logExecutor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("App_disk_usage_metric_logger")
+  val updateExecutor = ThreadUtils.newDaemonSingleThreadExecutor("App_disk_usage_metric_updater")
+  var currentSnapShot: AtomicReference[AppDiskUsageSnapShot] = new AtomicReference[AppDiskUsageSnapShot]()
+
+  def update(map: java.util.Map[WorkerInfo, java.util.Map[String, java.lang.Long]]): Unit = {
+    updateExecutor.submit(new Runnable {
+      override def run(): Unit = {
+        val aggregatedAppDiskUsage = new util.HashMap[String, Long]()
+        map.values().asScala.foreach(_.asScala.foreach { case (shuffleKey, usage) =>
+          val appId = shuffleKey.split("-")(0)
+          if (aggregatedAppDiskUsage.containsKey(appId)) {
+            aggregatedAppDiskUsage.put(appId, aggregatedAppDiskUsage.get(appId) + usage)
+          } else {
+            aggregatedAppDiskUsage.put(appId, usage)
+          }
+        })
+        if (currentSnapShot.get() != null) {
+          aggregatedAppDiskUsage.asScala.foreach { case (key, usage) =>
+            currentSnapShot.get().updateAppDiskUsage(key, usage)
+          }
+        }
+      }
+    })
+  }
+
+  logExecutor.scheduleAtFixedRate(new Runnable {
+    override def run(): Unit = {
+      if (currentSnapShot.get() != null) {
+        currentSnapShot.get().commit()
+      }
+      currentSnapShot.set(getNewSnapShot())
+      logInfo(s"App Disk Usage Top${usageCount} Report ${summary()}")
+    }
+  }, interval, interval, TimeUnit.SECONDS)
+
+  def getNewSnapShot(): AppDiskUsageSnapShot = {
+    for (i <- snapshotCount - 1 until 0 by -1) {
+      snapShots(i) = snapShots(i - 1)
+    }
+    snapShots(0) = new AppDiskUsageSnapShot(usageCount)
+    snapShots(0)
+  }
+
+  def summary(): String = {
+    val stringBuilder = new StringBuilder()
+    for (i <- 0 until snapshotCount) {
+      if (snapShots(i) != null) {
+        stringBuilder.append(snapShots(i))
+        stringBuilder.append("    \n")
+      }
+    }
+    stringBuilder.toString()
+  }
+}
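
A small usage sketch of the snapshot's top-N bookkeeping (size and values are
illustrative):

    val snapshot = new AppDiskUsageSnapShot(3)   // track only the top 3 apps
    snapshot.updateAppDiskUsage("app1", 100)
    snapshot.updateAppDiskUsage("app2", 300)
    snapshot.updateAppDiskUsage("app3", 200)
    snapshot.updateAppDiskUsage("app1", 400)     // re-reporting drops the old slot first
    snapshot.commit()
    println(snapshot)  // app1 (400), app2 (300), app3 (200), in descending order
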
diff --git a/server-master/src/main/proto/Resource.proto b/server-master/src/main/proto/Resource.proto
index a90a1558..fb348674 100644
--- a/server-master/src/main/proto/Resource.proto
+++ b/server-master/src/main/proto/Resource.proto
@@ -74,6 +74,7 @@ message WorkerHeartBeatRequest {
   required int32 replicatePort = 5;
   required int32 numSlots = 6;
   required int64 time = 7;
+  map<string, int64> shuffleDiskUsage = 8;
 }
 
 message RegisterWorkerRequest {
diff --git a/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/Master.scala b/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/Master.scala
index dc2118b7..cb66e497 100644
--- a/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/Master.scala
+++ b/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/Master.scala
@@ -40,6 +40,7 @@ import com.aliyun.emr.rss.server.common.http.{HttpServer, HttpServerInitializer}
 import com.aliyun.emr.rss.service.deploy.master.clustermeta.SingleMasterMetaManager
 import com.aliyun.emr.rss.service.deploy.master.clustermeta.ha.{HAHelper, HAMasterMetaManager, MetaHandler}
 import com.aliyun.emr.rss.service.deploy.master.http.HttpRequestHandler
+import com.aliyun.emr.rss.service.deploy.master.metrics.AppDiskUsageMetric
 
 private[deploy] class Master(
     override val rpcEnv: RpcEnv,
@@ -63,6 +64,7 @@ private[deploy] class Master(
   private var checkForWorkerTimeOutTask: ScheduledFuture[_] = _
   private var checkForApplicationTimeOutTask: ScheduledFuture[_] = _
   private val nonEagerHandler = ThreadUtils.newDaemonCachedThreadPool("master-noneager-handler", 64)
+  private val appDiskUsageCollector = new AppDiskUsageMetric(conf)
 
   // Config constants
   private val WorkerTimeoutMs = RssConf.workerTimeoutMs(conf)
@@ -187,10 +189,10 @@ private[deploy] class Master(
       executeWithLeaderChecker(context, handleApplicationLost(context, appId, requestId))
 
     case HeartbeatFromWorker(host, rpcPort, pushPort, fetchPort, replicatePort, numSlots,
-    shuffleKeys, requestId) =>
+    shuffleResourceConsumption, requestId) =>
       logDebug(s"Received heartbeat from worker $host:$rpcPort:$pushPort:$fetchPort.")
       executeWithLeaderChecker(context, handleHeartBeatFromWorker(context, host, rpcPort, pushPort,
-        fetchPort, replicatePort, numSlots, shuffleKeys, requestId))
+        fetchPort, replicatePort, numSlots, shuffleResourceConsumption, requestId))
 
     case GetWorkerInfos =>
       executeWithLeaderChecker(context, handleGetWorkerInfos(context))
@@ -245,7 +247,7 @@ private[deploy] class Master(
       fetchPort: Int,
       replicatePort: Int,
       numSlots: Int,
-      shuffleKeys: util.HashSet[String],
+      shuffleDiskUsage: util.HashMap[String, java.lang.Long],
       requestId: String): Unit = {
     val targetWorker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort,
       -1, null)
@@ -258,15 +260,16 @@ private[deploy] class Master(
         s"$host:$rpcPort:$pushPort:$fetchPort:$replicatePort.")
     } else {
       statusSystem.handleWorkerHeartBeat(host, rpcPort, pushPort, fetchPort, replicatePort,
-        numSlots, System.currentTimeMillis(), requestId)
+        numSlots, System.currentTimeMillis(), shuffleDiskUsage, requestId)
     }
     val expiredShuffleKeys = new util.HashSet[String]
-    shuffleKeys.asScala.foreach { shuffleKey =>
-      if (!statusSystem.registeredShuffle.contains(shuffleKey)) {
-        logWarning(s"Shuffle $shuffleKey expired on $host:$rpcPort:$pushPort:$fetchPort.")
-        expiredShuffleKeys.add(shuffleKey)
+    shuffleDiskUsage.asScala.foreach { case (key, _) =>
+      if (!statusSystem.registeredShuffle.contains(key)) {
+        logWarning(s"Shuffle ${key} expired on $host:$rpcPort:$pushPort:$fetchPort.")
+        expiredShuffleKeys.add(key)
       }
     }
+    appDiskUsageCollector.update(statusSystem.getAppDiskUsageDetailsSnapShot())
     context.reply(HeartbeatResponse(expiredShuffleKeys, registered))
   }
 
@@ -283,9 +286,7 @@ private[deploy] class Master(
         s" for WorkerLost handler!")
       return
     }
-
     statusSystem.handleWorkerLost(host, rpcPort, pushPort, fetchPort, replicatePort, requestId)
-
     if (context != null) {
       context.reply(WorkerLostResponse(true))
     }
@@ -508,6 +509,10 @@ private[deploy] class Master(
     statusSystem.hostnameSet.asScala.mkString(",")
   }
 
+  def listAppDiskUsageInfos: String = {
+    appDiskUsageCollector.summary()
+  }
+
   private def requestGetWorkerInfos(endpoint: RpcEndpointRef): GetWorkerInfosResponse = {
     try {
       endpoint.askSync[GetWorkerInfosResponse](GetWorkerInfos)
diff --git a/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/http/HttpRequestHandler.scala b/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/http/HttpRequestHandler.scala
index 01819d3a..33696003 100644
--- a/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/http/HttpRequestHandler.scala
+++ b/server-master/src/main/scala/com/aliyun/emr/rss/service/deploy/master/http/HttpRequestHandler.scala
@@ -68,6 +68,8 @@ class HttpRequestHandler(val master: Master,
         master.getThreadDump
       case "/hostnames" =>
         master.getHostnameList
+      case "/listTopDiskUsedApps" =>
+        master.listAppDiskUsageInfos
       case _ => INVALID
     }
   }
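
Once the master's HTTP server is up, the report should be reachable at the new
/listTopDiskUsedApps path (for example with curl against the master's HTTP
port); it returns the same plain-text report that listAppDiskUsageInfos and
summary() build.
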
diff --git a/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java b/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
index 56b31839..83efe495 100644
--- a/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
+++ b/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java
@@ -247,17 +247,17 @@ public class DefaultMetaSystemSuiteJ {
       NUMSLOTS3, getNewReqeustId());
 
     statusSystem.handleWorkerHeartBeat(HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT1,
-      0, 1, getNewReqeustId());
+      0, 1, new HashMap<>(), getNewReqeustId());
 
     assert statusSystem.blacklist.size()==1;
 
     statusSystem.handleWorkerHeartBeat(HOSTNAME2, RPCPORT2, PUSHPORT2, FETCHPORT2, REPLICATEPORT2,
-      0, 1, getNewReqeustId());
+      0, 1, new HashMap<>(), getNewReqeustId());
 
     assert statusSystem.blacklist.size()==2;
 
     statusSystem.handleWorkerHeartBeat(HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT3,
-      1, 1, getNewReqeustId());
+      1, 1, new HashMap<>(), getNewReqeustId());
 
     assert statusSystem.blacklist.size()==1;
   }
diff --git a/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
index 8ee561da..c6673fd2 100644
--- a/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
+++ b/server-master/src/test/java/com/aliyun/emr/rss/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java
@@ -56,6 +56,7 @@ public class RatisMasterStatusSystemSuiteJ {
   protected static RpcEndpointRef mockRpcEndpoint = Mockito.mock(RpcEndpointRef.class);
 
   private static String DEFAULT_SERVICE_ID = "RSS_DEFAULT_SERVICE_ID";
+  private static Map<String, Long> shuffleDiskUsageMap = new HashMap<>();
 
   @BeforeClass
   public static void init() throws IOException, InterruptedException {
@@ -439,7 +440,7 @@ public class RatisMasterStatusSystemSuiteJ {
       NUMSLOTS3, getNewReqeustId());
 
     statusSystem.handleWorkerHeartBeat(HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT1,
-      0, 1, getNewReqeustId());
+      0, 1, shuffleDiskUsageMap, getNewReqeustId());
     Thread.sleep(3000L);
 
     Assert.assertEquals(1, STATUSSYSTEM1.blacklist.size());
@@ -447,7 +448,7 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(1, STATUSSYSTEM3.blacklist.size());
 
     statusSystem.handleWorkerHeartBeat(HOSTNAME2, RPCPORT2, PUSHPORT2, FETCHPORT2, REPLICATEPORT2,
-      0, 1, getNewReqeustId());
+      0, 1, shuffleDiskUsageMap, getNewReqeustId());
     Thread.sleep(3000L);
 
     Assert.assertEquals(2, statusSystem.blacklist.size());
@@ -456,7 +457,7 @@ public class RatisMasterStatusSystemSuiteJ {
     Assert.assertEquals(2, STATUSSYSTEM3.blacklist.size());
 
     statusSystem.handleWorkerHeartBeat(HOSTNAME1, RPCPORT1, PUSHPORT1, FETCHPORT1, REPLICATEPORT1,
-      1, 1, getNewReqeustId());
+      1, 1, shuffleDiskUsageMap, getNewReqeustId());
     Thread.sleep(3000L);
 
     Assert.assertEquals(1, statusSystem.blacklist.size());
diff --git a/server-master/src/test/scala/com/aliyun/emr/rss/service/deploy/master/AppDiskUsageMetricSuite.scala b/server-master/src/test/scala/com/aliyun/emr/rss/service/deploy/master/AppDiskUsageMetricSuite.scala
new file mode 100644
index 00000000..53c6e332
--- /dev/null
+++ b/server-master/src/test/scala/com/aliyun/emr/rss/service/deploy/master/AppDiskUsageMetricSuite.scala
@@ -0,0 +1,112 @@
+package com.aliyun.emr.rss.service.deploy.master
+
+import com.aliyun.emr.rss.common.RssConf
+import com.aliyun.emr.rss.common.internal.Logging
+import com.aliyun.emr.rss.common.meta.WorkerInfo
+import com.aliyun.emr.rss.service.deploy.master.metrics.{AppDiskUsageMetric, AppDiskUsageSnapShot}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.funsuite.AnyFunSuite
+
+import java.util
+import scala.util.Random
+
+class AppDiskUsageMetricSuite extends AnyFunSuite
+  with BeforeAndAfterAll
+  with BeforeAndAfterEach
+  with Logging {
+  val WORKER1 = new WorkerInfo("host1", 111, 112, 113, 114)
+  val WORKER2 = new WorkerInfo("host2", 211, 212, 213, 214)
+  val WORKER3 = new WorkerInfo("host3", 311, 312, 313, 314)
+
+  test("test snapshot ordering") {
+    val snapShot = new AppDiskUsageSnapShot(50)
+    val rand = new Random()
+    for (i <- 1 to 60) {
+      snapShot.updateAppDiskUsage(s"app-${i}", rand.nextInt(100000000) + 1)
+    }
+    println(snapShot.toString)
+  }
+
+  test("test snapshot ordering with duplicate entries") {
+    val snapShot = new AppDiskUsageSnapShot(50)
+    val rand = new Random()
+    for (i <- 1 to 60) {
+      snapShot.updateAppDiskUsage(s"app-${i}", rand.nextInt(100000000) + 1)
+    }
+    for (i <- 1 to 15) {
+      snapShot.updateAppDiskUsage(s"app-${i}", rand.nextInt(100000000) + 1000000000)
+    }
+
+    println(snapShot.toString)
+  }
+
+  test("test app usage snapshot") {
+    Thread.sleep(5000)
+
+    val conf = new RssConf()
+    conf.set("rss.metrics.app.topDiskUsage.windowSize", "5")
+    conf.set("rss.metrids.app.topDiskUsage.interval", "2s")
+    val usageMetric = new AppDiskUsageMetric(conf)
+
+    val map1 = new util.HashMap[WorkerInfo, util.Map[String, java.lang.Long]]()
+    val worker1Map = new util.HashMap[String, java.lang.Long]()
+    val worker2Map = new util.HashMap[String, java.lang.Long]()
+    val worker3Map = new util.HashMap[String, java.lang.Long]()
+    worker1Map.put("app1-1", 2874371)
+    worker1Map.put("app1-2", 4524)
+    worker1Map.put("app1-3", 43452)
+    worker2Map.put("app2-1", 2134526)
+    worker2Map.put("app2-1", 4526)
+    worker2Map.put("app1-1", 23465463)
+    worker3Map.put("app1-1", 132456)
+    worker3Map.put("app3-1", 43254)
+    worker3Map.put("app4-1", 6535635)
+    map1.put(WORKER1, worker1Map)
+    map1.put(WORKER2, worker2Map)
+    map1.put(WORKER3, worker3Map)
+    usageMetric.update(map1)
+    println(usageMetric.summary())
+    Thread.sleep(2000)
+
+    map1.clear()
+    worker1Map.clear()
+    worker2Map.clear()
+    worker3Map.clear()
+    worker1Map.put("app1-1", 23523450)
+    worker1Map.put("app1-2", 3231453)
+    worker1Map.put("app1-3", 2345645)
+    worker2Map.put("app2-1", 12324143)
+    worker2Map.put("app2-1", 23454)
+    worker2Map.put("app5-1", 234235613)
+    worker3Map.put("app1-1", 1234454)
+    worker3Map.put("app3-1", 43532)
+    worker3Map.put("app4-1", 134345213)
+    map1.put(WORKER1, worker1Map)
+    map1.put(WORKER2, worker2Map)
+    map1.put(WORKER3, worker3Map)
+    usageMetric.update(map1)
+    println(usageMetric.summary())
+    Thread.sleep(2000)
+
+    map1.clear()
+    worker1Map.clear()
+    worker2Map.clear()
+    worker3Map.clear()
+    worker1Map.put("app1-1", 82352345)
+    worker1Map.put("app1-2", 7253423)
+    worker1Map.put("app1-3", 42345645)
+    worker2Map.put("app2-1", 12324143)
+    worker2Map.put("app2-1", 563456)
+    worker2Map.put("app5-1", 2341343267L)
+    worker3Map.put("app1-1", 971234454)
+    worker3Map.put("app3-1", 32443532L)
+    worker3Map.put("app4-1", 8734345213L)
+    map1.put(WORKER1, worker1Map)
+    map1.put(WORKER2, worker2Map)
+    map1.put(WORKER3, worker3Map)
+    usageMetric.update(map1)
+    println(usageMetric.summary())
+    Thread.sleep(2500)
+    println(usageMetric.summary())
+  }
+}
diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala
index 038bdb81..15dea308 100644
--- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala
+++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/LocalStorageManager.scala
@@ -410,7 +410,12 @@ private[worker] final class LocalStorageManager(
     }
   }
 
-  def shuffleKeySet(): util.Set[String] = writers.keySet()
+  def shuffleResourceConsumption: util.Map[String, Long] = {
+    // per shuffle key, sum the on-disk bytes of all of its file writers
+    writers.asScala.map { case (shuffleKey, shuffleWriters) =>
+      shuffleKey -> shuffleWriters.values().asScala.map(_.getFileLength).sum
+    }.toMap.asJava
+  }
 
   def cleanupExpiredShuffleKey(expiredShuffleKeys: util.HashSet[String]): Unit = {
     val workingDirs = workingDirsSnapshot()
diff --git a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Worker.scala b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Worker.scala
index f28a4ea5..b3a0d3ed 100644
--- a/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Worker.scala
+++ b/server-worker/src/main/scala/com/aliyun/emr/rss/service/deploy/worker/Worker.scala
@@ -17,7 +17,7 @@
 
 package com.aliyun.emr.rss.service.deploy.worker
 
-import java.util.{HashSet => jHashSet}
+import java.util.{HashMap => jHashMap, HashSet => jHashSet}
 import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
 
@@ -186,19 +186,24 @@ private[deploy] class Worker(
   }
 
   def heartBeatToMaster(): Unit = {
-    val shuffleKeys = new jHashSet[String]
-    shuffleKeys.addAll(partitionLocationInfo.shuffleKeySet)
-    shuffleKeys.addAll(localStorageManager.shuffleKeySet())
+    val shuffleResourceConsumption = new jHashMap[String, java.lang.Long]
+    partitionLocationInfo.shuffleKeySet.asScala.foreach { shuffleKey =>
+      shuffleResourceConsumption.put(shuffleKey, 0)
+    }
+    localStorageManager.shuffleResourceConsumption.asScala.foreach { resource =>
+      shuffleResourceConsumption.put(resource._1, resource._2)
+    }
+
     val response = rssHARetryClient.askSync[HeartbeatResponse](
       HeartbeatFromWorker(host, rpcPort, pushPort, fetchPort, replicatePort, workerInfo.numSlots,
-        shuffleKeys)
+        shuffleResourceConsumption)
       , classOf[HeartbeatResponse])
     if (response.registered) {
       cleanTaskQueue.put(response.expiredShuffleKeys)
     } else {
       logError("Worker not registered in master, clean all shuffle data and register again.")
       // Clean all shuffle related metadata and data
-      cleanup(shuffleKeys)
+      cleanup(new jHashSet[String](shuffleResourceConsumption.keySet()))
       try {
         registerWithMaster()
       } catch {