You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text view).
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/10/03 20:01:05 UTC
[gobblin] branch master updated: [GOBBLIN-1703] avoid double quota increase for adhoc flows (#3550)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 0c3f6d1fc [GOBBLIN-1703] avoid double quota increase for adhoc flows (#3550)
0c3f6d1fc is described below
commit 0c3f6d1fc5dbe18c9ffecad9db459cf48465812f
Author: Arjun Singh Bora <ab...@linkedin.com>
AuthorDate: Mon Oct 3 15:00:56 2022 -0500
[GOBBLIN-1703] avoid double quota increase for adhoc flows (#3550)
* avoid double quota increase for adhoc flows
* correct some config names
* address review comments; make the runningDagIds map implementation-specific
* address review comments
* address review comments
* fix checkstyle
* make the quota check across multiple DAG nodes atomic
* fix unit test
* remove unused code
* remove unused imports
* address review comments
---
.../apache/gobblin/service/ServiceConfigKeys.java | 11 +-
.../runtime/spec_store/MysqlBaseSpecStore.java | 3 +-
.../modules/flow/BaseFlowToJobSpecCompiler.java | 20 ++
.../service/modules/flow/MultiHopFlowCompiler.java | 6 +
.../orchestration/AbstractUserQuotaManager.java | 158 +-----------
.../service/modules/orchestration/DagManager.java | 7 +-
.../orchestration/InMemoryUserQuotaManager.java | 162 +++++++++++-
.../orchestration/MysqlUserQuotaManager.java | 283 ++++++++++++++++++---
.../modules/orchestration/UserQuotaManager.java | 2 +-
.../scheduler/GobblinServiceJobScheduler.java | 7 +-
.../InMemoryUserQuotaManagerTest.java | 23 +-
.../orchestration/MysqlUserQuotaManagerTest.java | 78 ++++--
12 files changed, 529 insertions(+), 231 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index 55b8282fe..d9c251e69 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -32,7 +32,9 @@ public class ServiceConfigKeys {
public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled";
public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "scheduler.enabled";
- public static final String GOBBLIN_SERVICE_ORCHESTRATOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "orchestrator.enabled";
+
+ public static final String GOBBLIN_SERVICE_ADHOC_FLOW = GOBBLIN_SERVICE_PREFIX + "adhoc.flow";
+
public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "restliServer.enabled";
public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled";
@@ -146,6 +148,13 @@ public class ServiceConfigKeys {
public static final String QUOTA_MANAGER_CLASS = GOBBLIN_SERVICE_PREFIX + "quotaManager.class";
public static final String DEFAULT_QUOTA_MANAGER = "org.apache.gobblin.service.modules.orchestration.InMemoryUserQuotaManager";
+ public static final String QUOTA_STORE_DB_TABLE_KEY = "quota.store.db.table";
+ public static final String DEFAULT_QUOTA_STORE_DB_TABLE = "quota_table";
+
+ public static final String RUNNING_DAG_IDS_DB_TABLE_KEY = "running.dag.ids.store.db.table";
+ public static final String DEFAULT_RUNNING_DAG_IDS_DB_TABLE = "running_dag_ids";
+
+
// Group Membership authentication service
public static final String GROUP_OWNERSHIP_SERVICE_CLASS = GOBBLIN_SERVICE_PREFIX + "groupOwnershipService.class";
public static final String DEFAULT_GROUP_OWNERSHIP_SERVICE = "org.apache.gobblin.service.NoopGroupOwnershipService";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
index b2369588f..3e3a78201 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
@@ -36,7 +36,6 @@ import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.typesafe.config.Config;
-import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
@@ -133,7 +132,7 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
}
- protected final DataSource dataSource;
+ protected final BasicDataSource dataSource;
protected final String tableName;
private final URI specStoreURI;
protected final SpecSerDe specSerDe;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index f910e9e4e..976d12650 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -24,7 +24,9 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
+import javax.inject.Inject;
import org.apache.commons.lang3.StringUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -90,6 +92,11 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
@Setter
protected boolean active;
+ private boolean warmStandbyEnabled;
+
+ @Inject
+ UserQuotaManager userQuotaManager;
+
public BaseFlowToJobSpecCompiler(Config config){
this(config,true);
}
@@ -119,6 +126,8 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
this.dataAuthorizationTimer = Optional.absent();
}
+ this.warmStandbyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false);
+
this.topologySpecMap = Maps.newConcurrentMap();
this.config = config;
@@ -181,6 +190,17 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
// always try to compile the flow to verify if it is compilable
Dag<JobExecutionPlan> dag = this.compileFlow(flowSpec);
+
+ if (this.warmStandbyEnabled &&
+ (!flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
+ try {
+ userQuotaManager.checkQuota(dag.getStartNodes());
+ flowSpec.getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "true");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
// If dag is null then a compilation error has occurred
if (dag != null && !dag.isEmpty()) {
response = dag.toString();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
index 3e6eb13dc..dc3d0bc47 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MultiHopFlowCompiler.java
@@ -281,6 +281,12 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
Instrumented.markMeter(flowCompilationSuccessFulMeter);
Instrumented.updateTimer(flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+ if (Boolean.parseBoolean(flowSpec.getConfigAsProperties().getProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW))) {
+ for (Dag.DagNode<JobExecutionPlan> dagNode : jobExecutionPlanDag.getStartNodes()) {
+ dagNode.getValue().getJobSpec().getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "true");
+ }
+ }
+
return jobExecutionPlanDag;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
index fad079bf0..b92d6d610 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/AbstractUserQuotaManager.java
@@ -18,10 +18,7 @@
package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
-import java.util.List;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.Config;
@@ -30,10 +27,6 @@ import lombok.AllArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.exception.QuotaExceededException;
-import org.apache.gobblin.service.modules.flowgraph.Dag;
-import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
@@ -49,8 +42,7 @@ abstract public class AbstractUserQuotaManager implements UserQuotaManager {
public static final Integer DEFAULT_USER_JOB_QUOTA = Integer.MAX_VALUE;
private final Map<String, Integer> perUserQuota;
private final Map<String, Integer> perFlowGroupQuota;
- // TODO : we might want to make this field implementation specific to be able to decide if the dag is already running or have been accepted
- Set<String> runningDagIds = ConcurrentHashMap.newKeySet();
+
private final int defaultQuota;
public AbstractUserQuotaManager(Config config) {
@@ -69,155 +61,13 @@ abstract public class AbstractUserQuotaManager implements UserQuotaManager {
this.perFlowGroupQuota = flowGroupMapBuilder.build();
}
- // Implementations should return the current count and increase them by one
- abstract int incrementJobCount(String key, CountType countType) throws IOException;
-
- abstract void decrementJobCount(String key, CountType countType) throws IOException;
-
- public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
- QuotaCheck quotaCheck = increaseAndCheckQuota(dagNode);
-
- // Throw errors for reach quota at the end to avoid inconsistent job counts
- if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || !quotaCheck.flowGroupCheck)) {
- // roll back the increased counts in this block
- rollbackIncrements(dagNode);
- throw new QuotaExceededException(quotaCheck.requesterMessage);
- }
- }
-
- private void rollbackIncrements(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
- String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
- String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, "");
- List<String> usersQuotaIncrement = DagManagerUtils.getDistinctUniqueRequesters(DagManagerUtils.getSerializedRequesterList(dagNode));
-
- decrementJobCount(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), CountType.USER_COUNT);
- decrementQuotaUsageForUsers(usersQuotaIncrement);
- decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);
- runningDagIds.remove(DagManagerUtils.generateDagId(dagNode).toString());
- }
-
- protected QuotaCheck increaseAndCheckQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
- QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
- // Dag is already being tracked, no need to double increment for retries and multihop flows
- if (this.runningDagIds.contains(DagManagerUtils.generateDagId(dagNode).toString())) {
- return quotaCheck;
- } else {
- runningDagIds.add(DagManagerUtils.generateDagId(dagNode).toString());
- }
-
- String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
- String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
- ConfigurationKeys.FLOW_GROUP_KEY, "");
- String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
- StringBuilder requesterMessage = new StringBuilder();
-
- boolean proxyUserCheck;
-
- if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
- int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
- DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), getQuotaForUser(proxyUser), CountType.USER_COUNT);
- proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check succeeds
- quotaCheck.setProxyUserCheck(proxyUserCheck);
- if (!proxyUserCheck) {
- // add 1 to proxyUserIncrement since proxyQuotaIncrement is the count before the increment
- requesterMessage.append(String.format(
- "Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n",
- proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
- }
- }
-
- String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
- boolean requesterCheck = true;
-
- if (dagNode.getValue().getCurrentAttempts() <= 1) {
- List<String> uniqueRequesters = DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
- for (String requester : uniqueRequesters) {
- int userQuotaIncrement = incrementJobCountAndCheckQuota(
- DagManagerUtils.getUserQuotaKey(requester, dagNode), getQuotaForUser(requester), CountType.REQUESTER_COUNT);
- boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota check succeeds
- requesterCheck = requesterCheck && thisRequesterCheck;
- quotaCheck.setRequesterCheck(requesterCheck);
- if (!thisRequesterCheck) {
- requesterMessage.append(String.format(
- "Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n. ",
- requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester)));
- }
- }
- }
-
- boolean flowGroupCheck;
-
- if (dagNode.getValue().getCurrentAttempts() <= 1) {
- int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
- DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
- flowGroupCheck = flowGroupQuotaIncrement >= 0;
- quotaCheck.setFlowGroupCheck(flowGroupCheck);
- if (!flowGroupCheck) {
- requesterMessage.append(String.format("Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n",
- flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
- Math.abs(flowGroupQuotaIncrement) + 1 - getQuotaForFlowGroup(flowGroup)));
- }
- }
-
- quotaCheck.setRequesterMessage(requesterMessage.toString());
-
- return quotaCheck;
- }
-
- /**
- * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link Dag.DagNode}.
- * Returns true if the dag existed in the set of running dags and was removed successfully
- */
- public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
- boolean val = runningDagIds.remove(DagManagerUtils.generateDagId(dagNode).toString());
- if (!val) {
- return false;
- }
-
- String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
- if (proxyUser != null) {
- String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
- decrementJobCount(proxyUserKey, CountType.USER_COUNT);
- }
-
- String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
- ConfigurationKeys.FLOW_GROUP_KEY, "");
- decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);
-
- String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
- try {
- for (String requester : DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
- String requesterKey = DagManagerUtils.getUserQuotaKey(requester, dagNode);
- decrementJobCount(requesterKey, CountType.REQUESTER_COUNT);
- }
- } catch (IOException e) {
- log.error("Failed to release quota for requester list " + serializedRequesters, e);
- return false;
- }
-
- return true;
- }
-
- private int incrementJobCountAndCheckQuota(String key, int keyQuota, CountType countType) throws IOException {
- int currentCount = incrementJobCount(key, countType);
- if (currentCount >= keyQuota) {
- return -currentCount;
- } else {
- return currentCount;
- }
- }
-
- private void decrementQuotaUsageForUsers(List<String> requestersToDecreaseCount) throws IOException {
- for (String requester : requestersToDecreaseCount) {
- decrementJobCount(requester, CountType.REQUESTER_COUNT);
- }
- }
+ abstract boolean containsDagId(String dagId) throws IOException;
- private int getQuotaForUser(String user) {
+ int getQuotaForUser(String user) {
return this.perUserQuota.getOrDefault(user, defaultQuota);
}
- private int getQuotaForFlowGroup(String flowGroup) {
+ int getQuotaForFlowGroup(String flowGroup) {
return this.perFlowGroupQuota.getOrDefault(flowGroup, defaultQuota);
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
index fcb2ff4c1..321f93b9a 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManager.java
@@ -73,6 +73,7 @@ import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -963,7 +964,11 @@ public class DagManager extends AbstractIdleService {
// Run this spec on selected executor
SpecProducer<Spec> producer;
try {
- quotaManager.checkQuota(dagNode);
+ if (!Boolean.parseBoolean(dagNode.getValue().getJobSpec().getConfigAsProperties().getProperty(
+ ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "false"))) {
+ quotaManager.checkQuota(Collections.singleton(dagNode));
+ }
+
producer = DagManagerUtils.getSpecProducer(dagNode);
TimingEvent jobOrchestrationTimer = this.eventSubmitter.isPresent() ? this.eventSubmitter.get().
getTimingEvent(TimingEvent.LauncherTimings.JOB_ORCHESTRATED) : null;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java
index 438e86d20..f4c9ab023 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManager.java
@@ -21,13 +21,18 @@ import com.typesafe.config.Config;
import java.io.IOException;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.util.ConfigUtils;
import static org.apache.gobblin.service.ExecutionStatus.RUNNING;
@@ -42,9 +47,153 @@ public class InMemoryUserQuotaManager extends AbstractUserQuotaManager {
private final Map<String, Integer> flowGroupToJobCount = new ConcurrentHashMap<>();
private final Map<String, Integer> requesterToJobCount = new ConcurrentHashMap<>();
+ private final Set<String> runningDagIds;
+
@Inject
public InMemoryUserQuotaManager(Config config) {
super(config);
+ this.runningDagIds = ConcurrentHashMap.newKeySet();;
+ }
+
+ protected QuotaCheck increaseAndCheckQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
+ QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
+ // Dag is already being tracked, no need to double increment for retries and multihop flows
+ if (containsDagId(DagManagerUtils.generateDagId(dagNode).toString())) {
+ return quotaCheck;
+ } else {
+ addDagId(DagManagerUtils.generateDagId(dagNode).toString());
+ }
+
+ String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+ String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+ ConfigurationKeys.FLOW_GROUP_KEY, "");
+ String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+ StringBuilder requesterMessage = new StringBuilder();
+
+ boolean proxyUserCheck;
+
+ if (proxyUser != null && dagNode.getValue().getCurrentAttempts() <= 1) {
+ int proxyQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), getQuotaForUser(proxyUser), CountType.USER_COUNT);
+ proxyUserCheck = proxyQuotaIncrement >= 0; // proxy user quota check succeeds
+ quotaCheck.setProxyUserCheck(proxyUserCheck);
+ if (!proxyUserCheck) {
+ // add 1 to proxyUserIncrement since proxyQuotaIncrement is the count before the increment
+ requesterMessage.append(String.format(
+ "Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n",
+ proxyUser, specExecutorUri, getQuotaForUser(proxyUser), Math.abs(proxyQuotaIncrement) + 1 - getQuotaForUser(proxyUser)));
+ }
+ }
+
+ String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+ boolean requesterCheck = true;
+
+ if (dagNode.getValue().getCurrentAttempts() <= 1) {
+ List<String> uniqueRequesters = DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
+ for (String requester : uniqueRequesters) {
+ int userQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getUserQuotaKey(requester, dagNode), getQuotaForUser(requester), CountType.REQUESTER_COUNT);
+ boolean thisRequesterCheck = userQuotaIncrement >= 0; // user quota check succeeds
+ requesterCheck = requesterCheck && thisRequesterCheck;
+ quotaCheck.setRequesterCheck(requesterCheck);
+ if (!thisRequesterCheck) {
+ requesterMessage.append(String.format(
+ "Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n. ",
+ requester, specExecutorUri, getQuotaForUser(requester), Math.abs(userQuotaIncrement) + 1 - getQuotaForUser(requester)));
+ }
+ }
+ }
+
+ boolean flowGroupCheck;
+
+ if (dagNode.getValue().getCurrentAttempts() <= 1) {
+ int flowGroupQuotaIncrement = incrementJobCountAndCheckQuota(
+ DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), getQuotaForFlowGroup(flowGroup), CountType.FLOWGROUP_COUNT);
+ flowGroupCheck = flowGroupQuotaIncrement >= 0;
+ quotaCheck.setFlowGroupCheck(flowGroupCheck);
+ if (!flowGroupCheck) {
+ requesterMessage.append(String.format("Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n",
+ flowGroup, specExecutorUri, getQuotaForFlowGroup(flowGroup),
+ Math.abs(flowGroupQuotaIncrement) + 1 - getQuotaForFlowGroup(flowGroup)));
+ }
+ }
+
+ quotaCheck.setRequesterMessage(requesterMessage.toString());
+
+ return quotaCheck;
+ }
+
+ protected void rollbackIncrements(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
+ String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+ String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, "");
+ List<String> usersQuotaIncrement = DagManagerUtils.getDistinctUniqueRequesters(DagManagerUtils.getSerializedRequesterList(dagNode));
+
+ decrementJobCount(DagManagerUtils.getUserQuotaKey(proxyUser, dagNode), CountType.USER_COUNT);
+ decrementQuotaUsageForUsers(usersQuotaIncrement);
+ decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);
+ removeDagId(DagManagerUtils.generateDagId(dagNode).toString());
+ }
+
+ private int incrementJobCountAndCheckQuota(String key, int keyQuota, CountType countType) throws IOException {
+ int currentCount = incrementJobCount(key, countType);
+ if (currentCount >= keyQuota) {
+ return -currentCount;
+ } else {
+ return currentCount;
+ }
+ }
+
+ private void decrementQuotaUsageForUsers(List<String> requestersToDecreaseCount) throws IOException {
+ for (String requester : requestersToDecreaseCount) {
+ decrementJobCount(requester, CountType.REQUESTER_COUNT);
+ }
+ }
+
+ /**
+ * Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link Dag.DagNode}.
+ * Returns true if the dag existed in the set of running dags and was removed successfully
+ */
+ public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
+ boolean val = removeDagId(DagManagerUtils.generateDagId(dagNode).toString());
+ if (!val) {
+ return false;
+ }
+
+ String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+ if (proxyUser != null) {
+ String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
+ decrementJobCount(proxyUserKey, CountType.USER_COUNT);
+ }
+
+ String flowGroup = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(),
+ ConfigurationKeys.FLOW_GROUP_KEY, "");
+ decrementJobCount(DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);
+
+ String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+ try {
+ for (String requester : DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
+ String requesterKey = DagManagerUtils.getUserQuotaKey(requester, dagNode);
+ decrementJobCount(requesterKey, CountType.REQUESTER_COUNT);
+ }
+ } catch (IOException e) {
+ log.error("Failed to release quota for requester list " + serializedRequesters, e);
+ return false;
+ }
+
+ return true;
+ }
+
+ void addDagId(String dagId) {
+ this.runningDagIds.add(dagId);
+ }
+
+ @Override
+ boolean containsDagId(String dagId) {
+ return this.runningDagIds.contains(dagId);
+ }
+
+ boolean removeDagId(String dagId) {
+ return this.runningDagIds.remove(dagId);
}
public void init(Collection<Dag<JobExecutionPlan>> dags) throws IOException {
@@ -58,6 +207,17 @@ public class InMemoryUserQuotaManager extends AbstractUserQuotaManager {
}
}
+ public void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNodes) throws IOException {
+ for (Dag.DagNode<JobExecutionPlan> dagNode : dagNodes) {
+ QuotaCheck quotaCheck = increaseAndCheckQuota(dagNode);
+ if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || !quotaCheck.flowGroupCheck)) {
+ // roll back the increased counts in this block
+ rollbackIncrements(dagNode);
+ throw new QuotaExceededException(quotaCheck.requesterMessage);
+ }
+ }
+ }
+
private int incrementJobCount(String key, Map<String, Integer> quotaMap) {
Integer currentCount;
// Modifications must be thread safe since DAGs on DagManagerThreads may update the quota for the same user
@@ -86,7 +246,6 @@ public class InMemoryUserQuotaManager extends AbstractUserQuotaManager {
}
}
- @Override
int incrementJobCount(String user, CountType countType) throws IOException {
switch (countType) {
case USER_COUNT:
@@ -100,7 +259,6 @@ public class InMemoryUserQuotaManager extends AbstractUserQuotaManager {
}
}
- @Override
void decrementJobCount(String user, CountType countType) throws IOException {
switch (countType) {
case USER_COUNT:
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
index 78f15cfba..1cf49c83f 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManager.java
@@ -24,6 +24,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
+import java.util.List;
import org.apache.commons.dbcp.BasicDataSource;
import com.google.common.annotations.VisibleForTesting;
@@ -35,7 +36,9 @@ import javax.sql.DataSource;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.metastore.MysqlStateStore;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
@@ -47,12 +50,28 @@ import org.apache.gobblin.util.ConfigUtils;
@Slf4j
@Singleton
public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
- private final MysqlQuotaStore mysqlStore;
+ public final MysqlQuotaStore quotaStore;
+ public final RunningDagIdsStore runningDagIds;
+
@Inject
public MysqlUserQuotaManager(Config config) throws IOException {
super(config);
- this.mysqlStore = createQuotaStore(config);
+ this.quotaStore = createQuotaStore(config);
+ this.runningDagIds = createRunningDagStore(config);
+ }
+
+ void addDagId(Connection connection, String dagId) throws IOException {
+ this.runningDagIds.add(connection, dagId);
+ }
+
+ @Override
+ boolean containsDagId(String dagId) throws IOException {
+ return this.runningDagIds.contains(dagId);
+ }
+
+ boolean removeDagId(Connection connection, String dagId) throws IOException {
+ return this.runningDagIds.remove(connection, dagId);
}
// This implementation does not need to update quota usage when the service restarts or it's leadership status changes
@@ -60,42 +79,191 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
}
@Override
- int incrementJobCount(String user, CountType countType) throws IOException {
- try {
- return this.mysqlStore.increaseCount(user, countType);
+ public void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNodes) throws IOException {
+ try (Connection connection = this.quotaStore.dataSource.getConnection()) {
+ connection.setAutoCommit(false);
+
+ for (Dag.DagNode<JobExecutionPlan> dagNode : dagNodes) {
+ QuotaCheck quotaCheck = increaseAndCheckQuota(connection, dagNode);
+ if ((!quotaCheck.proxyUserCheck || !quotaCheck.requesterCheck || !quotaCheck.flowGroupCheck)) {
+ connection.rollback();
+ throw new QuotaExceededException(quotaCheck.requesterMessage);
+ }
+ }
+ connection.commit();
} catch (SQLException e) {
throw new IOException(e);
}
}
-  @Override
-  void decrementJobCount(String user, CountType countType) throws IOException {
+  /**
+   * Increments the counter stored for {@code user} and {@code countType} on {@code connection} without committing.
+   *
+   * @return the count as it was before this increment (0 when no row existed yet)
+   */
+  int incrementJobCount(Connection connection, String user, CountType countType) throws IOException, SQLException {
+    return this.quotaStore.increaseCount(connection, user, countType);
+  }
+
+  /**
+   * Decrements the counter stored for {@code user} and {@code countType} on {@code connection} without committing.
+   */
+  void decrementJobCount(Connection connection, String user, CountType countType) throws IOException, SQLException {
+    this.quotaStore.decreaseCount(connection, user, countType);
+  }
+
+  /**
+   * Marks the dag of {@code dagNode} as running and, on the node's first attempt, increments the proxy-user,
+   * requester and flow-group counters on {@code connection}. Nothing is committed or rolled back here; the caller
+   * owns the transaction and decides whether to keep the increments based on the returned {@link QuotaCheck}.
+   *
+   * @param connection connection whose open transaction receives the running-dag insert and counter increments
+   * @param dagNode node being admitted
+   * @return which checks passed plus a message describing every exceeded quota; all checks pass when the dag is
+   *     already tracked or the node is a retry
+   */
+  protected QuotaCheck increaseAndCheckQuota(Connection connection, Dag.DagNode<JobExecutionPlan> dagNode)
+      throws SQLException, IOException {
+    QuotaCheck quotaCheck = new QuotaCheck(true, true, true, "");
+
+    // Dag is already being tracked, no need to double increment for retries and multihop flows
+    String dagId = DagManagerUtils.generateDagId(dagNode).toString();
+    if (containsDagId(dagId)) {
+      return quotaCheck;
+    }
+    addDagId(connection, dagId);
+
+    // Counters are only charged on the first attempt of a node.
+    if (dagNode.getValue().getCurrentAttempts() > 1) {
+      return quotaCheck;
+    }
+
+    Config jobConfig = dagNode.getValue().getJobSpec().getConfig();
+    String proxyUser = ConfigUtils.getString(jobConfig, AzkabanProjectConfig.USER_TO_PROXY, null);
+    String flowGroup = ConfigUtils.getString(jobConfig, ConfigurationKeys.FLOW_GROUP_KEY, "");
+    String specExecutorUri = DagManagerUtils.getSpecExecutorUri(dagNode);
+    StringBuilder requesterMessage = new StringBuilder();
+
+    // incrementJobCount returns the count *before* this increment, so a request is over quota once that count has
+    // reached the quota (this also rejects everything for a quota of 0). The excess including this request is
+    // count + 1 - quota.
+    if (proxyUser != null) {
+      int proxyQuota = getQuotaForUser(proxyUser);
+      int proxyCount = incrementJobCount(connection, DagManagerUtils.getUserQuotaKey(proxyUser, dagNode),
+          CountType.USER_COUNT);
+      if (proxyCount >= proxyQuota) {
+        quotaCheck.setProxyUserCheck(false);
+        requesterMessage.append(String.format(
+            "Quota exceeded for proxy user %s on executor %s : quota=%s, requests above quota=%d%n",
+            proxyUser, specExecutorUri, proxyQuota, proxyCount + 1 - proxyQuota));
+      }
+    }
+
+    String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+    for (String requester : DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
+      int requesterQuota = getQuotaForUser(requester);
+      int requesterCount = incrementJobCount(connection, DagManagerUtils.getUserQuotaKey(requester, dagNode),
+          CountType.REQUESTER_COUNT);
+      if (requesterCount >= requesterQuota) {
+        quotaCheck.setRequesterCheck(false);
+        requesterMessage.append(String.format(
+            "Quota exceeded for requester %s on executor %s : quota=%s, requests above quota=%d%n",
+            requester, specExecutorUri, requesterQuota, requesterCount + 1 - requesterQuota));
+      }
+    }
+
+    int flowGroupQuota = getQuotaForFlowGroup(flowGroup);
+    int flowGroupCount = incrementJobCount(connection, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode),
+        CountType.FLOWGROUP_COUNT);
+    if (flowGroupCount >= flowGroupQuota) {
+      quotaCheck.setFlowGroupCheck(false);
+      requesterMessage.append(String.format(
+          "Quota exceeded for flowgroup %s on executor %s : quota=%s, requests above quota=%d%n",
+          flowGroup, specExecutorUri, flowGroupQuota, flowGroupCount + 1 - flowGroupQuota));
+    }
+
+    quotaCheck.setRequesterMessage(requesterMessage.toString());
+    return quotaCheck;
+  }
+
+  /**
+   * Increments the counter for {@code key} on {@code connection} (without committing) and compares the count as it
+   * was before the increment with {@code keyQuota}.
+   *
+   * @return the pre-increment count when it is below {@code keyQuota}; otherwise its negation, so callers can read a
+   *     negative value as "quota exceeded".
+   *     NOTE(review): when the pre-increment count is 0 and {@code keyQuota <= 0} this returns -0 == 0, which a
+   *     {@code >= 0} check treats as within quota; confirm whether a zero quota is meant to admit one job.
+   */
  protected int incrementJobCountAndCheckQuota(Connection connection, String key, int keyQuota, CountType countType)
+      throws IOException, SQLException {
+    int currentCount = incrementJobCount(connection, key, countType);
+    if (currentCount >= keyQuota) {
+      return -currentCount;
+    } else {
+      return currentCount;
+    }
+  }
+
+  /**
+   * Decrement the quota by one for the proxy user, the flow group and the requesters corresponding to the provided
+   * {@link Dag.DagNode}, in one transaction together with removing its dag id from the running-dag store.
+   *
+   * @return true if the dag was tracked as running and its quota was released; false if it was not tracked or the
+   *     requester decrements failed, in which case the transaction is rolled back
+   * @throws IOException if the quota store cannot be read or updated; partial changes are rolled back first
+   */
+  public boolean releaseQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException {
-    try {
-      this.mysqlStore.decreaseCount(user, countType);
+    // try-with-resources returns the connection even if setAutoCommit or a later statement fails, and never lets a
+    // failing close() hide the original exception.
+    try (Connection connection = this.quotaStore.dataSource.getConnection()) {
+      connection.setAutoCommit(false);
+      try {
+        if (!removeDagId(connection, DagManagerUtils.generateDagId(dagNode).toString())) {
+          // The dag was not holding quota; nothing to release.
+          connection.rollback();
+          return false;
+        }
+
+        String proxyUser = ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), AzkabanProjectConfig.USER_TO_PROXY, null);
+        if (proxyUser != null) {
+          String proxyUserKey = DagManagerUtils.getUserQuotaKey(proxyUser, dagNode);
+          decrementJobCount(connection, proxyUserKey, CountType.USER_COUNT);
+        }
+
+        String flowGroup =
+            ConfigUtils.getString(dagNode.getValue().getJobSpec().getConfig(), ConfigurationKeys.FLOW_GROUP_KEY, "");
+        decrementJobCount(connection, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode), CountType.FLOWGROUP_COUNT);
+
+        String serializedRequesters = DagManagerUtils.getSerializedRequesterList(dagNode);
+        try {
+          for (String requester : DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters)) {
+            String requesterKey = DagManagerUtils.getUserQuotaKey(requester, dagNode);
+            decrementJobCount(connection, requesterKey, CountType.REQUESTER_COUNT);
+          }
+        } catch (IOException e) {
+          log.error("Failed to release quota for requester list " + serializedRequesters, e);
+          // Undo the dag-id removal and the decrements above so the store stays consistent.
+          connection.rollback();
+          return false;
+        }
+
+        connection.commit();
+        return true;
+      } catch (IOException | SQLException | RuntimeException e) {
+        try {
+          connection.rollback();
+        } catch (SQLException rollbackFailure) {
+          // Keep the original failure as the primary exception.
+          e.addSuppressed(rollbackFailure);
+        }
+        throw e;
+      }
    } catch (SQLException e) {
      throw new IOException(e);
    }
  }
  @VisibleForTesting
  int getCount(String name, CountType countType) throws IOException {
-    return this.mysqlStore.getCount(name, countType);
+    // Reads the stored count for name/countType; MysqlUserQuotaManagerTest expects -1 once the row has been deleted.
+    return this.quotaStore.getCount(name, countType);
  }
