You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/05/14 17:18:43 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1144] remove
specs from gobblin service job scheduler
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 76d5a3b [GOBBLIN-1144] remove specs from gobblin service job scheduler
76d5a3b is described below
commit 76d5a3bab26b471b722787d735f3df696beecd68
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu May 14 10:18:36 2020 -0700
[GOBBLIN-1144] remove specs from gobblin service job scheduler
Dear Gobblin maintainers,
Please accept this PR. I understand that it will
not be reviewed until I have checked off all the
steps below!
### JIRA
- [x] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/)
issues and references them in the PR title. For
example, "[GOBBLIN-XXX] My Gobblin PR"
- https://issues.apache.org/jira/browse/GOBBLIN-1144
### Description
- [x] Here are some details about my PR, including
screenshots (if applicable):
Implements option 4 described in the design doc: https://docs.google.com/document/d/1OsImllAZRnJIp2NWEOdlfw0XtqY1b-ysyKEZYLHwVbQ/edit
### Tests
- [x] My PR adds the following unit tests __OR__
does not need testing for this extremely good
reason:
trivial changes
### Commits
- [x] My commits all reference JIRA issues in
their subject lines, and I have squashed multiple
commits if they address the same issue. In
addition, my commits follow the guidelines from
"[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
1. Subject is separated from body by a blank line
2. Subject is limited to 50 characters
3. Subject does not end with a period
4. Subject uses the imperative mood ("add", not
"adding")
5. Body wraps at 72 characters
6. Body explains "what" and "why", not "how"
Closes #2981 from
arjun4084346/flowCatalogRaceCondition
---
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 42 +++++++++++-----------
.../scheduler/GobblinServiceJobScheduler.java | 15 ++++++--
2 files changed, 34 insertions(+), 23 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index d902fdf..029005c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -21,6 +21,8 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
@@ -33,6 +35,7 @@ import java.util.Map;
import java.util.Properties;
import javax.annotation.Nonnull;
import org.apache.commons.lang3.reflect.ConstructorUtils;
+
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
@@ -293,30 +296,27 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
*/
public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
Map<String, AddSpecResponse> responseMap = new HashMap<>();
+ FlowSpec flowSpec = (FlowSpec) spec;
Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
- Preconditions.checkNotNull(spec);
+ Preconditions.checkNotNull(flowSpec);
+
+ log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", flowSpec.getUri(), flowSpec.getConfigAsProperties()));
+ try {
+ long startTime = System.currentTimeMillis();
+ specStore.addSpec(flowSpec);
+ metrics.updatePutSpecTime(startTime);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot add Spec to Spec store: " + flowSpec, e);
+ }
- log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
- ((FlowSpec) spec).getConfigAsProperties()));
if (triggerListener) {
- AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+ AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(flowSpec);
for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
}
}
- boolean compileSuccess = isCompileSuccessful(responseMap);
-
- if (compileSuccess) {
- long startTime = System.currentTimeMillis();
- metrics.updatePutSpecTime(startTime);
- try {
- if (!((FlowSpec) spec).isExplain()) {
- specStore.addSpec(spec);
- }
- } catch (IOException e) {
- throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
- }
+ if (isCompileSuccessful(responseMap)) {
responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
} else {
responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false"));
@@ -325,12 +325,14 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
return responseMap;
}
- private boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
+ public static boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
- return addSpecResponse != null
- && addSpecResponse.getValue() != null
- && !addSpecResponse.getValue().contains("ConfigException");
+ return isCompileSuccessful(addSpecResponse.getValue());
+ }
+
+ public static boolean isCompileSuccessful(String dag) {
+ return dag != null && !dag.contains(ConfigException.class.getSimpleName());
}
@Override
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index afc401b..4f2c9ff 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -269,6 +269,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
if (addedSpec instanceof FlowSpec) {
try {
FlowSpec flowSpec = (FlowSpec) addedSpec;
+ URI flowSpecUri = flowSpec.getUri();
Properties jobConfig = new Properties();
Properties flowSpecProperties = ((FlowSpec) addedSpec).getConfigAsProperties();
jobConfig.putAll(this.properties);
@@ -293,7 +294,9 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
response = Arrays.toString(flowSpec.getCompilationErrors().toArray());
}
- if (!isExplain) {
+ boolean compileSuccess = FlowCatalog.isCompileSuccessful(response);
+
+ if (!isExplain && compileSuccess) {
this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
@@ -301,11 +304,17 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
scheduleJob(jobConfig, null);
if (PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
_log.info("RunImmediately requested, hence executing FlowSpec: " + addedSpec);
- this.jobExecutor.execute(new NonScheduledJobRunner(flowSpec.getUri(), false, jobConfig, null));
+ this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, false, jobConfig, null));
}
} else {
_log.info("No FlowSpec schedule found, so running FlowSpec: " + addedSpec);
- this.jobExecutor.execute(new NonScheduledJobRunner(flowSpec.getUri(), true, jobConfig, null));
+ this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true, jobConfig, null));
+ }
+ } else {
+ _log.info("Removing the flow spec: {}, isExplain: {}, compileSuccess: {}", addedSpec, isExplain, compileSuccess);
+ if (this.flowCatalog.isPresent()) {
+ _log.debug("Removing flow spec from FlowCatalog: {}", flowSpec);
+ GobblinServiceJobScheduler.this.flowCatalog.get().remove(flowSpecUri, new Properties(), false);
}
}
return new AddSpecResponse<>(response);