You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by xi...@apache.org on 2023/03/03 12:21:18 UTC

[incubator-uniffle] branch master updated: [#671] feat(coordinator): Metrics of the number of apps submitted by users (#672)

This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 313ddd69 [#671] feat(coordinator): Metrics of the number of apps submitted by users (#672)
313ddd69 is described below

commit 313ddd69723e0c45cd4ddf271b08f6f94c4a723b
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Fri Mar 3 20:21:12 2023 +0800

    [#671] feat(coordinator): Metrics of the number of apps submitted by users (#672)
    
    ### What changes were proposed in this pull request?
    Add a per-user metric to QuotaManager that records the number of running apps for each user.
    
    ### Why are the changes needed?
    Fix: #671
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    New unit tests.
---
 .../uniffle/coordinator/ApplicationManager.java    |  5 +-
 .../apache/uniffle/coordinator/QuotaManager.java   | 13 +++++
 .../coordinator/metric/CoordinatorMetrics.java     |  4 ++
 .../uniffle/coordinator/QuotaManagerTest.java      | 60 ++++++++++++++++++++--
 4 files changed, 78 insertions(+), 4 deletions(-)

diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index 5315c8ff..193a28ee 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -262,7 +262,7 @@ public class ApplicationManager {
     detectStorageScheduler.shutdownNow();
   }
 
-  private void statusCheck() {
+  protected void statusCheck() {
     List<Map<String, Long>> appAndNums = Lists.newArrayList(currentUserAndApp.values());
     Map<String, Long> appIds = Maps.newHashMap();
     // The reason for setting an expired uuid here is that there is a scenario where accessCluster succeeds,
@@ -293,6 +293,9 @@ public class ApplicationManager {
       }
       CoordinatorMetrics.gaugeRunningAppNum.set(appIds.size());
       updateRemoteStorageMetrics();
+      if (quotaManager != null) {
+        quotaManager.updateQuotaMetrics();
+      }
     } catch (Exception e) {
       // the flag is only for test case
       hasErrorInStatusCheck = true;
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
index c9bb635a..297910f1 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
 
 /**
  * QuotaManager is a manager for resource restriction.
@@ -124,6 +125,7 @@ public class QuotaManager {
         return true;
       } else {
         appAndTimes.put(uuid, System.currentTimeMillis());
+        CoordinatorMetrics.gaugeRunningAppNumToUser.labels(user).inc();
         return false;
       }
     }
@@ -140,6 +142,17 @@ public class QuotaManager {
     }
   }
 
+  protected void updateQuotaMetrics() {
+    for (Map.Entry<String, Map<String, Long>> userAndApp : currentUserAndApp.entrySet()) {
+      String user = userAndApp.getKey();
+      try {
+        CoordinatorMetrics.gaugeRunningAppNumToUser.labels(user).set(userAndApp.getValue().size());
+      } catch (Exception e) {
+        LOG.warn("Update user metrics for {} failed ", user, e);
+      }
+    }
+  }
+
   @VisibleForTesting
   public Map<String, Integer> getDefaultUserApps() {
     return defaultUserApps;
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
index 02bfd85c..b210ca11 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
@@ -41,11 +41,14 @@ public class CoordinatorMetrics {
   private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request";
   private static final String TOTAL_QUOTA_DENIED_REQUEST = "total_quota_denied_request";
   public static final String REMOTE_STORAGE_IN_USED_PREFIX = "remote_storage_in_used_";
+  public static final String APP_NUM_TO_USER = "app_num";
+  public static final String USER_LABEL = "user_name";
 
   public static Gauge gaugeTotalServerNum;
   public static Gauge gaugeExcludeServerNum;
   public static Gauge gaugeUnhealthyServerNum;
   public static Gauge gaugeRunningAppNum;
+  public static Gauge gaugeRunningAppNumToUser;
   public static Counter counterTotalAppNum;
   public static Counter counterTotalAccessRequest;
   public static Counter counterTotalCandidatesDeniedRequest;
@@ -101,6 +104,7 @@ public class CoordinatorMetrics {
     gaugeExcludeServerNum = metricsManager.addGauge(EXCLUDE_SERVER_NUM);
     gaugeUnhealthyServerNum = metricsManager.addGauge(UNHEALTHY_SERVER_NUM);
     gaugeRunningAppNum = metricsManager.addGauge(RUNNING_APP_NUM);
+    gaugeRunningAppNumToUser = metricsManager.addGauge(APP_NUM_TO_USER, USER_LABEL);
     counterTotalAppNum = metricsManager.addCounter(TOTAL_APP_NUM);
     counterTotalAccessRequest = metricsManager.addCounter(TOTAL_ACCESS_REQUEST);
     counterTotalCandidatesDeniedRequest = metricsManager.addCounter(TOTAL_CANDIDATES_DENIED_REQUEST);
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
index feb2b21f..0d68c55e 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
@@ -25,10 +25,16 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+
+import static org.awaitility.Awaitility.await;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -41,6 +47,16 @@ public class QuotaManagerTest {
       Objects.requireNonNull(this.getClass().getClassLoader().getResource(fileName)).getFile();
   private static final String fileName = "quotaFile.properties";
 
+  @BeforeAll
+  public static void setup() {
+    CoordinatorMetrics.register();
+  }
+
+  @AfterAll
+  public static void clear() {
+    CoordinatorMetrics.clear();
+  }
+
   @Timeout(value = 10)
   @Test
   public void testDetectUserResource() {
@@ -67,7 +83,6 @@ public class QuotaManagerTest {
     conf.set(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
         Lists.newArrayList("org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker"));
     ApplicationManager applicationManager = new ApplicationManager(conf);
-    Thread.sleep(500);
     // it didn't detectUserResource because `org.apache.unifle.coordinator.AccessQuotaChecker` is not configured
     assertNull(applicationManager.getQuotaManager());
   }
@@ -75,8 +90,7 @@ public class QuotaManagerTest {
   @Test
   public void testCheckQuota() throws Exception {
     CoordinatorConf conf = new CoordinatorConf();
-    conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH,
-        quotaFile);
+    conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH, quotaFile);
     final ApplicationManager applicationManager = new ApplicationManager(conf);
     final AtomicInteger uuid = new AtomicInteger();
     Map<String, Long> uuidAndTime = new ConcurrentHashMap<>();
@@ -98,4 +112,44 @@ public class QuotaManagerTest {
     assertTrue(icCheck);
     assertEquals(applicationManager.getQuotaManager().getCurrentUserAndApp().get("user4").size(), 5);
   }
+
+  @Test
+  public void testCheckQuotaMetrics() {
+    CoordinatorConf conf = new CoordinatorConf();
+    conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH, quotaFile);
+    conf.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 1500);
+    conf.setInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM, 2);
+    final ApplicationManager applicationManager = new ApplicationManager(conf);
+    final AtomicInteger uuid = new AtomicInteger();
+    final int i1 = uuid.incrementAndGet();
+    final int i2 = uuid.incrementAndGet();
+    final int i3 = uuid.incrementAndGet();
+    final int i4 = uuid.incrementAndGet();
+    Map<String, Long> uuidAndTime = new ConcurrentHashMap<>();
+    uuidAndTime.put(String.valueOf(i1), System.currentTimeMillis());
+    uuidAndTime.put(String.valueOf(i2), System.currentTimeMillis());
+    uuidAndTime.put(String.valueOf(i3), System.currentTimeMillis());
+    uuidAndTime.put(String.valueOf(i4), System.currentTimeMillis());
+    final boolean icCheck = applicationManager.getQuotaManager()
+        .checkQuota("user4", String.valueOf(i1));
+    final boolean icCheck2 = applicationManager.getQuotaManager()
+        .checkQuota("user4", String.valueOf(i2));
+    final boolean icCheck3 = applicationManager.getQuotaManager()
+        .checkQuota("user4", String.valueOf(i3));
+    final boolean icCheck4 = applicationManager.getQuotaManager()
+        .checkQuota("user3", String.valueOf(i4));
+    assertFalse(icCheck);
+    assertFalse(icCheck2);
+    // The default number of tasks submitted is 2, and the third will be rejected
+    assertTrue(icCheck3);
+    assertEquals(applicationManager.getQuotaManager().getCurrentUserAndApp().get("user4").size(), 2);
+    assertEquals(CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user4").get(), 2);
+    assertEquals(CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user3").get(), 1);
+    await().atMost(2, TimeUnit.SECONDS).until(() -> {
+      applicationManager.statusCheck();
+      // If the number of apps corresponding to this user is 0, remove this user
+      return CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user4").get() == 0
+          && CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user3").get() == 0;
+    });
+  }
 }