/**
- * Creating an instance of StateStore.
+ * Creating an instance of MysqlQuotaStore.
*/
protected MysqlQuotaStore createQuotaStore(Config config) throws IOException {
- String stateStoreTableName = ConfigUtils.getString(config, ConfigurationKeys.STATE_STORE_DB_TABLE_KEY,
- ConfigurationKeys.DEFAULT_STATE_STORE_DB_TABLE);
+ String quotaStoreTableName = ConfigUtils.getString(config, ServiceConfigKeys.QUOTA_STORE_DB_TABLE_KEY,
+ ServiceConfigKeys.DEFAULT_QUOTA_STORE_DB_TABLE);
+
+ BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config);
+
+ return new MysqlQuotaStore(basicDataSource, quotaStoreTableName);
+ }
+
+ protected RunningDagIdsStore createRunningDagStore(Config config) throws IOException {
+ String quotaStoreTableName = ConfigUtils.getString(config, ServiceConfigKeys.RUNNING_DAG_IDS_DB_TABLE_KEY,
+ ServiceConfigKeys.DEFAULT_RUNNING_DAG_IDS_DB_TABLE);
BasicDataSource basicDataSource = MysqlStateStore.newDataSource(config);
- return new MysqlQuotaStore(basicDataSource, stateStoreTableName);
+ return new RunningDagIdsStore(basicDataSource, quotaStoreTableName);
}
static class MysqlQuotaStore {
- protected final DataSource dataSource;
+ protected final BasicDataSource dataSource;
final String tableName;
private final String GET_USER_COUNT;
private final String GET_REQUESTER_COUNT;
@@ -133,7 +301,9 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
try (Connection connection = dataSource.getConnection(); PreparedStatement createStatement = connection.prepareStatement(createQuotaTable)) {
createStatement.executeUpdate();
} catch (SQLException e) {
- throw new IOException("Failure creation table " + tableName, e);
+ log.warn("Failure in creating table {}. Validation query is set to {} Exception is {}",
+ tableName, this.dataSource.getValidationQuery(), e);
+ throw new IOException(e);
}
}
@@ -158,13 +328,11 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
}
}
- public int increaseCount(String name, CountType countType) throws IOException, SQLException {
- Connection connection = dataSource.getConnection();
- connection.setAutoCommit(false);
-
+ public int increaseCount(Connection connection, String name, CountType countType) throws IOException, SQLException {
String selectStatement;
String increaseStatement;
+
switch(countType) {
case USER_COUNT:
selectStatement = GET_USER_COUNT;
@@ -189,27 +357,19 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
statement2.setString(1, name);
rs = statement1.executeQuery();
statement2.executeUpdate();
- connection.commit();
if (rs != null && rs.next()) {
return rs.getInt(1);
} else {
return 0;
}
- } catch (SQLException e) {
- connection.rollback();
- throw new IOException("Failure increasing count for user/flowGroup " + name, e);
} finally {
if (rs != null) {
rs.close();
}
- connection.close();
}
}
- public void decreaseCount(String name, CountType countType) throws IOException, SQLException {
- Connection connection = dataSource.getConnection();
- connection.setAutoCommit(false);
-
+ public void decreaseCount(Connection connection, String name, CountType countType) throws IOException, SQLException {
String selectStatement;
String decreaseStatement;
@@ -241,18 +401,75 @@ public class MysqlUserQuotaManager extends AbstractUserQuotaManager {
rs = statement1.executeQuery();
statement2.executeUpdate();
statement3.executeUpdate();
- connection.commit();
if (rs != null && rs.next() && rs.getInt(1) == 0) {
log.warn("Decrement job count was called for " + name + " when the count was already zero/absent.");
}
- } catch (SQLException e) {
- connection.rollback();
- throw new IOException("Failure decreasing count from user/flowGroup " + name, e);
} finally {
if (rs != null) {
rs.close();
}
- connection.close();
+ }
+ }
+ }
+
+  /**
+   * MySQL-backed set of ids of dags that currently hold quota. Lookups go through this store's own data source;
+   * inserts and deletes run on a caller-supplied connection so they can join the caller's transaction.
+   */
+  static class RunningDagIdsStore {
+    protected final DataSource dataSource;
+    final String tableName;
+    private final String CONTAINS_DAG_ID;
+    private final String ADD_DAG_ID;
+    private final String REMOVE_DAG_ID;
+
+    /**
+     * Creates the backing table {@code tableName} if it does not exist yet.
+     *
+     * @throws IOException if the table cannot be created
+     */
+    public RunningDagIdsStore(BasicDataSource dataSource, String tableName)
+        throws IOException {
+      this.dataSource = dataSource;
+      this.tableName = tableName;
+
+      CONTAINS_DAG_ID = "SELECT EXISTS(SELECT * FROM " + tableName + " WHERE dagId = ?)" ;
+      ADD_DAG_ID = "INSERT INTO " + tableName + " (dagId) VALUES (?) ";
+      REMOVE_DAG_ID = "DELETE FROM " + tableName + " WHERE dagId = ?";
+
+      // The primary key is already a unique index on dagId, so no additional index is declared.
+      // NOTE(review): latin1 presumably keeps the 500-character key within InnoDB's index-size limit.
+      String createRunningDagIdsTable = "CREATE TABLE IF NOT EXISTS " + tableName
+          + " (dagId VARCHAR(500) CHARACTER SET latin1 NOT NULL, PRIMARY KEY (dagId))";
+      try (Connection connection = dataSource.getConnection();
+          PreparedStatement createStatement = connection.prepareStatement(createRunningDagIdsTable)) {
+        createStatement.executeUpdate();
+      } catch (SQLException e) {
+        throw new IOException("Failure creation table " + tableName, e);
+      }
+    }
+
+    /**
+     * Returns true if the dag id is already present in the running dag store. Uses a fresh connection from this
+     * store's data source, so rows inserted by another connection's uncommitted transaction are typically not seen.
+     */
+    @VisibleForTesting
+    boolean contains(String dagId) throws IOException {
+      try (Connection connection = dataSource.getConnection();
+          PreparedStatement queryStatement = connection.prepareStatement(CONTAINS_DAG_ID)) {
+        queryStatement.setString(1, dagId);
+        try (ResultSet rs = queryStatement.executeQuery()) {
+          // SELECT EXISTS(...) yields a single row holding 0 or 1.
+          return rs.next() && rs.getBoolean(1);
+        }
+      } catch (SQLException e) {
+        throw new IOException("Could not find if the dag " + dagId + " is already running.", e);
+      }
+    }
+
+    /** Inserts {@code dagId} on {@code connection}; does not commit. Fails if the id is already present. */
+    public void add(Connection connection, String dagId) throws IOException {
+      try (PreparedStatement statement = connection.prepareStatement(ADD_DAG_ID)) {
+        statement.setString(1, dagId);
+        statement.executeUpdate();
+      } catch (SQLException e) {
+        throw new IOException("Failure adding dag " + dagId, e);
+      }
+    }
+
+    /** Deletes {@code dagId} on {@code connection}; does not commit. Returns true iff exactly one row was deleted. */
+    public boolean remove(Connection connection, String dagId) throws IOException {
+      try (PreparedStatement statement = connection.prepareStatement(REMOVE_DAG_ID)) {
+        statement.setString(1, dagId);
+        int count = statement.executeUpdate();
+        return count == 1;
+      } catch (SQLException e) {
+        throw new IOException("Could not remove dag " + dagId, e);
      }
    }
  }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
index 752956d11..dff5e0a79 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
@@ -42,7 +42,7 @@ public interface UserQuotaManager {
* It also increases the quota usage for proxy user, requester and the flowGroup of the given DagNode by one.
* @throws QuotaExceededException if the quota is exceeded
*/
- void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode) throws IOException;
+ void checkQuota(Collection<Dag.DagNode<JobExecutionPlan>> dagNode) throws IOException;
/**
* Decrement the quota by one for the proxy user and requesters corresponding to the provided {@link Dag.DagNode}.
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 863144a6f..c997f0603 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -250,9 +250,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
Properties properties = spec.getConfigAsProperties();
properties.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false");
Config config = ConfigFactory.parseProperties(properties);
- FlowSpec flowSpec = new FlowSpec(spec.getUri(), spec.getVersion(), spec.getDescription(), config, properties,
+ return new FlowSpec(spec.getUri(), spec.getVersion(), spec.getDescription(), config, properties,
spec.getTemplateURIs(), spec.getChildSpecs());
- return flowSpec;
}
@Override
@@ -332,10 +331,12 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
// Check quota limits against run immediately flows or adhoc flows before saving the schedule
// In warm standby mode, this quota check will happen on restli API layer when we accept the flow
if (!this.warmStandbyEnabled && (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"))) {
+      // This block should be reachable only for the first execution of adhoc flows (flows that either do not have a schedule or have runImmediately=true).
if (quotaManager.isPresent()) {
// QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager
try {
- quotaManager.get().checkQuota(dag.getNodes().get(0));
+ quotaManager.get().checkQuota(dag.getStartNodes());
+ ((FlowSpec) addedSpec).getConfigAsProperties().setProperty(ServiceConfigKeys.GOBBLIN_SERVICE_ADHOC_FLOW, "true");
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java
index 5e63a0f00..2a0285aa9 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/InMemoryUserQuotaManagerTest.java
@@ -20,6 +20,7 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -61,9 +62,9 @@ public class InMemoryUserQuotaManagerTest {
dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
- this._quotaManager.checkQuota(dags.get(0).getNodes().get(0));
+ this._quotaManager.checkQuota(Collections.singleton(dags.get(0).getNodes().get(0)));
Assert.assertThrows(IOException.class, () -> {
- this._quotaManager.checkQuota(dags.get(1).getNodes().get(0));
+ this._quotaManager.checkQuota(Collections.singleton(dags.get(1).getNodes().get(0)));
});
}
@@ -76,7 +77,7 @@ public class InMemoryUserQuotaManagerTest {
dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
- this._quotaManager.checkQuota(dags.get(0).getNodes().get(0));
+ this._quotaManager.checkQuota(Collections.singleton(dags.get(0).getNodes().get(0)));
Assert.assertTrue(this._quotaManager.releaseQuota(dags.get(0).getNodes().get(0)));
Assert.assertFalse(this._quotaManager.releaseQuota(dags.get(0).getNodes().get(0)));
}
@@ -91,9 +92,9 @@ public class InMemoryUserQuotaManagerTest {
dags.get(0).getNodes().get(0).getValue().setCurrentAttempts(1);
dags.get(1).getNodes().get(0).getValue().setCurrentAttempts(1);
- this._quotaManager.checkQuota(dags.get(0).getNodes().get(0));
+ this._quotaManager.checkQuota(Collections.singleton(dags.get(0).getNodes().get(0)));
Assert.assertThrows(IOException.class, () -> {
- this._quotaManager.checkQuota(dags.get(1).getNodes().get(0));
+ this._quotaManager.checkQuota(Collections.singleton(dags.get(1).getNodes().get(0)));
});
}
@@ -115,20 +116,20 @@ public class InMemoryUserQuotaManagerTest {
dag3.getNodes().get(0).getValue().setCurrentAttempts(1);
dag4.getNodes().get(0).getValue().setCurrentAttempts(1);
- this._quotaManager.checkQuota(dag1.getNodes().get(0));
- this._quotaManager.checkQuota(dag2.getNodes().get(0));
+ this._quotaManager.checkQuota(Collections.singleton(dag1.getNodes().get(0)));
+ this._quotaManager.checkQuota(Collections.singleton(dag2.getNodes().get(0)));
// Should fail due to user quota
Assert.assertThrows(IOException.class, () -> {
- this._quotaManager.checkQuota(dag3.getNodes().get(0));
+ this._quotaManager.checkQuota(Collections.singleton(dag3.getNodes().get(0)));
});
// Should fail due to flowgroup quota
Assert.assertThrows(IOException.class, () -> {
- this._quotaManager.checkQuota(dag4.getNodes().get(0));
+ this._quotaManager.checkQuota(Collections.singleton(dag4.getNodes().get(0)));
});
// should pass due to quota being released
this._quotaManager.releaseQuota(dag2.getNodes().get(0));
- this._quotaManager.checkQuota(dag3.getNodes().get(0));
- this._quotaManager.checkQuota(dag4.getNodes().get(0));
+ this._quotaManager.checkQuota(Collections.singleton(dag3.getNodes().get(0)));
+ this._quotaManager.checkQuota(Collections.singleton(dag4.getNodes().get(0)));
}
}
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
index 4bdd45597..a8931738d 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/MysqlUserQuotaManagerTest.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.service.modules.orchestration;
import java.io.IOException;
+import java.sql.Connection;
+import java.sql.SQLException;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -36,6 +38,7 @@ public class MysqlUserQuotaManagerTest {
private static final String TABLE = "quotas";
private static final String PROXY_USER = "abora";
private MysqlUserQuotaManager quotaManager;
+ public static int INCREMENTS = 1000;
@BeforeClass
public void setUp() throws Exception {
@@ -52,35 +55,63 @@ public class MysqlUserQuotaManagerTest {
}
@Test
+ public void testRunningDagStore() throws Exception {
+ String dagId = DagManagerUtils.generateDagId(DagManagerTest.buildDag("dagId", 1234L, "", 1).getNodes().get(0)).toString();
+ Connection connection = this.quotaManager.quotaStore.dataSource.getConnection();
+ Assert.assertFalse(this.quotaManager.containsDagId(dagId));
+ this.quotaManager.addDagId(connection, dagId);
+ connection.commit();
+ Assert.assertTrue(this.quotaManager.containsDagId(dagId));
+ Assert.assertTrue(this.quotaManager.removeDagId(connection, dagId));
+ connection.commit();
+ Assert.assertFalse(this.quotaManager.containsDagId(dagId));
+ Assert.assertFalse(this.quotaManager.removeDagId(connection, dagId));
+ connection.commit();
+ connection.close();
+ }
+
+  @Test
  public void testIncreaseCount() throws Exception {
-    int prevCount = this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    Connection connection = this.quotaManager.quotaStore.dataSource.getConnection();
+    // incrementJobCount returns the count as it was before the increment; each change is committed explicitly.
+    int prevCount = this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    connection.commit();
    Assert.assertEquals(prevCount, 0);
-    prevCount = this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    prevCount = this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+    connection.commit();
    Assert.assertEquals(prevCount, 1);
    Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 2);
-    prevCount = this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    prevCount = this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    connection.commit();
    Assert.assertEquals(prevCount, 0);
-    prevCount = this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    prevCount = this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+    connection.commit();
    Assert.assertEquals(prevCount, 1);
+    connection.close();
  }
@Test(dependsOnMethods = "testIncreaseCount")
public void testDecreaseCount() throws Exception {
- this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+ Connection connection = this.quotaManager.quotaStore.dataSource.getConnection();
+ this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+ connection.commit();
Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 1);
- this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+ this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+ connection.commit();
Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 0);
- this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+ this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+ connection.commit();
Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 0);
- this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+ this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+ connection.commit();
Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT), 1);
- this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+ this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT);
+ connection.commit();
// on count reduced to zero, the row should get deleted and the get call should return -1 instead of 0.
Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.FLOWGROUP_COUNT), -1);
}
@@ -95,14 +126,15 @@ public class MysqlUserQuotaManagerTest {
@Override
public void run() {
int i = 0;
- while (i++ < 1000) {
- try {
+ while (i++ < INCREMENTS) {
+ try (Connection connection = MysqlUserQuotaManagerTest.this.quotaManager.quotaStore.dataSource.getConnection();) {
if (increaseOrDecrease) {
- MysqlUserQuotaManagerTest.this.quotaManager.incrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+ MysqlUserQuotaManagerTest.this.quotaManager.incrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
} else {
- MysqlUserQuotaManagerTest.this.quotaManager.decrementJobCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
+ MysqlUserQuotaManagerTest.this.quotaManager.decrementJobCount(connection, PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT);
}
- } catch (IOException e) {
+ connection.commit();
+ } catch (IOException | SQLException e) {
Assert.fail("Thread got an exception.", e);
}
}
@@ -111,14 +143,13 @@ public class MysqlUserQuotaManagerTest {
@Test(dependsOnMethods = "testDecreaseCount")
public void testConcurrentChanges() throws IOException, InterruptedException {
- Runnable increaseCountRunnable = new ChangeCountRunnable(true);
- Runnable decreaseCountRunnable = new ChangeCountRunnable(false);
- Thread thread1 = new Thread(increaseCountRunnable);
- Thread thread2 = new Thread(increaseCountRunnable);
- Thread thread3 = new Thread(increaseCountRunnable);
- Thread thread4 = new Thread(decreaseCountRunnable);
- Thread thread5 = new Thread(decreaseCountRunnable);
- Thread thread6 = new Thread(decreaseCountRunnable);
+ int numOfThreads = 3;
+ Thread thread1 = new Thread(new ChangeCountRunnable(true));
+ Thread thread2 = new Thread(new ChangeCountRunnable(true));
+ Thread thread3 = new Thread(new ChangeCountRunnable(true));
+ Thread thread4 = new Thread(new ChangeCountRunnable(false));
+ Thread thread5 = new Thread(new ChangeCountRunnable(false));
+ Thread thread6 = new Thread(new ChangeCountRunnable(false));
thread1.start();
thread2.start();
@@ -126,7 +157,8 @@ public class MysqlUserQuotaManagerTest {
thread1.join();
thread2.join();
thread3.join();
- Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT), 3000);
+ Assert.assertEquals(this.quotaManager.getCount(PROXY_USER, AbstractUserQuotaManager.CountType.USER_COUNT),
+ INCREMENTS * 3);
thread4.start();
thread5.start();
thread6.start();