You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2022/07/25 20:49:05 UTC
[gobblin] branch master updated: [GOBBLIN-1656] Return a http status 503 on GaaS when quota is exceeded for user or flowgroup (#3516)
This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 9ce1e65aa [GOBBLIN-1656] Return a http status 503 on GaaS when quota is exceeded for user or flowgroup (#3516)
9ce1e65aa is described below
commit 9ce1e65aa36674742877b5aa2083412d85b5764f
Author: William Lo <lo...@gmail.com>
AuthorDate: Mon Jul 25 13:49:01 2022 -0700
[GOBBLIN-1656] Return a http status 503 on GaaS when quota is exceeded for user or flowgroup (#3516)
* Add e2e tests and set http response code for quota exceeded
* cleanup
* Fix checkstyle test
* Improve guard against schedule change if quota is exceeded
* Fix bug relating to exception propagation and scheduler not checking quota due to current attempt number
* Address review comments
* Refactor based on review feedback
* Fix test
* Cleanup around handling responses from callbacks in GaaS API
* Fix checkstyle
* catch quotaexceededexception instead of checking type explicitly
* Log other errors and throw 500
* Fix checkstyle dead store
* Fix checkstyle again
---
.../service/FlowConfigResourceLocalHandler.java | 22 ++++-
.../service/FlowConfigV2ResourceLocalHandler.java | 20 ++++-
.../gobblin/runtime/api/MutableSpecCatalog.java | 2 +-
.../gobblin/runtime/api/SpecCatalogListener.java | 3 +-
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 23 +++---
.../runtime/spec_catalog/FlowCatalogTest.java | 51 +++++++++---
.../service/modules/core/GitConfigMonitor.java | 2 +-
.../modules/core/GobblinServiceGuiceModule.java | 3 +
.../modules/core/GobblinServiceManager.java | 5 ++
.../modules/orchestration/DagManagerUtils.java | 17 ++++
.../modules/orchestration/UserQuotaManager.java | 21 +++--
.../scheduler/GobblinServiceJobScheduler.java | 22 ++++-
.../gobblin/service/GobblinServiceManagerTest.java | 23 +++++-
.../modules/orchestration/OrchestratorTest.java | 2 +-
.../orchestration/UserQuotaManagerTest.java | 2 -
.../scheduler/GobblinServiceJobSchedulerTest.java | 94 ++++++++++++++++++++--
.../gobblin/exception/QuotaExceededException.java | 27 +++++++
.../gobblin/util/callbacks/CallbackResult.java | 3 +-
18 files changed, 287 insertions(+), 55 deletions(-)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 8430acf1e..7e91c9d1a 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -42,6 +42,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
@@ -133,7 +134,15 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
if (!flowConfig.hasSchedule() && this.flowCatalog.exists(flowSpec.getUri())) {
return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_409_CONFLICT);
} else {
- this.flowCatalog.put(flowSpec, triggerListener);
+ try {
+ this.flowCatalog.put(flowSpec, triggerListener);
+ } catch (QuotaExceededException e) {
+ throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
+ } catch (Throwable e) {
+ // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
+ log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName()), e);
+ throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
+ }
return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_201_CREATED);
}
}
@@ -168,8 +177,15 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
originalFlowConfig.setSchedule(NEVER_RUN_CRON_SCHEDULE);
flowConfig = originalFlowConfig;
}
-
- this.flowCatalog.put(createFlowSpecForConfig(flowConfig), triggerListener);
+ try {
+ this.flowCatalog.put(createFlowSpecForConfig(flowConfig), triggerListener);
+ } catch (QuotaExceededException e) {
+ throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
+ } catch (Throwable e) {
+ // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
+ log.warn(String.format("Failed to add flow configuration %s.%sto catalog due to", flowId.getFlowGroup(), flowId.getFlowName()), e);
+ throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
+ }
return new UpdateResponse(HttpStatus.S_200_OK);
}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index b7249c1a2..d2bb55743 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -36,9 +36,12 @@ import javax.inject.Inject;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+
+
@Slf4j
public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHandler implements FlowConfigsV2ResourceHandler {
@@ -60,9 +63,9 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
}
log.info(createLog);
FlowSpec flowSpec = createFlowSpecForConfig(flowConfig);
- FlowStatusId flowStatusId = new FlowStatusId()
- .setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY))
- .setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY));
+ FlowStatusId flowStatusId =
+ new FlowStatusId().setFlowName(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_NAME_KEY))
+ .setFlowGroup(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_GROUP_KEY));
if (flowSpec.getConfigAsProperties().containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
flowStatusId.setFlowExecutionId(Long.valueOf(flowSpec.getConfigAsProperties().getProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)));
} else {
@@ -76,7 +79,16 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
"FlowSpec with URI " + flowSpec.getUri() + " already exists, no action will be taken"));
}
- Map<String, AddSpecResponse> responseMap = this.flowCatalog.put(flowSpec, triggerListener);
+ Map<String, AddSpecResponse> responseMap;
+ try {
+ responseMap = this.flowCatalog.put(flowSpec, triggerListener);
+ } catch (QuotaExceededException e) {
+ throw new RestLiServiceException(HttpStatus.S_503_SERVICE_UNAVAILABLE, e.getMessage());
+ } catch (Throwable e) {
+ // TODO: Compilation errors should fall under throwable exceptions as well instead of checking for strings
+ log.warn(String.format("Failed to add flow configuration %s.%s to catalog due to", flowConfig.getId().getFlowGroup(), flowConfig.getId().getFlowName()), e);
+ throw new RestLiServiceException(HttpStatus.S_500_INTERNAL_SERVER_ERROR, e.getMessage());
+ }
HttpStatus httpStatus;
if (flowConfig.hasExplain() && flowConfig.isExplain()) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
index 3c1573a2e..e6c81f3aa 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
@@ -48,7 +48,7 @@ public interface MutableSpecCatalog extends SpecCatalog {
* on adding a {@link Spec} to the {@link SpecCatalog}. The key for each entry is the name of the {@link SpecCatalogListener}
* and the value is the result of the the action taken by the listener returned as an instance of {@link AddSpecResponse}.
* */
- Map<String, AddSpecResponse> put(Spec spec);
+ Map<String, AddSpecResponse> put(Spec spec) throws Throwable;
/**
* Removes an existing {@link Spec} with the given URI.
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
index 67f2e39ca..6bf61a1e6 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalogListener.java
@@ -48,7 +48,8 @@ public interface SpecCatalogListener {
_addedSpec = addedSpec;
}
- @Override public AddSpecResponse apply(SpecCatalogListener listener) {
+ @Override
+ public AddSpecResponse apply(SpecCatalogListener listener) {
return listener.onAddSpec(_addedSpec);
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index c9c17b027..f459c9588 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -342,7 +342,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
* @param triggerListener True if listeners should be notified.
* @return a map of listeners and their {@link AddSpecResponse}s
*/
- public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
+ public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) throws Throwable {
Map<String, AddSpecResponse> responseMap = new HashMap<>();
FlowSpec flowSpec = (FlowSpec) spec;
Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
@@ -355,13 +355,21 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
if (triggerListener) {
AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(flowSpec);
- // If flow fails compilation, the result will have a non-empty string with the error
for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getSuccesses().entrySet()) {
responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
}
+ // If flow fails compilation, the result will have a non-empty string with the error
+ if (response.getValue().getFailures().size() > 0) {
+ for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry : response.getValue().getFailures().entrySet()) {
+ throw entry.getValue().getError().getCause();
+ }
+ return responseMap;
+ }
}
+ AddSpecResponse<String> schedulerResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
- if (isCompileSuccessful(responseMap)) {
+ // Check that the flow configuration is valid and matches to a corresponding edge
+ if (isCompileSuccessful(schedulerResponse.getValue())) {
synchronized (syncObject) {
try {
if (!flowSpec.isExplain()) {
@@ -384,19 +392,12 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
return responseMap;
}
- public static boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
- // If we cannot get the response from the scheduler, assume that the flow failed compilation
- AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(
- ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(null));
- return isCompileSuccessful(addSpecResponse.getValue());
- }
-
public static boolean isCompileSuccessful(String dag) {
return dag != null && !dag.contains(ConfigException.class.getSimpleName());
}
@Override
- public Map<String, AddSpecResponse> put(Spec spec) {
+ public Map<String, AddSpecResponse> put(Spec spec) throws Throwable {
return put(spec, true);
}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
index 319a0f384..f79d8501a 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
@@ -21,6 +21,7 @@ import com.google.common.base.Optional;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import java.io.File;
import java.net.URI;
import java.util.Collection;
@@ -28,6 +29,7 @@ import java.util.Map;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
@@ -105,19 +107,24 @@ public class FlowCatalogTest {
return initFlowSpec(specStore, uri, "flowName");
}
- /**
- * Create FLowSpec with specified URI and SpecStore location.
- */
+ /**
+ * Create FLowSpec with specified URI and SpecStore location.
+ */
public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName){
+ return initFlowSpec(specStore, uri, flowName, "", ConfigFactory.empty());
+ }
+
+ public static FlowSpec initFlowSpec(String specStore, URI uri, String flowName, String flowGroup, Config additionalConfigs) {
Properties properties = new Properties();
properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
+ properties.put(ConfigurationKeys.FLOW_GROUP_KEY, flowGroup);
properties.put("job.name", flowName);
- properties.put("job.group", flowName);
+ properties.put("job.group", flowGroup);
properties.put("specStore.fs.dir", specStore);
properties.put("specExecInstance.capabilities", "source:destination");
properties.put("job.schedule", "0 0 0 ? * * 2050");
- Config config = ConfigUtils.propertiesToConfig(properties);
-
+ Config defaults = ConfigUtils.propertiesToConfig(properties);
+ Config config = additionalConfigs.withFallback(defaults);
SpecExecutor specExecutorInstanceProducer = new InMemorySpecExecutor(config);
FlowSpec.Builder flowSpecBuilder = null;
@@ -141,7 +148,7 @@ public class FlowCatalogTest {
}
@Test
- public void createFlowSpec() {
+ public void createFlowSpec() throws Throwable {
// List Current Specs
Collection<Spec> specs = flowCatalog.getSpecs();
logger.info("[Before Create] Number of specs: " + specs.size());
@@ -199,7 +206,7 @@ public class FlowCatalogTest {
}
@Test (dependsOnMethods = "deleteFlowSpec")
- public void testRejectBadFlow() {
+ public void testRejectBadFlow() throws Throwable {
Collection<Spec> specs = flowCatalog.getSpecs();
logger.info("[Before Create] Number of specs: " + specs.size());
int i=0;
@@ -223,7 +230,7 @@ public class FlowCatalogTest {
}
@Test (dependsOnMethods = "testRejectBadFlow")
- public void testRejectMissingListener() {
+ public void testRejectMissingListener() throws Throwable {
flowCatalog.removeListener(this.mockListener);
Collection<Spec> specs = flowCatalog.getSpecs();
logger.info("[Before Create] Number of specs: " + specs.size());
@@ -244,6 +251,32 @@ public class FlowCatalogTest {
Assert.assertEquals(flowCatalog.getSize(), 0);
}
+ @Test (dependsOnMethods = "testRejectMissingListener")
+ public void testRejectQuotaExceededFlow() {
+ Collection<Spec> specs = flowCatalog.getSpecs();
+ logger.info("[Before Create] Number of specs: " + specs.size());
+ int i=0;
+ for (Spec spec : specs) {
+ FlowSpec flowSpec = (FlowSpec) spec;
+ logger.info("[Before Create] Spec " + i++ + ": " + gson.toJson(flowSpec));
+ }
+ Assert.assertEquals(specs.size(), 0, "Spec store should be empty before addition");
+
+ // Create and add Spec
+ FlowSpec badSpec = initFlowSpec(SPEC_STORE_DIR, computeFlowSpecURI(), "badFlow");
+
+ // Assume that spec is rejected
+ when(this.mockListener.onAddSpec(any())).thenThrow(new RuntimeException(new QuotaExceededException("error")));
+ try {
+ Map<String, AddSpecResponse> response = this.flowCatalog.put(badSpec);
+ } catch (Throwable e) {
+ Assert.assertTrue(e instanceof QuotaExceededException);
+ }
+ // Spec should be rejected from being stored
+ specs = flowCatalog.getSpecs();
+ Assert.assertEquals(specs.size(), 0);
+ }
+
public static URI computeFlowSpecURI() {
// Make sure this is relative
URI uri = PathUtils.relativizePath(new Path(SPEC_GROUP_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
index 5aa0b438e..101d9f46d 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java
@@ -108,7 +108,7 @@ public class GitConfigMonitor extends GitMonitoringService {
.withVersion(SPEC_VERSION)
.withDescription(SPEC_DESCRIPTION)
.build());
- } catch (IOException e) {
+ } catch (Throwable e) {
log.warn("Could not load config file: " + configFilePath);
}
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
index 1a1bce804..efb03339c 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service.modules.core;
import java.util.Objects;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.helix.HelixManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -194,6 +195,8 @@ public class GobblinServiceGuiceModule implements Module {
binder.bind(Orchestrator.class);
binder.bind(SchedulerService.class);
binder.bind(GobblinServiceJobScheduler.class);
+ OptionalBinder.newOptionalBinder(binder, UserQuotaManager.class);
+ binder.bind(UserQuotaManager.class);
}
if (serviceConfig.isGitConfigMonitorEnabled()) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index c7fc958d0..d35eef64b 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -31,6 +31,7 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.ObjectUtils;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -197,6 +198,10 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
@Inject
protected ServiceDatabaseManager databaseManager;
+ @Inject(optional=true)
+ @Getter
+ protected Optional<UserQuotaManager> quotaManager;
+
protected Optional<HelixLeaderState> helixLeaderGauges;
@Inject(optional = true)
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index a472a37cb..9d59a9085 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -18,6 +18,7 @@ package org.apache.gobblin.service.modules.orchestration;
import com.google.common.collect.ImmutableMap;
import com.typesafe.config.ConfigFactory;
+import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -32,6 +33,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.typesafe.config.Config;
+import java.util.stream.Collectors;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
@@ -41,6 +43,7 @@ import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.FlowId;
import org.apache.gobblin.service.RequesterService;
+import org.apache.gobblin.service.ServiceRequester;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.Dag.DagNode;
import org.apache.gobblin.service.modules.orchestration.DagManager.FailureOption;
@@ -340,4 +343,18 @@ public class DagManagerUtils {
eventSubmitter.get().getTimingEvent(flowEvent).stop(flowMetadata);
}
}
+
+ static List<String> getDistinctUniqueRequesters(String serializedRequesters) {
+ List<String> uniqueRequesters;
+ try {
+ uniqueRequesters = RequesterService.deserialize(serializedRequesters)
+ .stream()
+ .map(ServiceRequester::getName)
+ .distinct()
+ .collect(Collectors.toList());
+ return uniqueRequesters;
+ } catch (IOException e) {
+ throw new RuntimeException("Could not process requesters due to ", e);
+ }
+ }
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
index c49cdfc7f..36b4d2718 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManager.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.service.modules.orchestration;
import com.google.common.collect.ImmutableMap;
+import com.google.inject.Inject;
import com.typesafe.config.Config;
import java.io.IOException;
import java.util.HashSet;
@@ -24,9 +25,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
+import javax.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.service.RequesterService;
import org.apache.gobblin.service.ServiceRequester;
import org.apache.gobblin.service.modules.flowgraph.Dag;
@@ -40,6 +42,7 @@ import org.apache.gobblin.util.ConfigUtils;
* is exceeded, then the execution will fail without running on the underlying executor
*/
@Slf4j
+@Singleton
public class UserQuotaManager {
public static final String PER_USER_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perUserQuota";
public static final String PER_FLOWGROUP_QUOTA = DagManager.DAG_MANAGER_PREFIX + "perFlowGroupQuota";
@@ -54,7 +57,8 @@ public class UserQuotaManager {
Map<String, Boolean> runningDagIds = new ConcurrentHashMap<>();
private final int defaultQuota;
- UserQuotaManager(Config config) {
+ @Inject
+ public UserQuotaManager(Config config) {
this.defaultQuota = ConfigUtils.getInt(config, USER_JOB_QUOTA_KEY, DEFAULT_USER_JOB_QUOTA);
ImmutableMap.Builder<String, Integer> userMapBuilder = ImmutableMap.builder();
ImmutableMap.Builder<String, Integer> flowGroupMapBuilder = ImmutableMap.builder();
@@ -72,9 +76,9 @@ public class UserQuotaManager {
/**
* Checks if the dagNode exceeds the statically configured user quota for both the proxy user, requester user, and flowGroup
- * @throws IOException if the quota is exceeded, and logs a statement
+ * @throws QuotaExceededException if the quota is exceeded, and logs a statement
*/
- public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) throws IOException {
+ public void checkQuota(Dag.DagNode<JobExecutionPlan> dagNode, boolean onInit) throws QuotaExceededException {
// Dag is already being tracked, no need to double increment for retries and multihop flows
if (isDagCurrentlyRunning(dagNode)) {
return;
@@ -103,8 +107,7 @@ public class UserQuotaManager {
boolean requesterCheck = true;
if (serializedRequesters != null) {
- List<String> uniqueRequesters = RequesterService.deserialize(serializedRequesters).stream()
- .map(ServiceRequester::getName).distinct().collect(Collectors.toList());
+ List<String> uniqueRequesters = DagManagerUtils.getDistinctUniqueRequesters(serializedRequesters);
for (String requester : uniqueRequesters) {
int userQuotaIncrement = incrementJobCountAndCheckQuota(
DagManagerUtils.getUserQuotaKey(requester, dagNode), requesterToJobCount, dagNode, getQuotaForUser(requester));
@@ -135,7 +138,7 @@ public class UserQuotaManager {
decrementQuotaUsage(flowGroupToJobCount, DagManagerUtils.getFlowGroupQuotaKey(flowGroup, dagNode));
decrementQuotaUsageForUsers(usersQuotaIncrement);
runningDagIds.remove(DagManagerUtils.generateDagId(dagNode));
- throw new IOException(requesterMessage.toString());
+ throw new QuotaExceededException(requesterMessage.toString());
}
}
@@ -148,7 +151,9 @@ public class UserQuotaManager {
*/
private int incrementJobCountAndCheckQuota(String key, Map<String, Integer> quotaMap, Dag.DagNode<JobExecutionPlan> dagNode, int quotaForKey) {
// Only increment job count for first attempt, since job is considered running between retries
- if (dagNode.getValue().getCurrentAttempts() != 1) {
+ // Include the scenario where currentAttempts is 0 (when checked by the scheduler)
+ // but it will not double increment due to first ensuring that the dag is not already incremented
+ if (dagNode.getValue().getCurrentAttempts() > 1) {
return 0;
}
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index ffd2f15a4..d00365f13 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -35,6 +35,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.exception.QuotaExceededException;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
@@ -54,6 +55,7 @@ import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.utils.InjectionNames;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
@@ -91,6 +93,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
protected final Optional<FlowCatalog> flowCatalog;
protected final Optional<HelixManager> helixManager;
protected final Orchestrator orchestrator;
+ protected final Optional<UserQuotaManager> quotaManager;
@Getter
protected final Map<String, Spec> scheduledFlowSpecs;
@Getter
@@ -119,7 +122,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
@Inject
public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String serviceName, Config config,
Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog,
- Orchestrator orchestrator, SchedulerService schedulerService, Optional<Logger> log) throws Exception {
+ Orchestrator orchestrator, SchedulerService schedulerService, Optional<UserQuotaManager> quotaManager, Optional<Logger> log) throws Exception {
super(ConfigUtils.configToProperties(config), schedulerService);
_log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
@@ -130,14 +133,15 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
this.scheduledFlowSpecs = Maps.newHashMap();
this.isNominatedDRHandler = config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED)
&& config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
+ this.quotaManager = quotaManager;
}
public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusGenerator flowStatusGenerator,
Optional<HelixManager> helixManager,
- Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager,
+ Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager, Optional<UserQuotaManager> quotaManager,
SchedulerService schedulerService, Optional<Logger> log) throws Exception {
this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
- new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log), schedulerService, log);
+ new Orchestrator(config, flowStatusGenerator, topologyCatalog, dagManager, log), schedulerService, quotaManager, log);
}
public synchronized void setActive(boolean isActive) {
@@ -322,6 +326,18 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
return new AddSpecResponse<>(response);
}
+ // Check quota limits against run immediately flows or adhoc flows before saving the schedule
+ if (!jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) || PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+ if (quotaManager.isPresent()) {
+ // QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager
+ try {
+ quotaManager.get().checkQuota(dag.getNodes().get(0), false);
+ } catch (QuotaExceededException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
// todo : we should probably not schedule a flow if it is a runOnce flow
this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 2572fda66..13e607d19 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
+import org.apache.gobblin.service.modules.orchestration.ServiceAzkabanConfigKeys;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.hadoop.fs.Path;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jgit.api.Git;
@@ -162,7 +164,7 @@ public class GobblinServiceManagerTest {
serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, false);
serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY, MockedSpecCompiler.class.getCanonicalName());
-
+ serviceCoreProperties.put(UserQuotaManager.PER_USER_QUOTA, "testUser:1");
transportClientProperties.put(HttpClientFactory.HTTP_REQUEST_TIMEOUT, "10000");
// Create a bare repository
@@ -308,6 +310,25 @@ public class GobblinServiceManagerTest {
}
@Test (dependsOnMethods = "testRunOnceJob")
+ public void testRunQuotaExceeds() throws Exception {
+ Map<String, String> props = flowProperties;
+ props.put(ServiceAzkabanConfigKeys.AZKABAN_PROJECT_USER_TO_PROXY_KEY, "testUser");
+ FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID)
+ .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(props));
+
+ this.flowConfigClient.createFlowConfig(flowConfig);
+
+ FlowConfig flowConfig2 = new FlowConfig().setId(TEST_FLOW_ID2)
+ .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(props));
+
+ try {
+ this.flowConfigClient.createFlowConfig(flowConfig2);
+ } catch (RestLiResponseException e) {
+ Assert.assertEquals(e.getStatus(), HttpStatus.SERVICE_UNAVAILABLE_503);
+ }
+ }
+
+ @Test (dependsOnMethods = "testRunQuotaExceeds")
public void testExplainJob() throws Exception {
FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
.setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)).setExplain(true);
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
index 22208498d..c25c91b59 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java
@@ -226,7 +226,7 @@ public class OrchestratorTest {
}
@Test (dependsOnMethods = "createTopologySpec")
- public void createFlowSpec() throws Exception {
+ public void createFlowSpec() throws Throwable {
// Since only 1 Topology with 1 SpecProducer has been added in previous test
// .. it should be available and responsible for our new FlowSpec
IdentityFlowToJobSpecCompiler specCompiler = (IdentityFlowToJobSpecCompiler) this.orchestrator.getSpecCompiler();
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
index 98ebc6bca..9fc9438db 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/UserQuotaManagerTest.java
@@ -52,8 +52,6 @@ public class UserQuotaManagerTest {
this._quotaManager.checkQuota(dags.get(0).getNodes().get(0), true);
// Should not be throwing the exception
this._quotaManager.checkQuota(dags.get(1).getNodes().get(0), true);
-
- // TODO: add verification when adding a public method for getting the current count and quota per user
}
@Test
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index a9323971b..0750392ee 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -21,26 +21,37 @@ import com.google.common.base.Predicate;
import com.google.common.io.Files;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
import java.io.File;
import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
import org.apache.gobblin.service.modules.flow.SpecCompiler;
+import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalogTest;
+import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
+import org.apache.gobblin.service.modules.spec.JobExecutionPlanDagFactory;
import org.apache.gobblin.testing.AssertWithBackoff;
import org.apache.gobblin.util.ConfigUtils;
@@ -48,6 +59,7 @@ import org.mockito.Mockito;
import org.mockito.invocation.Invocation;
import org.mockito.stubbing.Answer;
import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.apache.gobblin.runtime.spec_catalog.FlowCatalog.*;
@@ -61,11 +73,16 @@ public class GobblinServiceJobSchedulerTest {
private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
+ private Config quotaConfig;
+ @BeforeClass
+ public void setUp() {
+ this.quotaConfig = ConfigFactory.empty().withValue(UserQuotaManager.PER_FLOWGROUP_QUOTA, ConfigValueFactory.fromAnyRef("group1:1"));
+ }
/**
* Test whenever JobScheduler is calling setActive, the FlowSpec is loading into scheduledFlowSpecs (eventually)
*/
@Test
- public void testJobSchedulerInit() throws Exception {
+ public void testJobSchedulerInit() throws Throwable {
// Mock a FlowCatalog.
File specDir = Files.createTempDir();
@@ -90,10 +107,11 @@ public class GobblinServiceJobSchedulerTest {
Assert.assertEquals(flowCatalog.getSpecs().size(), 2);
Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+ UserQuotaManager quotaManager = new UserQuotaManager(quotaConfig);
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, null);
+ ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(quotaManager), null);
SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
@@ -145,7 +163,7 @@ public class GobblinServiceJobSchedulerTest {
* Test that flowSpecs that throw compilation errors do not block the scheduling of other flowSpecs
*/
@Test
- public void testJobSchedulerInitWithFailedSpec() throws Exception {
+ public void testJobSchedulerInitWithFailedSpec() throws Throwable {
// Mock a FlowCatalog.
File specDir = Files.createTempDir();
@@ -179,7 +197,7 @@ public class GobblinServiceJobSchedulerTest {
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, null);
+ ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), null);
SpecCompiler mockCompiler = Mockito.mock(SpecCompiler.class);
Mockito.when(mockOrchestrator.getSpecCompiler()).thenReturn(mockCompiler);
@@ -209,7 +227,7 @@ public class GobblinServiceJobSchedulerTest {
* Test that flowSpecs that throw compilation errors do not block the scheduling of other flowSpecs
*/
@Test
- public void testJobSchedulerUnschedule() throws Exception {
+ public void testJobSchedulerUnschedule() throws Throwable {
// Mock a FlowCatalog.
File specDir = Files.createTempDir();
@@ -242,7 +260,7 @@ public class GobblinServiceJobSchedulerTest {
SchedulerService schedulerService = new SchedulerService(new Properties());
// Mock a GaaS scheduler.
TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
- ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService );
+ ConfigFactory.empty(), Optional.of(flowCatalog), null, mockOrchestrator, Optional.of(new UserQuotaManager(quotaConfig)), schedulerService );
schedulerService.startAsync().awaitRunning();
scheduler.startUp();
@@ -283,14 +301,60 @@ public class GobblinServiceJobSchedulerTest {
Assert.assertEquals(schedulerService.getScheduler().getJobGroupNames().size(), 0);
}
+ @Test
+ public void testJobSchedulerAddFlowQuotaExceeded() throws Exception {
+ File specDir = Files.createTempDir();
+
+ Properties properties = new Properties();
+ properties.setProperty(FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
+ FlowCatalog flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties));
+ ServiceBasedAppLauncher serviceLauncher = new ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
+
+
+ serviceLauncher.addService(flowCatalog);
+ serviceLauncher.start();
+
+ FlowSpec flowSpec0 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"), "flowName0", "group1",
+ ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, ConfigValueFactory.fromAnyRef("true")));
+ FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"), "flowName1", "group1",
+ ConfigFactory.empty().withValue(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, ConfigValueFactory.fromAnyRef("true")));
+
+ Orchestrator mockOrchestrator = Mockito.mock(Orchestrator.class);
+ SpecCompiler mockSpecCompiler = Mockito.mock(SpecCompiler.class);
+ when(mockOrchestrator.getSpecCompiler()).thenReturn(mockSpecCompiler);
+ Dag<JobExecutionPlan> mockDag0 = this.buildDag(flowSpec0.getConfig(), "0");
+ Dag<JobExecutionPlan> mockDag1 = this.buildDag(flowSpec1.getConfig(), "1");
+ when(mockSpecCompiler.compileFlow(flowSpec0)).thenReturn(mockDag0);
+ when(mockSpecCompiler.compileFlow(flowSpec1)).thenReturn(mockDag1);
+
+ SchedulerService schedulerService = new SchedulerService(new Properties());
+ // Mock a GaaS scheduler.
+ GobblinServiceJobScheduler scheduler = new GobblinServiceJobScheduler("testscheduler",
+ ConfigFactory.empty(), Optional.absent(), Optional.of(flowCatalog), null, mockOrchestrator, schedulerService, Optional.of(new UserQuotaManager(quotaConfig)), Optional.absent());
+
+ schedulerService.startAsync().awaitRunning();
+ scheduler.startUp();
+ scheduler.setActive(true);
+
+ scheduler.onAddSpec(flowSpec0); //Ignore the response for this request
+ Assert.assertThrows(RuntimeException.class, () -> scheduler.onAddSpec(flowSpec1));
+
+ Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1);
+ // Second flow should not be added to scheduled flows since it was rejected
+ Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 1);
+ // set scheduler to be inactive and unschedule flows
+ scheduler.setActive(false);
+ Assert.assertEquals(scheduler.scheduledFlowSpecs.size(), 0);
+ }
+
class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler {
public boolean isCompilerHealthy = false;
private boolean hasScheduler = false;
public TestGobblinServiceJobScheduler(String serviceName, Config config,
- Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator,
+ Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator, Optional<UserQuotaManager> quotaManager,
SchedulerService schedulerService) throws Exception {
- super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService, Optional.absent());
+ super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService, quotaManager, Optional.absent());
if (schedulerService != null) {
hasScheduler = true;
}
@@ -318,4 +382,18 @@ public class GobblinServiceJobSchedulerTest {
return new AddSpecResponse(addedSpec.getDescription());
}
}
+
+ Dag<JobExecutionPlan> buildDag(Config additionalConfig, String id) throws URISyntaxException {
+ Config config = ConfigFactory.empty().
+ withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, ConfigValueFactory.fromAnyRef(System.currentTimeMillis()));
+
+ config = additionalConfig.withFallback(config);
+ List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
+ JobSpec js = JobSpec.builder("test_job_" + id).withVersion(id).withConfig(config).
+ withTemplate(new URI("job_" + id)).build();
+ SpecExecutor specExecutor = InMemorySpecExecutor.createDummySpecExecutor(new URI("jobExecutor"));
+ JobExecutionPlan jobExecutionPlan = new JobExecutionPlan(js, specExecutor);
+ jobExecutionPlans.add(jobExecutionPlan);
+ return new JobExecutionPlanDagFactory().createDag(jobExecutionPlans);
+ }
}
\ No newline at end of file
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/exception/QuotaExceededException.java b/gobblin-utility/src/main/java/org/apache/gobblin/exception/QuotaExceededException.java
new file mode 100644
index 000000000..6ab63d150
--- /dev/null
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/exception/QuotaExceededException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.exception;
+
+import java.io.IOException;
+
+
+public class QuotaExceededException extends IOException {
+
+ public QuotaExceededException(String message) {
+ super(message);
+ }
+}
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbackResult.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbackResult.java
index 0132a8249..8c1f751a3 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbackResult.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/callbacks/CallbackResult.java
@@ -16,7 +16,6 @@
*/
package org.apache.gobblin.util.callbacks;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import com.google.common.base.Preconditions;
@@ -64,7 +63,7 @@ public class CallbackResult<R> {
R res = execFuture.get();
return createSuccessful(res);
}
- catch (ExecutionException e) {
+ catch (Exception e) {
if (execFuture.isCancelled()) {
return createCancelled();
}