You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/04/11 21:42:17 UTC
[gobblin] branch master updated: [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running … (#3481)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 74c4290e0 [GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running … (#3481)
74c4290e0 is described below
commit 74c4290e0b0cb46be7b5820d85486978c2939b0d
Author: William Lo <lo...@gmail.com>
AuthorDate: Mon Apr 11 14:42:12 2022 -0700
[GOBBLIN-1624] Refactor quota management, fix various bugs in accounting of running … (#3481)
* Refactor quota management, fix various bugs in accounting of running jobs
* Add javadocs
* Address comments, add metric counts to tests
* Address scenario on startup where quota is decreased
* rename onstartup to onInit
---
.../service/modules/orchestration/DagManager.java | 178 +++---------------
.../modules/orchestration/UserQuotaManager.java | 198 +++++++++++++++++++++
.../modules/orchestration/DagManagerTest.java | 153 +++++++++++++++-
.../orchestration/UserQuotaManagerTest.java | 71 ++++++++
4 files changed, 438 insertions(+), 162 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index b69fb2695..2d89591e1 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -31,7 +31,6 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -39,14 +38,12 @@ import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
@@ -138,15 +135,10 @@ public class DagManager extends AbstractIdleService {
private static final long DEFAULT_FAILED_DAG_RETENTION_TIME = 7L;
public static final String FAILED_DAG_POLLING_INTERVAL = FAILED_DAG_STATESTORE_PREFIX + ".retention.pollingIntervalMinutes";
public static final Integer DEFAULT_FAILED_DAG_POLLING_INTERVAL = 60;
- private static final String USER_JOB_QUOTA_KEY = DAG_MANAGER_PREFIX + "defaultJobQuota";
- private static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
- private static final String PER_USER_QUOTA = DAG_MANAGER_PREFIX + "perUserQuota";
// Default job start SLA time if configured, measured in minutes. Default is 10 minutes
private static final String JOB_START_SLA_TIME = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME;
private static final String JOB_START_SLA_UNITS = DAG_MANAGER_PREFIX + ConfigurationKeys.GOBBLIN_JOB_START_SLA_TIME_UNIT;
- private static final String QUOTA_SEPERATOR = ":";
-
/**
* Action to be performed on a {@link Dag}, in case of a job failure. Currently, we allow 2 modes:
* <ul>
@@ -191,8 +183,6 @@ public class DagManager extends AbstractIdleService {
private final JobStatusRetriever jobStatusRetriever;
private final Config config;
private final Optional<EventSubmitter> eventSubmitter;
- private final int defaultQuota;
- private final Map<String, Integer> perUserQuota;
private final long failedDagRetentionTime;
private volatile boolean isActive = false;
@@ -213,18 +203,9 @@ public class DagManager extends AbstractIdleService {
} else {
this.eventSubmitter = Optional.absent();
}
-
- this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
TimeUnit jobStartTimeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, JOB_START_SLA_UNITS, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME_UNIT));
this.defaultJobStartSlaTimeMillis = jobStartTimeUnit.toMillis(ConfigUtils.getLong(config, JOB_START_SLA_TIME, ConfigurationKeys.FALLBACK_GOBBLIN_JOB_START_SLA_TIME));
- ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
- for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) {
- mapBuilder.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
- }
- this.perUserQuota = mapBuilder.build();
-
this.jobStatusRetriever = jobStatusRetriever;
-
TimeUnit timeUnit = TimeUnit.valueOf(ConfigUtils.getString(config, FAILED_DAG_RETENTION_TIME_UNIT, DEFAULT_FAILED_DAG_RETENTION_TIME_UNIT));
this.failedDagRetentionTime = timeUnit.toMillis(ConfigUtils.getLong(config, FAILED_DAG_RETENTION_TIME, DEFAULT_FAILED_DAG_RETENTION_TIME));
}
@@ -374,6 +355,7 @@ public class DagManager extends AbstractIdleService {
ContextAwareMeter allSuccessfulMeter = null;
ContextAwareMeter allFailedMeter = null;
+
if (instrumentationEnabled) {
MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
allSuccessfulMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
@@ -382,12 +364,22 @@ public class DagManager extends AbstractIdleService {
ServiceMetricNames.FAILED_FLOW_METER));
}
+ UserQuotaManager quotaManager = new UserQuotaManager(config);
+ // Before initializing the DagManagerThreads check which dags are currently running before shutdown
+ for (Dag<JobExecutionPlan> dag: dagStateStore.getDags()) {
+ for (DagNode<JobExecutionPlan> dagNode: dag.getNodes()) {
+ if (DagManagerUtils.getExecutionStatus(dagNode) == RUNNING) {
+ // Add all the currently running Dags to the quota limit per user
+ quotaManager.checkQuota(dagNode, true);
+ }
+ }
+ }
//On startup, the service creates DagManagerThreads that are scheduled at a fixed rate.
this.dagManagerThreads = new DagManagerThread[numThreads];
for (int i = 0; i < numThreads; i++) {
DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore, failedDagStateStore,
- runQueue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, defaultQuota, perUserQuota, failedDagIds,
- allSuccessfulMeter, allFailedMeter, this.defaultJobStartSlaTimeMillis);
+ runQueue[i], cancelQueue[i], resumeQueue[i], instrumentationEnabled, failedDagIds, allSuccessfulMeter,
+ allFailedMeter, this.defaultJobStartSlaTimeMillis, quotaManager);
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
}
@@ -431,8 +423,6 @@ public class DagManager extends AbstractIdleService {
*/
public static class DagManagerThread implements Runnable {
private final Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>();
- private static final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>();
- private static final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
private final Set<String> failedDagIds;
private final Map<String, Dag<JobExecutionPlan>> resumingDags = new HashMap<>();
@@ -444,15 +434,13 @@ public class DagManager extends AbstractIdleService {
private final MetricContext metricContext;
private final Optional<EventSubmitter> eventSubmitter;
private final Optional<Timer> jobStatusPolledTimer;
- private final int defaultQuota;
- private final Map<String, Integer> perUserQuota;
private final AtomicLong orchestrationDelay = new AtomicLong(0);
private static final Map<String, FlowState> flowGauges = Maps.newConcurrentMap();
private final ContextAwareMeter allSuccessfulMeter;
private final ContextAwareMeter allFailedMeter;
private static final Map<String, ContextAwareMeter> groupSuccessfulMeters = Maps.newConcurrentMap();
private static final Map<String, ContextAwareMeter> groupFailureMeters = Maps.newConcurrentMap();
-
+ private final UserQuotaManager quotaManager;
private final JobStatusRetriever jobStatusRetriever;
private final DagStateStore dagStateStore;
private final DagStateStore failedDagStateStore;
@@ -465,8 +453,8 @@ public class DagManager extends AbstractIdleService {
*/
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore, DagStateStore failedDagStateStore,
BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, BlockingQueue<String> resumeQueue,
- boolean instrumentationEnabled, int defaultQuota, Map<String, Integer> perUserQuota, Set<String> failedDagIds,
- ContextAwareMeter allSuccessfulMeter, ContextAwareMeter allFailedMeter, Long defaultJobStartSla) {
+ boolean instrumentationEnabled, Set<String> failedDagIds, ContextAwareMeter allSuccessfulMeter,
+ ContextAwareMeter allFailedMeter, Long defaultJobStartSla, UserQuotaManager quotaManager) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
this.failedDagStateStore = failedDagStateStore;
@@ -474,11 +462,11 @@ public class DagManager extends AbstractIdleService {
this.queue = queue;
this.cancelQueue = cancelQueue;
this.resumeQueue = resumeQueue;
- this.defaultQuota = defaultQuota;
- this.perUserQuota = perUserQuota;
this.allSuccessfulMeter = allSuccessfulMeter;
this.allFailedMeter = allFailedMeter;
this.defaultJobStartSlaTimeMillis = defaultJobStartSla;
+ this.quotaManager = quotaManager;
+
if (instrumentationEnabled) {
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
@@ -675,6 +663,7 @@ public class DagManager extends AbstractIdleService {
addJobState(dagId, dagNode);
//Update the running jobs counter.
getRunningJobsCounter(dagNode).inc();
+ getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::inc);
isDagRunning = true;
}
}
@@ -951,7 +940,7 @@ public class DagManager extends AbstractIdleService {
// Run this spec on selected executor
SpecProducer<Spec> producer;
try {
- checkQuota(dagNode);
+ quotaManager.checkQuota(dagNode, false);
producer = DagManagerUtils.getSpecProducer(dagNode);
TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
@@ -990,87 +979,6 @@ public class DagManager extends AbstractIdleService {
}
}
- private void checkQuota(DagNode<JobExecutionPlan> dagNode) throws IOException {
- String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
- String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
- boolean proxyUserCheck = true;
- int proxyQuotaIncrement;
- Set<String> usersQuotaIncrement = new HashSet<>(); // holds the users for which quota is increased
- StringBuilder requesterMessage = new StringBuilder();
-
- if (proxyUser != null) {
- proxyQuotaIncrement = incrementJobCountAndCheckUserQuota(proxyUserToJobCount, proxyUser, dagNode);
- proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check succeeds
- if (!proxyUserCheck) {
- requesterMessage.append(String.format(
- "Quota exceeded for proxy user %s on executor %s : quota=%s, runningJobs=%d%n",
- proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement)));
- }
- }
-
- String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
- boolean requesterCheck = true;
-
- if (serializedRequesters != null) {
- List<String> uniqueRequesters = RequesterService.deserialize(serializedRequesters).stream()
- .map(ServiceRequester::getName).distinct().collect(Collectors.toList());
- for (String requester : uniqueRequesters) {
- int userQuotaIncrement = incrementJobCountAndCheckUserQuota(requesterToJobCount, requester, dagNode);
- boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota check succeeds
- usersQuotaIncrement.add(requester);
- requesterCheck = requesterCheck && thisRequesterCheck;
- if (!thisRequesterCheck) {
- requesterMessage.append(String.format(
- "Quota exceeded for requester %s on executor %s : quota=%s, runningJobs=%d%n",
- requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement)));
- }
- }
- }
-
- // Throw errors for reach quota at the end to avoid inconsistent job counts
- if (!proxyUserCheck || !requesterCheck) {
- // roll back the increased counts in this block
- decrementQuotaUsage(proxyUserToJobCount, proxyUser);
- decrementQuotaUsageForUsers(usersQuotaIncrement);
- throw new IOException(requesterMessage.toString());
- }
- }
-
- /**
- * Increment quota by one for the given map and key.
- * @return a negative number if quota is already reached for this user
- * a positive number if the quota is not reached for this user
- * the absolute value of the number is the used quota before this increment request
- * 0 if quota usage is not changed
- */
- private int incrementJobCountAndCheckUserQuota(Map<String, Integer> quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {
- String key = DagManagerUtils.getUserQuotaKey(user, dagNode);
-
- // Only increment job count for first attempt, since job is considered running between retries
- if (dagNode.getValue().getCurrentAttempts() != 1) {
- return 0;
- }
-
- Integer currentCount;
- do {
- currentCount = quotaMap.get(key);
- } while (currentCount == null ? quotaMap.putIfAbsent(key, 1) != null : !quotaMap.replace(key, currentCount, currentCount + 1));
-
- if (currentCount == null) {
- currentCount = 0;
- }
-
- if (currentCount >= getQuotaForUser(user)) {
- return -currentCount; // increment must have crossed the quota
- } else {
- return currentCount;
- }
- }
-
- private int getQuotaForUser(String user) {
- return perUserQuota.getOrDefault(user, defaultQuota);
- }
-
/**
* Method that defines the actions to be performed when a job finishes either successfully or with failure.
* This method updates the state of the dag and performs clean up actions as necessary.
@@ -1085,16 +993,13 @@ public class DagManager extends AbstractIdleService {
String jobName = DagManagerUtils.getFullyQualifiedJobName(dagNode);
ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, jobStatus.name());
-
- releaseQuota(dagNode);
-
- if (this.metricContext != null) {
+ // Only decrement counters and quota for jobs that actually ran on the executor, not from a GaaS side failure/skip event
+ if (quotaManager.releaseQuota(dagNode) && this.metricContext != null) {
getRunningJobsCounter(dagNode).dec();
getRunningJobsCounterForUser(dagNode).forEach(ContextAwareCounter::dec);
}
switch (jobStatus) {
- // TODO : For now treat canceled as failed, till we introduce failure option - CANCEL
case FAILED:
dag.setMessage("Flow failed because job " + jobName + " failed");
if (DagManagerUtils.getFailureOption(dag) == FailureOption.FINISH_RUNNING) {
@@ -1118,45 +1023,6 @@ public class DagManager extends AbstractIdleService {
}
}
- /**
- * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link DagNode}.
- */
- private void releaseQuota(DagNode<JobExecutionPlan> dagNode) {
- String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
- if (proxyUser != null) {
- String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
- decrementQuotaUsage(proxyUserToJobCount, proxyUserKey);
- }
-
- String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
- if (serializedRequesters != null) {
- try {
- for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
- String requesterKey = DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode);
- decrementQuotaUsage(requesterToJobCount, requesterKey);
- }
- } catch (IOException e) {
- log.error("Failed to release quota for requester list " + serializedRequesters, e);
- }
- }
- }
-
- private void decrementQuotaUsage(Map<String, Integer> quotaMap, String user) {
- Integer currentCount;
- if (user == null) {
- return;
- }
- do {
- currentCount = quotaMap.get(user);
- } while (currentCount != null && currentCount > 0 && !quotaMap.replace(user, currentCount, currentCount - 1));
- }
-
- private void decrementQuotaUsageForUsers(Set<String> requestersToDecreaseCount) {
- for (String requester : requestersToDecreaseCount) {
- decrementQuotaUsage(DagManagerThread.requesterToJobCount, requester);
- }
- }
-
private void deleteJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
this.jobToDag.remove(dagNode);
this.dagToJobs.get(dagId).remove(dagNode);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
new file mode 100644
index 000000000..58b212db2
--- /dev/null
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.google.common.collect.ImmutableMap;
+import com.typesafe.config.Config;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
+
+
+/**
+ * Manages the statically configured user quotas for both the proxy user in the user.to.proxy configuration and the
+ * API requester(s). It is used by the dag manager to ensure that the number of currently running jobs does not exceed
+ * the quota; if the quota is exceeded, the execution fails without running on the underlying executor.
+ *
+ * <p>Job counts are stored per quota key, as produced by {@link DagManagerUtils#getUserQuotaKey}, and are updated with
+ * compare-and-set loops because DAGs on different DagManagerThreads may update the count of the same user.
+ */
+@Slf4j
+public class UserQuotaManager {
+  public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota";
+  public static final String USER_JOB_QUOTA_KEY = DagManager.DAG_MANAGER_PREFIX + "defaultJobQuota";
+  public static final String QUOTA_SEPERATOR = ":";
+  public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+  // Returned by incrementJobCount when the count was deliberately left untouched (job is being retried)
+  private static final int COUNT_UNCHANGED = -1;
+  private final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>();
+  private final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
+  private final Map<String, Integer> perUserQuota;
+  // Ids of the dags that currently hold quota; left package-private as before, presumably so tests can inspect it
+  Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
+  private final int defaultQuota;
+
+  /**
+   * Reads the default quota ({@link #USER_JOB_QUOTA_KEY}) and the per-user overrides ({@link #PER_USER_QUOTA}).
+   * Every per-user entry has the form {@code <user><QUOTA_SEPERATOR><quota>}, e.g. {@code user:1}.
+   */
+  UserQuotaManager(Config config) {
+    this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
+    ImmutableMap.Builder<String, Integer> mapBuilder = ImmutableMap.builder();
+
+    for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) {
+      // Split once and reuse both halves instead of splitting the same entry twice
+      String[] userAndQuota = userQuota.split(QUOTA_SEPERATOR);
+      mapBuilder.put(userAndQuota[0], Integer.parseInt(userAndQuota[1]));
+    }
+    this.perUserQuota = mapBuilder.build();
+  }
+
+  /**
+   * Counts the job of the given dagNode against the quota of its proxy user and of each distinct requester, and
+   * checks that none of those quotas is exceeded.
+   *
+   * @param dagNode the job that is about to run (or, on init, is already running)
+   * @param onInit if true, an exceeded quota is tolerated: the counts stay incremented and nothing is thrown
+   * @throws IOException if a quota is exceeded and {@code onInit} is false (all increments made by this call are
+   *         rolled back first), or if the requester list cannot be deserialized
+   */
+  public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) throws IOException {
+    String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+    String dagId = DagManagerUtils.generateDagId(dagNode);
+    boolean quotaExceeded = false;
+    String incrementedProxyKey = null; // quota key of the proxy user, set only if its count was really increased
+    Set<String> incrementedRequesterKeys = new HashSet<>(); // quota keys of the requesters whose count was increased
+    StringBuilder requesterMessage = new StringBuilder();
+    runningDagIds.put(dagId, true);
+
+    if (proxyUser != null) {
+      String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
+      int previousCount = incrementJobCount(proxyUserToJobCount, proxyUserKey, dagNode);
+      if (previousCount != COUNT_UNCHANGED) {
+        incrementedProxyKey = proxyUserKey;
+        int quota = getQuotaForUser(proxyUser);
+        if (previousCount >= quota) {
+          quotaExceeded = true;
+          // previousCount + 1 is the count including this request
+          requesterMessage.append(String.format(
+              "Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n",
+              proxyUser, specExecutorUri, quota, previousCount + 1 - quota));
+        }
+      }
+    }
+
+    String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+    if (serializedRequesters != null) {
+      for (String requester : getUniqueRequesterNames(serializedRequesters)) {
+        String requesterKey = DagManagerUtils.getUserQuotaKey(requester, dagNode);
+        int previousCount = incrementJobCount(requesterToJobCount, requesterKey, dagNode);
+        if (previousCount == COUNT_UNCHANGED) {
+          continue;
+        }
+        // Remember the key (not the bare name) so that a rollback hits the entry that was incremented
+        incrementedRequesterKeys.add(requesterKey);
+        int quota = getQuotaForUser(requester);
+        if (previousCount >= quota) {
+          quotaExceeded = true;
+          requesterMessage.append(String.format(
+              "Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n",
+              requester, specExecutorUri, quota, previousCount + 1 - quota));
+        }
+      }
+    }
+
+    // Throw errors for reach quota at the end to avoid inconsistent job counts
+    if (quotaExceeded && !onInit) {
+      // roll back exactly the counts that were increased in this call
+      if (incrementedProxyKey != null) {
+        decrementQuotaUsage(proxyUserToJobCount, incrementedProxyKey);
+      }
+      decrementQuotaUsageForUsers(incrementedRequesterKeys);
+      runningDagIds.remove(dagId);
+      throw new IOException(requesterMessage.toString());
+    }
+  }
+
+  /**
+   * Atomically increments the job count stored under the given quota key.
+   * The count is only incremented for the first attempt, since a job is considered running between retries.
+   *
+   * @return the count before the increment (0 if the key was absent), or {@link #COUNT_UNCHANGED} if the count was
+   *         left untouched because this is not the first attempt of the job
+   */
+  private int incrementJobCount(Map<String, Integer> quotaMap, String quotaKey, Dag.DagNode<JobExecutionPlan> dagNode) {
+    if (dagNode.getValue().getCurrentAttempts() != 1) {
+      return COUNT_UNCHANGED;
+    }
+
+    Integer currentCount;
+    // Modifications must be thread safe since DAGs on DagManagerThreads may update the quota for the same user
+    do {
+      currentCount = quotaMap.get(quotaKey);
+    } while (currentCount == null ? quotaMap.putIfAbsent(quotaKey, 1) != null
+        : !quotaMap.replace(quotaKey, currentCount, currentCount + 1));
+
+    return currentCount == null ? 0 : currentCount;
+  }
+
+  /**
+   * Decrement the quota by one for the proxy user and each distinct requester corresponding to the provided
+   * {@link Dag.DagNode}.
+   *
+   * @return true if the dag existed in the set of running dags and its quota was released; false if the dag was not
+   *         registered as running, or if the requester list could not be deserialized
+   */
+  public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) {
+    Boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
+    if (val == null) {
+      return false;
+    }
+    String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+    if (proxyUser != null) {
+      String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
+      decrementQuotaUsage(proxyUserToJobCount, proxyUserKey);
+    }
+    String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+    if (serializedRequesters != null) {
+      try {
+        // checkQuota counts every distinct requester name once, so release each distinct name once as well
+        for (String requester : getUniqueRequesterNames(serializedRequesters)) {
+          String requesterKey = DagManagerUtils.getUserQuotaKey(requester, dagNode);
+          decrementQuotaUsage(requesterToJobCount, requesterKey);
+        }
+      } catch (IOException e) {
+        log.error("Failed to release quota for requester list " + serializedRequesters, e);
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Deserializes the requester list and returns the requester names without duplicates, in encounter order.
+   */
+  private static List<String> getUniqueRequesterNames(String serializedRequesters) throws IOException {
+    return RequesterService.deserialize(serializedRequesters).stream()
+        .map(ServiceRequester::getName).distinct().collect(Collectors.toList());
+  }
+
+  /**
+   * Atomically decrements the count stored under the given quota key; a missing key or a count of 0 is left as is.
+   */
+  private void decrementQuotaUsage(Map<String, Integer> quotaMap, String quotaKey) {
+    Integer currentCount;
+    if (quotaKey == null) {
+      return;
+    }
+    do {
+      currentCount = quotaMap.get(quotaKey);
+    } while (currentCount != null && currentCount > 0 && !quotaMap.replace(quotaKey, currentCount, currentCount - 1));
+  }
+
+  /**
+   * Decrements the requester job count of every given quota key (keys, not bare requester names).
+   */
+  private void decrementQuotaUsageForUsers(Set<String> requesterKeysToDecreaseCount) {
+    for (String requesterKey : requesterKeysToDecreaseCount) {
+      decrementQuotaUsage(requesterToJobCount, requesterKey);
+    }
+  }
+
+  /**
+   * Returns the quota configured for the given user name, or the default quota if the user has no override.
+   */
+  private int getQuotaForUser(String user) {
+    return this.perUserQuota.getOrDefault(user, defaultQuota);
+  }
+
+}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index b072185c8..9daad5fbc 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.gobblin.service.modules.orchestration;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.MetricRegistry;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -29,10 +31,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.metrics.ServiceMetricNames;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -65,6 +69,7 @@ import org.apache.gobblin.util.ConfigUtils;
public class DagManagerTest {
private final String dagStateStoreDir = "/tmp/dagManagerTest/dagStateStore";
private DagStateStore _dagStateStore;
+ private DagStateStore _failedDagStateStore;
private JobStatusRetriever _jobStatusRetriever;
private DagManager.DagManagerThread _dagManagerThread;
private LinkedBlockingQueue<Dag<JobExecutionPlan>> queue;
@@ -73,8 +78,10 @@ public class DagManagerTest {
private Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag;
private Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs;
private Map<String, Dag<JobExecutionPlan>> dags;
+ private UserQuotaManager _gobblinServiceQuotaManager;
private Set<String> failedDagIds;
private static long START_SLA_DEFAULT = 15 * 60 * 1000;
+ private MetricContext metricContext;
@BeforeClass
public void setUp() throws Exception {
@@ -83,15 +90,18 @@ public class DagManagerTest {
.withValue(FSDagStateStore.DAG_STATESTORE_DIR, ConfigValueFactory.fromAnyRef(this.dagStateStoreDir));
this._dagStateStore = new FSDagStateStore(config, new HashMap<>());
- DagStateStore failedDagStateStore = new InMemoryDagStateStore();
+ this._failedDagStateStore = new InMemoryDagStateStore();
this._jobStatusRetriever = Mockito.mock(JobStatusRetriever.class);
this.queue = new LinkedBlockingQueue<>();
this.cancelQueue = new LinkedBlockingQueue<>();
this.resumeQueue = new LinkedBlockingQueue<>();
- MetricContext metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
- this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, failedDagStateStore, queue, cancelQueue,
- resumeQueue, true, 5, new HashMap<>(), new HashSet<>(), metricContext.contextAwareMeter("successMeter"),
- metricContext.contextAwareMeter("failedMeter"), START_SLA_DEFAULT);
+ this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
+ Config quotaConfig = ConfigFactory.empty()
+ .withValue(UserQuotaManager.PER_USER_QUOTA, ConfigValueFactory.fromAnyRef("user:1"));
+ this._gobblinServiceQuotaManager = new UserQuotaManager(quotaConfig);
+ this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, _failedDagStateStore, queue, cancelQueue,
+ resumeQueue, true, new HashSet<>(), metricContext.contextAwareMeter("successMeter"),
+ metricContext.contextAwareMeter("failedMeter"), START_SLA_DEFAULT, _gobblinServiceQuotaManager);
Field jobToDagField = DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
jobToDagField.setAccessible(true);
@@ -110,6 +120,19 @@ public class DagManagerTest {
this.failedDagIds = (Set<String>) failedDagIdsField.get(this._dagManagerThread);
}
+  /**
+   * Create a list of dags with only one node each. Dag {@code i} is built with id {@code Integer.toString(i)},
+   * the current time in milliseconds as flow execution id and the FINISH_ALL_POSSIBLE failure option.
+   * @param numDags the number of dags to create
+   * @param proxyUser the proxy user passed to {@code buildDag} for every created dag
+   * @return a list containing {@code numDags} single-node dags.
+   */
+  static List<Dag<JobExecutionPlan>> buildDagList(int numDags, String proxyUser) throws URISyntaxException{
+    List<Dag<JobExecutionPlan>> dagList = new ArrayList<>();
+    for (int i = 0; i < numDags; i++) {
+      dagList.add(buildDag(Integer.toString(i), System.currentTimeMillis(), DagManager.FailureOption.FINISH_ALL_POSSIBLE.name(), 1,
+          proxyUser));
+    }
+    return dagList;
+  }
+
+
/**
* Create a {@link Dag <JobExecutionPlan>}.
* @return a Dag.
@@ -122,6 +145,11 @@ public class DagManagerTest {
static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId, String flowFailureOption, int numNodes)
throws URISyntaxException {
+ return buildDag(id, flowExecutionId, flowFailureOption, numNodes, "testUser");
+ }
+
+ static Dag<JobExecutionPlan> buildDag(String id, Long flowExecutionId, String flowFailureOption, int numNodes, String proxyUser)
+ throws URISyntaxException {
List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
for (int i = 0; i < numNodes; i++) {
@@ -132,7 +160,8 @@ public class DagManagerTest {
addPrimitive(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId).
addPrimitive(ConfigurationKeys.JOB_GROUP_KEY, "group" + id).
addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job" + suffix).
- addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, flowFailureOption).build();
+ addPrimitive(ConfigurationKeys.FLOW_FAILURE_OPTION, flowFailureOption).
+ addPrimitive(AzkabanProjectConfig.USER_TO_PROXY, proxyUser).build();
if ((i == 1) || (i == 2)) {
jobConfig = jobConfig.withValue(ConfigurationKeys.JOB_DEPENDENCIES, ConfigValueFactory.fromAnyRef("job0"));
} else if (i == 3) {
@@ -757,6 +786,118 @@ public class DagManagerTest {
Assert.assertEquals(this.dagToJobs.size(), 0);
}
+ /**
+ * Submits two single-node dags for "user" one after the other and checks that the per-user running counter
+ * stays at 1 after the second submission, then runs the manager thread once more to clean up.
+ */
+ @Test (dependsOnMethods = "testDagManagerWithBadFlowSLAConfig")
+ public void testDagManagerQuotaExceeded() throws URISyntaxException, IOException {
+ List<Dag<JobExecutionPlan>> dags = buildDagList(2, "user");
+ Dag<JobExecutionPlan> firstDag = dags.get(0);
+ Dag<JobExecutionPlan> secondDag = dags.get(1);
+ // Only the first dag is queued before the first pass of the manager thread
+ this.queue.offer(firstDag);
+ Long flowExecutionId0 = Long.valueOf(firstDag.getNodes().get(0).getValue().getJobSpec().getConfig()
+ .getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+ Long flowExecutionId1 = Long.valueOf(secondDag.getNodes().get(0).getValue().getJobSpec().getConfig()
+ .getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY));
+
+ // Statuses reported for the first dag on consecutive polls: RUNNING, RUNNING, then COMPLETE
+ Iterator<JobStatus> firstDagRunning =
+ getMockJobStatus("flow0", "group0", flowExecutionId0, "job0", "group0", String.valueOf(ExecutionStatus.RUNNING));
+ // Status reported for the second dag
+ Iterator<JobStatus> secondDagFailed =
+ getMockJobStatus("flow1", "group1", flowExecutionId1, "job0", "group1", String.valueOf(ExecutionStatus.FAILED));
+ Iterator<JobStatus> firstDagStillRunning =
+ getMockJobStatus("flow0", "group0", flowExecutionId0, "job0", "group0", String.valueOf(ExecutionStatus.RUNNING));
+ // Lets the normally scheduled job finish so it is cleaned up on the last pass
+ Iterator<JobStatus> firstDagComplete =
+ getMockJobStatus("flow0", "group0", flowExecutionId0, "job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.eq("flow0"), Mockito.eq("group0"), Mockito.anyLong(),
+ Mockito.anyString(), Mockito.anyString()))
+ .thenReturn(firstDagRunning)
+ .thenReturn(firstDagStillRunning)
+ .thenReturn(firstDagComplete);
+
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.eq("flow1"), Mockito.eq("group1"), Mockito.anyLong(),
+ Mockito.anyString(), Mockito.anyString()))
+ .thenReturn(secondDagFailed);
+
+ this._dagManagerThread.run();
+ // dag will not be processed due to exceeding the quota, will log a message and exit out without adding it to dags
+ this.queue.offer(secondDag);
+ this._dagManagerThread.run();
+
+ String userCounterName = MetricRegistry.name(
+ ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.SERVICE_USERS,
+ "user");
+ SortedMap<String, Counter> allCounters = metricContext.getParent().get().getCounters();
+ Assert.assertEquals(allCounters.get(userCounterName).getCount(), 1);
+
+ this._dagManagerThread.run(); // cleanup
+ }
+
+ /**
+ * Checks the per-user running counter for "user" across three passes of the manager thread:
+ * it is 1 after two dags are submitted together, still 1 after a third dag is submitted,
+ * and 0 after the final cleanup pass.
+ */
+ @Test (dependsOnMethods = "testDagManagerQuotaExceeded")
+ public void testQuotaDecrement() throws URISyntaxException, IOException {
+
+ List<Dag<JobExecutionPlan>> dagList = buildDagList(3, "user");
+ // Add the first two dags to the queue of dags; the third one is submitted after the first pass
+ this.queue.offer(dagList.get(0));
+ this.queue.offer(dagList.get(1));
+ Config jobConfig0 = dagList.get(0).getNodes().get(0).getValue().getJobSpec().getConfig();
+ Config jobConfig1 = dagList.get(1).getNodes().get(0).getValue().getJobSpec().getConfig();
+ // Read the config of the third dag (index 2) so the flow2 mock carries that dag's own flow execution id
+ Config jobConfig2 = dagList.get(2).getNodes().get(0).getValue().getJobSpec().getConfig();
+
+ Iterator<JobStatus> jobStatusIterator0 =
+ getMockJobStatus("flow0", "group0", Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+ "job0", "group0", String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator1 =
+ getMockJobStatus("flow1", "group1", Long.valueOf(jobConfig1.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+ "job0", "group1", String.valueOf(ExecutionStatus.FAILED));
+ Iterator<JobStatus> jobStatusIterator2 =
+ getMockJobStatus("flow0", "group0", Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+ "job0", "group0", String.valueOf(ExecutionStatus.RUNNING));
+ Iterator<JobStatus> jobStatusIterator3 =
+ getMockJobStatus("flow2", "group2", Long.valueOf(jobConfig2.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+ "job0", "group2", String.valueOf(ExecutionStatus.FAILED));
+ Iterator<JobStatus> jobStatusIterator4 =
+ getMockJobStatus("flow0", "group0", Long.valueOf(jobConfig0.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)),
+ "job0", "group0", String.valueOf(ExecutionStatus.COMPLETE));
+
+ // flow0 reports RUNNING on the first two polls and COMPLETE on the third
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.eq("flow0"), Mockito.eq("group0"), Mockito.anyLong(),
+ Mockito.anyString(), Mockito.anyString()))
+ .thenReturn(jobStatusIterator0)
+ .thenReturn(jobStatusIterator2)
+ .thenReturn(jobStatusIterator4);
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.eq("flow1"), Mockito.eq("group1"), Mockito.anyLong(),
+ Mockito.anyString(), Mockito.anyString()))
+ .thenReturn(jobStatusIterator1);
+ Mockito.when(_jobStatusRetriever
+ .getJobStatusesForFlowExecution(Mockito.eq("flow2"), Mockito.eq("group2"), Mockito.anyLong(),
+ Mockito.anyString(), Mockito.anyString()))
+ .thenReturn(jobStatusIterator3);
+
+ this._dagManagerThread.run();
+
+ SortedMap<String, Counter> allCounters = metricContext.getParent().get().getCounters();
+ Assert.assertEquals(allCounters.get(MetricRegistry.name(
+ ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.SERVICE_USERS,
+ "user")).getCount(), 1);
+ // Test case where a job that exceeded a quota would cause a double decrement after fixing the proxy user name, allowing for more jobs to run
+ this.queue.offer(dagList.get(2));
+ this._dagManagerThread.run();
+ // Assert that running dag metrics are only counted once
+ Assert.assertEquals(allCounters.get(MetricRegistry.name(
+ ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.SERVICE_USERS,
+ "user")).getCount(), 1);
+
+ this._dagManagerThread.run(); // cleanup
+ Assert.assertEquals(allCounters.get(MetricRegistry.name(
+ ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.SERVICE_USERS,
+ "user")).getCount(), 0);
+
+ }
+
@AfterClass
public void cleanUp() throws Exception {
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
new file mode 100644
index 000000000..2ac0acc79
--- /dev/null
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.service.modules.orchestration;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import java.io.IOException;
+import java.util.List;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link UserQuotaManager#checkQuota}, covering the init path (second argument {@code true})
+ * and the regular path (second argument {@code false}) once a user already has a dag counted.
+ */
+public class UserQuotaManagerTest {
+
+ UserQuotaManager _quotaManager;
+
+ @BeforeClass
+ public void setUp() {
+ // Config.withValue replaces whatever is already stored at the path, so both per-user quotas have to be
+ // supplied in a single value; setting the key twice would keep only the last entry.
+ // NOTE(review): relies on UserQuotaManager reading PER_USER_QUOTA as a comma-separated list - confirm.
+ Config quotaConfig = ConfigFactory.empty()
+ .withValue(UserQuotaManager.PER_USER_QUOTA, ConfigValueFactory.fromAnyRef("user:1,user2:1"));
+ this._quotaManager = new UserQuotaManager(quotaConfig);
+ }
+
+ // Tests that if exceeding the quota on startup, do not throw an exception and do not decrement the counter
+ @Test
+ public void testExceedsQuotaOnStartup() throws Exception {
+ List<Dag<JobExecutionPlan>> dags = DagManagerTest.buildDagList(2, "user");
+ // Ensure that the current attempt is 1, normally done by DagManager
+ dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
+ dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
+
+ this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), true);
+ // Should not be throwing the exception
+ this._quotaManager.checkQuota(dags.get(1).getNodes().get(0), true);
+
+ // TODO: add verification when adding a public method for getting the current count and quota per user
+ }
+
+ // Tests that a second dag for the same user is rejected with an IOException outside of startup
+ @Test
+ public void testExceedsQuotaThrowsException() throws Exception {
+ List<Dag<JobExecutionPlan>> dags = DagManagerTest.buildDagList(2, "user2");
+
+ // Ensure that the current attempt is 1, normally done by DagManager
+ dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
+ dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
+
+ this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), false);
+ Assert.assertThrows(IOException.class, () -> {
+ this._quotaManager.checkQuota(dags.get(1).getNodes().get(0), false);
+ });
+ }
+}