You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text rendering).
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/13 04:26:21 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1073] Add proxy user and requester quota to GaaS
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new d703b8f [GOBBLIN-1073] Add proxy user and requester quota to GaaS
d703b8f is described below
commit d703b8f5201dd343ea543d52af63eb68e38121f1
Author: Jack Moseley <jm...@linkedin.com>
AuthorDate: Thu Mar 12 21:26:12 2020 -0700
[GOBBLIN-1073] Add proxy user and requester quota to GaaS
Closes #2913 from jack-moseley/gaas-quota
---
.../service/modules/orchestration/DagManager.java | 112 ++++++++++++++++++++-
.../modules/orchestration/DagManagerUtils.java | 14 +++
.../modules/orchestration/DagManagerTest.java | 2 +-
3 files changed, 123 insertions(+), 5 deletions(-)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index fbee417..0187c8a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -123,6 +124,11 @@ public class DagManager extends AbstractIdleService {
private static final String JOB_STATUS_RETRIEVER_CLASS_KEY = JOB_STATUS_RETRIEVER_KEY + ".class";
private static final String DEFAULT_JOB_STATUS_RETRIEVER_CLASS = FsJobStatusRetriever.class.getName();
private static final String DAG_STATESTORE_CLASS_KEY = DAG_MANAGER_PREFIX + "dagStateStoreClass";
+ private static final String USER_JOB_QUOTA_KEY = DAG_MANAGER_PREFIX + "defaultJobQuota";
+ private static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
+ private static final String PER_USER_QUOTA = DAG_MANAGER_PREFIX + "perUserQuota";
+
+ private static final String QUOTA_SEPERATOR = ":";
/**
* Action to be performed on a {@link Dag}, in case of a job failure. Currently, we allow 2 modes:
@@ -165,6 +171,8 @@ public class DagManager extends AbstractIdleService {
private final JobStatusRetriever jobStatusRetriever;
private final Config config;
private final Optional<EventSubmitter> eventSubmitter;
+ private final int defaultQuota;
+ private final Map<String, Integer> perUserQuota;
private volatile boolean isActive = false;
@@ -183,6 +191,12 @@ public class DagManager extends AbstractIdleService {
this.eventSubmitter = Optional.absent();
}
+ this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
+ this.perUserQuota = new HashMap<>();
+ for (String userQuota : ConfigUtils.getStringList(config, PER_USER_QUOTA)) {
+ this.perUserQuota.put(userQuota.split(QUOTA_SEPERATOR)[0], Integer.parseInt(userQuota.split(QUOTA_SEPERATOR)[1]));
+ }
+
try {
this.jobStatusRetriever = createJobStatusRetriever(config);
} catch (ReflectiveOperationException e) {
@@ -328,7 +342,7 @@ public class DagManager extends AbstractIdleService {
this.dagManagerThreads = new DagManagerThread[numThreads];
for (int i = 0; i < numThreads; i++) {
DagManagerThread dagManagerThread = new DagManagerThread(jobStatusRetriever, dagStateStore,
- queue[i], cancelQueue[i], instrumentationEnabled);
+ queue[i], cancelQueue[i], instrumentationEnabled, defaultQuota, perUserQuota);
this.dagManagerThreads[i] = dagManagerThread;
this.scheduledExecutorPool.scheduleAtFixedRate(dagManagerThread, 0, this.pollingInterval, TimeUnit.SECONDS);
}
@@ -362,6 +376,8 @@ public class DagManager extends AbstractIdleService {
*/
public static class DagManagerThread implements Runnable {
private final Map<DagNode<JobExecutionPlan>, Dag<JobExecutionPlan>> jobToDag = new HashMap<>();
+ private static final Map<String, Integer> proxyUserToJobCount = new ConcurrentHashMap<>();
+ private static final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
private final Map<String, Dag<JobExecutionPlan>> dags = new HashMap<>();
// dagToJobs holds a map of dagId to running jobs of that dag
final Map<String, LinkedList<DagNode<JobExecutionPlan>>> dagToJobs = new HashMap<>();
@@ -371,6 +387,8 @@ public class DagManager extends AbstractIdleService {
private final MetricContext metricContext;
private final Optional<EventSubmitter> eventSubmitter;
private final Optional<Timer> jobStatusPolledTimer;
+ private final int defaultQuota;
+ private final Map<String, Integer> perUserQuota;
private JobStatusRetriever jobStatusRetriever;
private DagStateStore dagStateStore;
@@ -381,11 +399,14 @@ public class DagManager extends AbstractIdleService {
* Constructor.
*/
DagManagerThread(JobStatusRetriever jobStatusRetriever, DagStateStore dagStateStore,
- BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, boolean instrumentationEnabled) {
+ BlockingQueue<Dag<JobExecutionPlan>> queue, BlockingQueue<String> cancelQueue, boolean instrumentationEnabled,
+ int defaultQuota, Map<String, Integer> perUserQuota) {
this.jobStatusRetriever = jobStatusRetriever;
this.dagStateStore = dagStateStore;
this.queue = queue;
this.cancelQueue = cancelQueue;
+ this.defaultQuota = defaultQuota;
+ this.perUserQuota = perUserQuota;
if (instrumentationEnabled) {
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(ConfigFactory.empty()), getClass());
this.eventSubmitter = Optional.of(new EventSubmitter.Builder(this.metricContext, "org.apache.gobblin.service").build());
@@ -706,11 +727,12 @@ public class DagManager extends AbstractIdleService {
JobSpec jobSpec = DagManagerUtils.getJobSpec(dagNode);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
- String specExecutorUri = dagNode.getValue().getSpecExecutor().getUri().toString();
+ String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
// Run this spec on selected executor
SpecProducer producer = null;
try {
+ checkQuota(dagNode);
producer = DagManagerUtils.getSpecProducer(dagNode);
TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
@@ -747,6 +769,59 @@ public class DagManager extends AbstractIdleService {
}
}
+    /**
+     * Counts the job described by {@code dagNode} against the quota of its proxy user (if one is configured) and
+     * of every requester in its serialized requester list (if one is present). Counts are kept per spec executor.
+     *
+     * <p>Every counter is incremented before any exception is thrown, so the counts are not left half-updated.
+     * The matching decrements happen in {@code releaseQuota}, which is called on job finish.
+     * NOTE(review): assumes a job rejected here still reaches the job-finish path so its counts are released — confirm.
+     *
+     * @param dagNode the job about to be submitted
+     * @throws IOException if the proxy user or any requester is over quota, or if the requester list cannot be
+     *                     deserialized
+     */
+    private void checkQuota(DagNode<JobExecutionPlan> dagNode) throws IOException {
+      String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+      String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+      boolean proxyUserCheck = true;
+      if (proxyUser != null) {
+        proxyUserCheck = incrementMapAndCheckQuota(proxyUserToJobCount, proxyUser, dagNode);
+      }
+
+      String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+      boolean requesterCheck = true;
+      String requesterMessage = null;
+      // Skip an empty requester list instead of handing "" to RequesterService.deserialize; this matches the
+      // StringUtils.isNotEmpty guard used for the same value elsewhere in this class.
+      if (StringUtils.isNotEmpty(serializedRequesters)) {
+        for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
+          // Non-short-circuiting &= so every requester is still counted after one has exceeded its quota.
+          requesterCheck &= incrementMapAndCheckQuota(requesterToJobCount, requester.getName(), dagNode);
+          if (!requesterCheck && requesterMessage == null) {
+            // Report only the first requester found over quota.
+            requesterMessage = "Quota exceeded for requester " + requester.getName() + " on executor " + specExecutorUri + ": quota="
+                + getQuotaForUser(requester.getName()) + ", runningJobs=" + requesterToJobCount.get(DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode));
+          }
+        }
+      }
+
+      // Throw for each exceeded quota only at the end, after all counters were incremented, to avoid inconsistent job counts.
+      if (!proxyUserCheck) {
+        throw new IOException("Quota exceeded for proxy user " + proxyUser + " on executor " + specExecutorUri +
+            ": quota=" + getQuotaForUser(proxyUser) + ", runningJobs=" + proxyUserToJobCount.get(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode)));
+      }
+
+      if (!requesterCheck) {
+        throw new IOException(requesterMessage);
+      }
+    }
+
+    /**
+     * Records one more running job for {@code user} on the spec executor of {@code dagNode}, then checks the
+     * resulting count against that user's quota.
+     *
+     * <p>The count is only incremented on the first attempt, since a job is considered running between retries.
+     * The quota maps are static and shared by all {@link DagManagerThread} instances, which may run concurrently.
+     * The increment therefore uses {@link Map#merge}, which {@link ConcurrentHashMap} performs atomically; a
+     * separate get-then-put could lose concurrent increments.
+     *
+     * @param quotaMap map from quota key (see {@code DagManagerUtils.getUserQuotaKey}) to running job count
+     * @param user proxy user or requester name
+     * @param dagNode the job being submitted
+     * @return true if the count does not exceed the user's quota, false otherwise
+     */
+    private boolean incrementMapAndCheckQuota(Map<String, Integer> quotaMap, String user, DagNode<JobExecutionPlan> dagNode) {
+      String key = DagManagerUtils.getUserQuotaKey(user, dagNode);
+      int jobCount = dagNode.getValue().getCurrentAttempts() == 1
+          ? quotaMap.merge(key, 1, Integer::sum)
+          : quotaMap.getOrDefault(key, 0);
+
+      return jobCount <= getQuotaForUser(user);
+    }
+
+    /**
+     * Looks up the running-job quota for {@code user}. Returns the user's entry in the per-user quota map when
+     * one exists, and the default quota otherwise.
+     */
+    private int getQuotaForUser(String user) {
+      if (perUserQuota.containsKey(user)) {
+        return perUserQuota.get(user);
+      }
+      return defaultQuota;
+    }
+
/**
* Method that defines the actions to be performed when a job finishes either successfully or with failure.
* This method updates the state of the dag and performs clean up actions as necessary.
@@ -762,6 +837,8 @@ public class DagManager extends AbstractIdleService {
ExecutionStatus jobStatus = DagManagerUtils.getExecutionStatus(dagNode);
log.info("Job {} of Dag {} has finished with status {}", jobName, dagId, jobStatus.name());
+ releaseQuota(dagNode);
+
if (this.metricContext != null) {
getRunningJobsCounter(dagNode).dec();
getRunningJobsCounterForUser(dagNode).forEach(counter -> counter.dec());
@@ -785,6 +862,33 @@ public class DagManager extends AbstractIdleService {
}
}
+ /**
+ * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link DagNode}.
+ */
+ private void releaseQuota(DagNode<JobExecutionPlan> dagNode) {
+ String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+ if (proxyUser != null) {
+ String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
+ if (proxyUserToJobCount.containsKey(proxyUserKey) && proxyUserToJobCount.get(proxyUserKey) > 0) {
+ proxyUserToJobCount.put(proxyUserKey, proxyUserToJobCount.get(proxyUserKey) - 1);
+ }
+ }
+
+ String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+ if (serializedRequesters != null) {
+ try {
+ for (ServiceRequester requester : RequesterService.deserialize(serializedRequesters)) {
+ String requesterKey = DagManagerUtils.getUserQuotaKey(requester.getName(), dagNode);
+ if (requesterToJobCount.containsKey(requesterKey) && requesterToJobCount.get(requesterKey) > 0) {
+ requesterToJobCount.put(requesterKey, requesterToJobCount.get(requesterKey) - 1);
+ }
+ }
+ } catch (IOException e) {
+ log.error("Failed to release quota for requester list " + serializedRequesters, e);
+ }
+ }
+ }
+
private void deleteJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
this.jobToDag.remove(dagNode);
this.dagToJobs.get(dagId).remove(dagNode);
@@ -828,7 +932,7 @@ public class DagManager extends AbstractIdleService {
}
try {
- String serializedRequesters = ConfigUtils.getString(configs, RequesterService.REQUESTER_LIST, null);
+ String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
if (StringUtils.isNotEmpty(serializedRequesters)) {
List<ServiceRequester> requesters = RequesterService.deserialize(serializedRequesters);
for (ServiceRequester requester : requesters) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 6bfd7de..5290444 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -35,6 +35,7 @@ import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.RequesterService;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
@@ -44,6 +45,7 @@ import org.apache.gobblin.util.ConfigUtils;
public class DagManagerUtils {
static long NO_SLA = -1L;
+ static String QUOTA_KEY_SEPERATOR = ",";
static FlowId getFlowId(Dag<JobExecutionPlan> dag) {
return getFlowId(dag.getStartNodes().get(0));
@@ -220,6 +222,18 @@ public class DagManagerUtils {
return FailureOption.valueOf(failureOption);
}
+  /**
+   * Returns the string form of the URI of the spec executor assigned to {@code dagNode}.
+   */
+  static String getSpecExecutorUri(DagNode<JobExecutionPlan> dagNode) {
+    JobExecutionPlan plan = dagNode.getValue();
+    return plan.getSpecExecutor().getUri().toString();
+  }
+
+  /**
+   * Reads the serialized requester list stored under {@link RequesterService#REQUESTER_LIST} in the job config
+   * of {@code dagNode}. Returns {@code null} (the default passed to ConfigUtils) when the key is absent.
+   */
+  static String getSerializedRequesterList(DagNode<JobExecutionPlan> dagNode) {
+    JobSpec jobSpec = dagNode.getValue().getJobSpec();
+    return ConfigUtils.getString(jobSpec.getConfig(), RequesterService.REQUESTER_LIST, null);
+  }
+
+  /**
+   * Builds the key under which running jobs are counted for {@code user} on the spec executor of
+   * {@code dagNode}. The key is the user name and the executor URI joined by {@link #QUOTA_KEY_SEPERATOR}.
+   */
+  static String getUserQuotaKey(String user, DagNode<JobExecutionPlan> dagNode) {
+    String executorUri = getSpecExecutorUri(dagNode);
+    return String.join(QUOTA_KEY_SEPERATOR, user, executorUri);
+  }
+
/**
* Increment the value of {@link JobExecutionPlan#currentAttempts}
*/
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
index c74314c..60f25d1 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagerTest.java
@@ -78,7 +78,7 @@ public class DagManagerTest {
this.queue = new LinkedBlockingQueue<>();
this.cancelQueue = new LinkedBlockingQueue<>();
this._dagManagerThread = new DagManager.DagManagerThread(_jobStatusRetriever, _dagStateStore, queue, cancelQueue,
- true);
+ true, 5, new HashMap<>());
Field jobToDagField = DagManager.DagManagerThread.class.getDeclaredField("jobToDag");
jobToDagField.setAccessible(true);