You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by xi...@apache.org on 2023/03/03 12:21:18 UTC
[incubator-uniffle] branch master updated: [#671] feat(coordinator): Metrics of the number of apps submitted by users (#672)
This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 313ddd69 [#671] feat(coordinator): Metrics of the number of apps submitted by users (#672)
313ddd69 is described below
commit 313ddd69723e0c45cd4ddf271b08f6f94c4a723b
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Fri Mar 3 20:21:12 2023 +0800
[#671] feat(coordinator): Metrics of the number of apps submitted by users (#672)
### What changes were proposed in this pull request?
Add metrics for QuotaManager to record the number of apps run by each user.
### Why are the changes needed?
Fix: #671
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
New unit tests (UTs).
---
.../uniffle/coordinator/ApplicationManager.java | 5 +-
.../apache/uniffle/coordinator/QuotaManager.java | 13 +++++
.../coordinator/metric/CoordinatorMetrics.java | 4 ++
.../uniffle/coordinator/QuotaManagerTest.java | 60 ++++++++++++++++++++--
4 files changed, 78 insertions(+), 4 deletions(-)
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index 5315c8ff..193a28ee 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -262,7 +262,7 @@ public class ApplicationManager {
detectStorageScheduler.shutdownNow();
}
- private void statusCheck() {
+ protected void statusCheck() {
List<Map<String, Long>> appAndNums = Lists.newArrayList(currentUserAndApp.values());
Map<String, Long> appIds = Maps.newHashMap();
// The reason for setting an expired uuid here is that there is a scenario where accessCluster succeeds,
@@ -293,6 +293,9 @@ public class ApplicationManager {
}
CoordinatorMetrics.gaugeRunningAppNum.set(appIds.size());
updateRemoteStorageMetrics();
+ if (quotaManager != null) {
+ quotaManager.updateQuotaMetrics();
+ }
} catch (Exception e) {
// the flag is only for test case
hasErrorInStatusCheck = true;
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
index c9bb635a..297910f1 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.ThreadUtils;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
/**
* QuotaManager is a manager for resource restriction.
@@ -124,6 +125,7 @@ public class QuotaManager {
return true;
} else {
appAndTimes.put(uuid, System.currentTimeMillis());
+ CoordinatorMetrics.gaugeRunningAppNumToUser.labels(user).inc();
return false;
}
}
@@ -140,6 +142,17 @@ public class QuotaManager {
}
}
+ protected void updateQuotaMetrics() {
+ for (Map.Entry<String, Map<String, Long>> userAndApp : currentUserAndApp.entrySet()) {
+ String user = userAndApp.getKey();
+ try {
+ CoordinatorMetrics.gaugeRunningAppNumToUser.labels(user).set(userAndApp.getValue().size());
+ } catch (Exception e) {
+ LOG.warn("Update user metrics for {} failed ", user, e);
+ }
+ }
+ }
+
@VisibleForTesting
public Map<String, Integer> getDefaultUserApps() {
return defaultUserApps;
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
index 02bfd85c..b210ca11 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/metric/CoordinatorMetrics.java
@@ -41,11 +41,14 @@ public class CoordinatorMetrics {
private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request";
private static final String TOTAL_QUOTA_DENIED_REQUEST = "total_quota_denied_request";
public static final String REMOTE_STORAGE_IN_USED_PREFIX = "remote_storage_in_used_";
+ public static final String APP_NUM_TO_USER = "app_num";
+ public static final String USER_LABEL = "user_name";
public static Gauge gaugeTotalServerNum;
public static Gauge gaugeExcludeServerNum;
public static Gauge gaugeUnhealthyServerNum;
public static Gauge gaugeRunningAppNum;
+ public static Gauge gaugeRunningAppNumToUser;
public static Counter counterTotalAppNum;
public static Counter counterTotalAccessRequest;
public static Counter counterTotalCandidatesDeniedRequest;
@@ -101,6 +104,7 @@ public class CoordinatorMetrics {
gaugeExcludeServerNum = metricsManager.addGauge(EXCLUDE_SERVER_NUM);
gaugeUnhealthyServerNum = metricsManager.addGauge(UNHEALTHY_SERVER_NUM);
gaugeRunningAppNum = metricsManager.addGauge(RUNNING_APP_NUM);
+ gaugeRunningAppNumToUser = metricsManager.addGauge(APP_NUM_TO_USER, USER_LABEL);
counterTotalAppNum = metricsManager.addCounter(TOTAL_APP_NUM);
counterTotalAccessRequest = metricsManager.addCounter(TOTAL_ACCESS_REQUEST);
counterTotalCandidatesDeniedRequest = metricsManager.addCounter(TOTAL_CANDIDATES_DENIED_REQUEST);
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
index feb2b21f..0d68c55e 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
@@ -25,10 +25,16 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -41,6 +47,16 @@ public class QuotaManagerTest {
Objects.requireNonNull(this.getClass().getClassLoader().getResource(fileName)).getFile();
private static final String fileName = "quotaFile.properties";
+ @BeforeAll
+ public static void setup() {
+ CoordinatorMetrics.register();
+ }
+
+ @AfterAll
+ public static void clear() {
+ CoordinatorMetrics.clear();
+ }
+
@Timeout(value = 10)
@Test
public void testDetectUserResource() {
@@ -67,7 +83,6 @@ public class QuotaManagerTest {
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
Lists.newArrayList("org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker"));
ApplicationManager applicationManager = new ApplicationManager(conf);
- Thread.sleep(500);
// it didn't detectUserResource because `org.apache.unifle.coordinator.AccessQuotaChecker` is not configured
assertNull(applicationManager.getQuotaManager());
}
@@ -75,8 +90,7 @@ public class QuotaManagerTest {
@Test
public void testCheckQuota() throws Exception {
CoordinatorConf conf = new CoordinatorConf();
- conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH,
- quotaFile);
+ conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH, quotaFile);
final ApplicationManager applicationManager = new ApplicationManager(conf);
final AtomicInteger uuid = new AtomicInteger();
Map<String, Long> uuidAndTime = new ConcurrentHashMap<>();
@@ -98,4 +112,44 @@ public class QuotaManagerTest {
assertTrue(icCheck);
assertEquals(applicationManager.getQuotaManager().getCurrentUserAndApp().get("user4").size(), 5);
}
+
+ @Test
+ public void testCheckQuotaMetrics() {
+ CoordinatorConf conf = new CoordinatorConf();
+ conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH, quotaFile);
+ conf.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 1500);
+ conf.setInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM, 2);
+ final ApplicationManager applicationManager = new ApplicationManager(conf);
+ final AtomicInteger uuid = new AtomicInteger();
+ final int i1 = uuid.incrementAndGet();
+ final int i2 = uuid.incrementAndGet();
+ final int i3 = uuid.incrementAndGet();
+ final int i4 = uuid.incrementAndGet();
+ Map<String, Long> uuidAndTime = new ConcurrentHashMap<>();
+ uuidAndTime.put(String.valueOf(i1), System.currentTimeMillis());
+ uuidAndTime.put(String.valueOf(i2), System.currentTimeMillis());
+ uuidAndTime.put(String.valueOf(i3), System.currentTimeMillis());
+ uuidAndTime.put(String.valueOf(i4), System.currentTimeMillis());
+ final boolean icCheck = applicationManager.getQuotaManager()
+ .checkQuota("user4", String.valueOf(i1));
+ final boolean icCheck2 = applicationManager.getQuotaManager()
+ .checkQuota("user4", String.valueOf(i2));
+ final boolean icCheck3 = applicationManager.getQuotaManager()
+ .checkQuota("user4", String.valueOf(i3));
+ final boolean icCheck4 = applicationManager.getQuotaManager()
+ .checkQuota("user3", String.valueOf(i4));
+ assertFalse(icCheck);
+ assertFalse(icCheck2);
+ // The default number of tasks submitted is 2, and the third will be rejected
+ assertTrue(icCheck3);
+ assertEquals(applicationManager.getQuotaManager().getCurrentUserAndApp().get("user4").size(), 2);
+ assertEquals(CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user4").get(), 2);
+ assertEquals(CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user3").get(), 1);
+ await().atMost(2, TimeUnit.SECONDS).until(() -> {
+ applicationManager.statusCheck();
+ // If the number of apps corresponding to this user is 0, remove this user
+ return CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user4").get() == 0
+ && CoordinatorMetrics.gaugeRunningAppNumToUser.labels("user3").get() == 0;
+ });
+ }
}