You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/11/26 07:00:38 UTC
[incubator-uniffle] branch master updated: [Improvement] Optimize the use of QuotaManager (#359)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 57a834b0 [Improvement] Optimize the use of QuotaManager (#359)
57a834b0 is described below
commit 57a834b083d9dae291091e4f2359d060ba30abe0
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Sat Nov 26 15:00:33 2022 +0800
[Improvement] Optimize the use of QuotaManager (#359)
### What changes were proposed in this pull request?
1. Optimize the use of `QuotaManager`, use `QuotaManager` when checking the number of apps, and decouple with `ApplicationManager`.
2. For #356 The start of `QuotaManager` is controlled by the configuration of the checker.
### Why are the changes needed?
More reasonable.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added new ut.
---
.../apache/uniffle/coordinator/AccessManager.java | 10 +++----
.../uniffle/coordinator/AccessQuotaChecker.java | 8 ++---
.../uniffle/coordinator/ApplicationManager.java | 25 +++++++++-------
.../uniffle/coordinator/CoordinatorServer.java | 4 +--
.../apache/uniffle/coordinator/QuotaManager.java | 34 ++++++++++++++--------
.../coordinator/AccessCandidatesCheckerTest.java | 7 +++--
.../coordinator/AccessClusterLoadCheckerTest.java | 6 ++--
.../uniffle/coordinator/AccessManagerTest.java | 13 +++++----
.../coordinator/AccessQuotaCheckerTest.java | 7 +++--
.../uniffle/coordinator/QuotaManagerTest.java | 30 ++++++++++++++-----
.../test/AccessCandidatesCheckerHdfsTest.java | 7 +++--
11 files changed, 94 insertions(+), 57 deletions(-)
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
index 04fddc36..f9a4a93f 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
@@ -35,19 +35,19 @@ public class AccessManager {
private final CoordinatorConf coordinatorConf;
private final ClusterManager clusterManager;
- private final ApplicationManager applicationManager;
+ private final QuotaManager quotaManager;
private final Configuration hadoopConf;
private List<AccessChecker> accessCheckers = Lists.newArrayList();
public AccessManager(
CoordinatorConf conf,
ClusterManager clusterManager,
- ApplicationManager applicationManager,
+ QuotaManager quotaManager,
Configuration hadoopConf) throws Exception {
this.coordinatorConf = conf;
this.clusterManager = clusterManager;
this.hadoopConf = hadoopConf;
- this.applicationManager = applicationManager;
+ this.quotaManager = quotaManager;
init();
}
@@ -94,8 +94,8 @@ public class AccessManager {
return hadoopConf;
}
- public ApplicationManager getApplicationManager() {
- return applicationManager;
+ public QuotaManager getQuotaManager() {
+ return quotaManager;
}
public void close() throws IOException {
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
index 91b9ceab..edaa6031 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessQuotaChecker.java
@@ -34,7 +34,7 @@ import org.apache.uniffle.common.util.RssUtils;
public class AccessQuotaChecker extends AbstractAccessChecker {
private static final Logger LOG = LoggerFactory.getLogger(AccessQuotaChecker.class);
- private final ApplicationManager applicationManager;
+ private final QuotaManager quotaManager;
private final CoordinatorConf conf;
private static final LongAdder COUNTER = new LongAdder();
private final String hostIp;
@@ -42,7 +42,7 @@ public class AccessQuotaChecker extends AbstractAccessChecker {
public AccessQuotaChecker(AccessManager accessManager) throws Exception {
super(accessManager);
conf = accessManager.getCoordinatorConf();
- applicationManager = accessManager.getApplicationManager();
+ quotaManager = accessManager.getQuotaManager();
hostIp = RssUtils.getHostIp();
}
@@ -53,9 +53,9 @@ public class AccessQuotaChecker extends AbstractAccessChecker {
final String user = accessInfo.getUser();
// low version client user attribute is an empty string
if (!"".equals(user)) {
- Map<String, Map<String, Long>> currentUserApps = applicationManager.getCurrentUserApps();
+ Map<String, Map<String, Long>> currentUserApps = quotaManager.getCurrentUserAndApp();
Map<String, Long> appAndTimes = currentUserApps.computeIfAbsent(user, x -> Maps.newConcurrentMap());
- Integer defaultAppNum = applicationManager.getDefaultUserApps().getOrDefault(user,
+ Integer defaultAppNum = quotaManager.getDefaultUserApps().getOrDefault(user,
conf.getInteger(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM));
int currentAppNum = appAndTimes.size();
if (currentAppNum >= defaultAppNum) {
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index 32c87eba..19c88ac2 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -54,9 +54,9 @@ public class ApplicationManager {
private final Map<String, RankValue> remoteStoragePathRankValue;
private final Map<String, String> remoteStorageToHost = Maps.newConcurrentMap();
private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
- private final Map<String, Map<String, Long>> currentUserAndApp;
- private final Map<String, String> appIdToUser;
- private final Map<String, Integer> defaultUserApps;
+ private Map<String, Map<String, Long>> currentUserAndApp = Maps.newConcurrentMap();
+ private Map<String, String> appIdToUser = Maps.newConcurrentMap();
+ private QuotaManager quotaManager;
// it's only for test case to check if status check has problem
private boolean hasErrorInStatusCheck = false;
@@ -75,10 +75,15 @@ public class ApplicationManager {
throw new UnsupportedOperationException("Unsupported selected storage strategy.");
}
expired = conf.getLong(CoordinatorConf.COORDINATOR_APP_EXPIRED);
- QuotaManager quotaManager = new QuotaManager(conf);
- this.currentUserAndApp = quotaManager.getCurrentUserAndApp();
- this.appIdToUser = quotaManager.getAppIdToUser();
- this.defaultUserApps = quotaManager.getDefaultUserApps();
+ String quotaCheckerClass = AccessQuotaChecker.class.getCanonicalName();
+ for (String checker : conf.get(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS)) {
+ if (quotaCheckerClass.equals(checker.trim())) {
+ this.quotaManager = new QuotaManager(conf);
+ this.currentUserAndApp = quotaManager.getCurrentUserAndApp();
+ this.appIdToUser = quotaManager.getAppIdToUser();
+ break;
+ }
+ }
// the thread for checking application status
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.getThreadFactory("ApplicationManager-%d"));
@@ -318,11 +323,11 @@ public class ApplicationManager {
}
public Map<String, Integer> getDefaultUserApps() {
- return defaultUserApps;
+ return quotaManager.getDefaultUserApps();
}
- public Map<String, Map<String, Long>> getCurrentUserApps() {
- return currentUserAndApp;
+ public QuotaManager getQuotaManager() {
+ return quotaManager;
}
public enum StrategyName {
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index 6a3a0166..4314d625 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -143,8 +143,8 @@ public class CoordinatorServer {
AssignmentStrategyFactory assignmentStrategyFactory =
new AssignmentStrategyFactory(coordinatorConf, clusterManager);
this.assignmentStrategy = assignmentStrategyFactory.getAssignmentStrategy();
- this.accessManager = new AccessManager(coordinatorConf, clusterManager, applicationManager, hadoopConf);
-
+ this.accessManager = new AccessManager(coordinatorConf, clusterManager,
+ applicationManager.getQuotaManager(), hadoopConf);
CoordinatorFactory coordinatorFactory = new CoordinatorFactory(this);
server = coordinatorFactory.getServer();
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
index 2d9c9fce..266aff40 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/QuotaManager.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -53,22 +54,29 @@ public class QuotaManager {
private final Map<String, Integer> defaultUserApps = Maps.newConcurrentMap();
public QuotaManager(CoordinatorConf conf) {
- this.quotaFilePath = conf.get(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH);;
- final Long updateTime = conf.get(CoordinatorConf.COORDINATOR_QUOTA_UPDATE_INTERVAL);
- try {
- hadoopFileSystem = HadoopFilesystemProvider.getFilesystem(new Path(quotaFilePath), new Configuration());
- } catch (Exception e) {
- LOG.error("Cannot init remoteFS on path : " + quotaFilePath, e);
+ this.quotaFilePath = conf.get(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH);
+ if (quotaFilePath == null) {
+ LOG.warn("{} is not configured, each user will use the default quota : {}",
+ CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH.key(),
+ conf.get(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_APP_NUM));
+ } else {
+ final Long updateTime = conf.get(CoordinatorConf.COORDINATOR_QUOTA_UPDATE_INTERVAL);
+ try {
+ hadoopFileSystem = HadoopFilesystemProvider.getFilesystem(new Path(quotaFilePath), new Configuration());
+ } catch (Exception e) {
+ LOG.error("Cannot init remoteFS on path : " + quotaFilePath, e);
+ }
+ // Threads that update the number of submitted applications
+ ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.getThreadFactory("UpdateDefaultApp-%d"));
+ scheduledExecutorService.scheduleAtFixedRate(
+ this::detectUserResource, 0, updateTime / 2, TimeUnit.MILLISECONDS);
+ LOG.info("QuotaManager initialized successfully.");
}
- // Threads that update the number of submitted applications
- ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
- ThreadUtils.getThreadFactory("UpdateDefaultApp-%d"));
- scheduledExecutorService.scheduleAtFixedRate(
- this::detectUserResource, 0, updateTime / 2, TimeUnit.MILLISECONDS);
}
public void detectUserResource() {
- if (quotaFilePath != null && hadoopFileSystem != null) {
+ if (hadoopFileSystem != null) {
try {
Path hadoopPath = new Path(quotaFilePath);
FileStatus fileStatus = hadoopFileSystem.getFileStatus(hadoopPath);
@@ -76,6 +84,7 @@ public class QuotaManager {
long latestModificationTime = fileStatus.getModificationTime();
if (quotaFileLastModify.get() != latestModificationTime) {
parseQuotaFile(hadoopFileSystem.open(hadoopPath));
+ LOG.warn("We have updated the file {}.", hadoopPath);
quotaFileLastModify.set(latestModificationTime);
}
}
@@ -101,6 +110,7 @@ public class QuotaManager {
}
}
+ @VisibleForTesting
public Map<String, Integer> getDefaultUserApps() {
return defaultUserApps;
}
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessCandidatesCheckerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessCandidatesCheckerTest.java
index b3a6b30a..c8e80883 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessCandidatesCheckerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessCandidatesCheckerTest.java
@@ -62,7 +62,7 @@ public class AccessCandidatesCheckerTest {
// file load checking at startup
Exception expectedException = null;
try {
- new AccessManager(conf, null, applicationManager, new Configuration());
+ new AccessManager(conf, null, applicationManager.getQuotaManager(), new Configuration());
} catch (RuntimeException e) {
expectedException = e;
}
@@ -72,7 +72,7 @@ public class AccessCandidatesCheckerTest {
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, cfgFile.toURI().toString());
expectedException = null;
try {
- new AccessManager(conf, null, applicationManager, new Configuration());
+ new AccessManager(conf, null, applicationManager.getQuotaManager(), new Configuration());
} catch (RuntimeException e) {
expectedException = e;
}
@@ -88,7 +88,8 @@ public class AccessCandidatesCheckerTest {
printWriter.println("2 ");
printWriter.flush();
printWriter.close();
- AccessManager accessManager = new AccessManager(conf, null, applicationManager, new Configuration());
+ AccessManager accessManager = new AccessManager(conf, null, applicationManager.getQuotaManager(),
+ new Configuration());
AccessCandidatesChecker checker = (AccessCandidatesChecker) accessManager.getAccessCheckers().get(0);
sleep(1200);
assertEquals(Sets.newHashSet("2", "9527", "135"), checker.getCandidates().get());
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
index f932c8f5..b916d575 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
@@ -87,7 +87,8 @@ public class AccessClusterLoadCheckerTest {
conf.set(COORDINATOR_SHUFFLE_NODES_MAX, 3);
conf.set(COORDINATOR_ACCESS_LOADCHECKER_MEMORY_PERCENTAGE, 20.0);
ApplicationManager applicationManager = new ApplicationManager(conf);
- AccessManager accessManager = new AccessManager(conf, clusterManager, applicationManager, new Configuration());
+ AccessManager accessManager = new AccessManager(conf, clusterManager,
+ applicationManager.getQuotaManager(), new Configuration());
AccessClusterLoadChecker accessClusterLoadChecker =
(AccessClusterLoadChecker) accessManager.getAccessCheckers().get(0);
@@ -151,7 +152,8 @@ public class AccessClusterLoadCheckerTest {
conf.setString(COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessClusterLoadChecker");
ApplicationManager applicationManager = new ApplicationManager(conf);
- AccessManager accessManager = new AccessManager(conf, clusterManager, applicationManager, new Configuration());
+ AccessManager accessManager = new AccessManager(conf, clusterManager,
+ applicationManager.getQuotaManager(), new Configuration());
AccessClusterLoadChecker accessClusterLoadChecker =
(AccessClusterLoadChecker) accessManager.getAccessCheckers().get(0);
when(clusterManager.getServerList(any())).thenReturn(serverNodeList);
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
index f519bcf7..8870d4fa 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessManagerTest.java
@@ -50,7 +50,7 @@ public class AccessManagerTest {
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), " , ");
ApplicationManager applicationManager = new ApplicationManager(conf);
try {
- new AccessManager(conf, null, applicationManager, new Configuration());
+ new AccessManager(conf, null, applicationManager.getQuotaManager(), new Configuration());
} catch (RuntimeException e) {
String expectedMessage = "Empty classes";
assertTrue(e.getMessage().startsWith(expectedMessage));
@@ -58,14 +58,15 @@ public class AccessManagerTest {
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"com.Dummy,org.apache.uniffle.coordinator.AccessManagerTest$MockAccessChecker");
try {
- new AccessManager(conf, null, applicationManager, new Configuration());
+ new AccessManager(conf, null, applicationManager.getQuotaManager(), new Configuration());
} catch (RuntimeException e) {
String expectedMessage = "java.lang.ClassNotFoundException: com.Dummy";
assertTrue(e.getMessage().startsWith(expectedMessage));
}
// test empty checkers
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(), "");
- AccessManager accessManager = new AccessManager(conf, null, applicationManager, new Configuration());
+ AccessManager accessManager = new AccessManager(conf, null,
+ applicationManager.getQuotaManager(), new Configuration());
assertTrue(accessManager.handleAccessRequest(
new AccessInfo(String.valueOf(new Random().nextInt()),
Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), Collections.emptyMap(), "user"))
@@ -74,14 +75,16 @@ public class AccessManagerTest {
// test mock checkers
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysTrue,");
- accessManager = new AccessManager(conf, null, applicationManager, new Configuration());
+ accessManager = new AccessManager(conf, null,
+ applicationManager.getQuotaManager(), new Configuration());
assertEquals(1, accessManager.getAccessCheckers().size());
assertTrue(accessManager.handleAccessRequest(new AccessInfo("mock1")).isSuccess());
assertTrue(accessManager.handleAccessRequest(new AccessInfo("mock2")).isSuccess());
conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysTrue,"
+ "org.apache.uniffle.coordinator.AccessManagerTest$MockAccessCheckerAlwaysFalse");
- accessManager = new AccessManager(conf, null, applicationManager, new Configuration());
+ accessManager = new AccessManager(conf, null,
+ applicationManager.getQuotaManager(), new Configuration());
assertEquals(2, accessManager.getAccessCheckers().size());
assertFalse(accessManager.handleAccessRequest(new AccessInfo("mock1")).isSuccess());
accessManager.close();
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessQuotaCheckerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessQuotaCheckerTest.java
index 723360fd..60dd4f7f 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessQuotaCheckerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessQuotaCheckerTest.java
@@ -84,7 +84,8 @@ public class AccessQuotaCheckerTest {
Collections.singletonList("org.apache.uniffle.coordinator.AccessQuotaChecker"));
conf.set(COORDINATOR_QUOTA_DEFAULT_APP_NUM, 3);
ApplicationManager applicationManager = new ApplicationManager(conf);
- AccessManager accessManager = new AccessManager(conf, clusterManager, applicationManager, new Configuration());
+ AccessManager accessManager = new AccessManager(conf, clusterManager,
+ applicationManager.getQuotaManager(), new Configuration());
AccessQuotaChecker accessQuotaChecker =
(AccessQuotaChecker) accessManager.getAccessCheckers().get(0);
@@ -108,7 +109,7 @@ public class AccessQuotaCheckerTest {
*/
conf.set(COORDINATOR_QUOTA_DEFAULT_APP_NUM, 0);
applicationManager = new ApplicationManager(conf);
- accessManager = new AccessManager(conf, clusterManager, applicationManager, new Configuration());
+ accessManager = new AccessManager(conf, clusterManager, applicationManager.getQuotaManager(), new Configuration());
accessQuotaChecker = (AccessQuotaChecker) accessManager.getAccessCheckers().get(0);
accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
assertFalse(accessQuotaChecker.check(accessInfo).isSuccess());
@@ -123,7 +124,7 @@ public class AccessQuotaCheckerTest {
Arrays.asList("org.apache.uniffle.coordinator.AccessQuotaChecker",
"org.apache.uniffle.coordinator.AccessClusterLoadChecker"));
applicationManager = new ApplicationManager(conf);
- accessManager = new AccessManager(conf, clusterManager, applicationManager, new Configuration());
+ accessManager = new AccessManager(conf, clusterManager, applicationManager.getQuotaManager(), new Configuration());
accessQuotaChecker = (AccessQuotaChecker) accessManager.getAccessCheckers().get(0);
final AccessClusterLoadChecker accessClusterLoadChecker =
(AccessClusterLoadChecker) accessManager.getAccessCheckers().get(1);
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
index fe63b6c0..8611d4a6 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/QuotaManagerTest.java
@@ -21,6 +21,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
@@ -31,12 +32,12 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
/**
* QuotaManager is a manager for resource restriction.
*/
public class QuotaManagerTest {
- public QuotaManager quotaManager;
private static final Configuration hdfsConf = new Configuration();
private static MiniDFSCluster cluster;
@TempDir
@@ -48,8 +49,6 @@ public class QuotaManagerTest {
hdfsConf.set("dfs.nameservices", "rss");
hdfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, remotePath.getAbsolutePath());
cluster = (new MiniDFSCluster.Builder(hdfsConf)).build();
- CoordinatorConf conf = new CoordinatorConf();
- quotaManager = new QuotaManager(conf);
}
@AfterAll
@@ -76,14 +75,29 @@ public class QuotaManagerTest {
CoordinatorConf conf = new CoordinatorConf();
conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH,
quotaFile);
- quotaManager = new QuotaManager(conf);
- quotaManager.detectUserResource();
+ ApplicationManager applicationManager = new ApplicationManager(conf);
+ Thread.sleep(500);
- Integer user1 = quotaManager.getDefaultUserApps().get("user1");
- Integer user2 = quotaManager.getDefaultUserApps().get("user2");
- Integer user3 = quotaManager.getDefaultUserApps().get("user3");
+ Integer user1 = applicationManager.getDefaultUserApps().get("user1");
+ Integer user2 = applicationManager.getDefaultUserApps().get("user2");
+ Integer user3 = applicationManager.getDefaultUserApps().get("user3");
assertEquals(user1, 10);
assertEquals(user2, 20);
assertEquals(user3, 30);
}
+
+ @Test
+ public void testQuotaManagerWithoutAccessQuotaChecker() throws Exception {
+ final String quotaFile =
+ new Path(remotePath.getAbsolutePath()).getFileSystem(hdfsConf).getName() + "/quotaFile.properties";
+ CoordinatorConf conf = new CoordinatorConf();
+ conf.set(CoordinatorConf.COORDINATOR_QUOTA_DEFAULT_PATH,
+ quotaFile);
+ conf.set(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS,
+ Lists.newArrayList("org.apache.uniffle.coordinator.AccessClusterLoadChecker"));
+ ApplicationManager applicationManager = new ApplicationManager(conf);
+ Thread.sleep(500);
+ // it didn't detectUserResource because `org.apache.unifle.coordinator.AccessQuotaChecker` is not configured
+ assertNull(applicationManager.getQuotaManager());
+ }
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
index 2ab9e2bd..71d3c9ec 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessCandidatesCheckerHdfsTest.java
@@ -75,7 +75,7 @@ public class AccessCandidatesCheckerHdfsTest extends HdfsTestBase {
// file load checking at startup
Exception expectedException = null;
try {
- new AccessManager(conf, null, applicationManager, new Configuration());
+ new AccessManager(conf, null, applicationManager.getQuotaManager(), new Configuration());
} catch (RuntimeException e) {
expectedException = e;
}
@@ -85,7 +85,7 @@ public class AccessCandidatesCheckerHdfsTest extends HdfsTestBase {
conf.set(CoordinatorConf.COORDINATOR_ACCESS_CANDIDATES_PATH, candidatesFile);
expectedException = null;
try {
- new AccessManager(conf, null, applicationManager, new Configuration());
+ new AccessManager(conf, null, applicationManager.getQuotaManager(), new Configuration());
} catch (RuntimeException e) {
expectedException = e;
}
@@ -102,7 +102,8 @@ public class AccessCandidatesCheckerHdfsTest extends HdfsTestBase {
printWriter.println("2 ");
printWriter.flush();
printWriter.close();
- AccessManager accessManager = new AccessManager(conf, null, applicationManager, hadoopConf);
+ AccessManager accessManager = new AccessManager(conf, null,
+ applicationManager.getQuotaManager(), hadoopConf);
AccessCandidatesChecker checker = (AccessCandidatesChecker) accessManager.getAccessCheckers().get(0);
// load the config at the beginning
sleep(1200);