You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/12 18:01:16 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1082] compile a flow before storing it in spec catalog
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 5796bb4 [GOBBLIN-1082] compile a flow before storing it in spec catalog
5796bb4 is described below
commit 5796bb4c890cf4c7e48c061907232289cfb08ab9
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Mar 12 11:01:09 2020 -0700
[GOBBLIN-1082] compile a flow before storing it in spec catalog
Closes #2921 from arjun4084346/storeSpecAfterCompile
---
.../apache/gobblin/service/ServiceConfigKeys.java | 2 +
.../service/FlowConfigV2ResourceLocalHandler.java | 11 ++++--
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 44 +++++++++++++++-------
.../scheduler/GobblinServiceJobScheduler.java | 18 +++++----
4 files changed, 50 insertions(+), 25 deletions(-)
diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index daa26c4..e589b4a 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -23,6 +23,7 @@ import org.apache.gobblin.annotation.Alpha;
public class ServiceConfigKeys {
public static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";
+ public static final String GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS = "org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler";
// Gobblin Service Manager Keys
public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
@@ -49,6 +50,7 @@ public class ServiceConfigKeys {
// Flow Compiler Keys
public static final String GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY = GOBBLIN_SERVICE_PREFIX + "flowCompiler.class";
+ public static final String COMPILATION_SUCCESSFUL = "compilation.successful";
// Flow Catalog Keys
public static final String GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT = GOBBLIN_SERVICE_PREFIX + "flowCatalog.localCommit";
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index edbcf8b..05a31a0 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -34,7 +34,6 @@ import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@Slf4j
public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHandler implements FlowConfigsResourceHandler {
- public static final String GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS = "org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler";
public FlowConfigV2ResourceLocalHandler(FlowCatalog flowCatalog) {
super(flowCatalog);
@@ -68,19 +67,23 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
}
Map<String, AddSpecResponse> responseMap = this.flowCatalog.put(flowSpec, triggerListener);
- HttpStatus httpStatus = HttpStatus.S_201_CREATED;
+ HttpStatus httpStatus;
if (flowConfig.hasExplain() && flowConfig.isExplain()) {
//This is an Explain request. So no resource is actually created.
//Enrich original FlowConfig entity by adding the compiledFlow to the properties map.
StringMap props = flowConfig.getProperties();
- AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, null);
+ AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, null);
props.put("gobblin.flow.compiled",
addSpecResponse != null && addSpecResponse.getValue() != null ? StringEscapeUtils.escapeJson(addSpecResponse.getValue()) : "");
flowConfig.setProperties(props);
- //Return response with 200 status code, since no resource is actually created.
httpStatus = HttpStatus.S_200_OK;
+ } else if (Boolean.parseBoolean(responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false")).getValue().toString())) {
+ httpStatus = HttpStatus.S_201_CREATED;
+ } else {
+ httpStatus = HttpStatus.S_400_BAD_REQUEST;
}
+
return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(), flowStatusId), flowConfig, httpStatus);
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 1124137..6919cfc 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -53,6 +53,7 @@ import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.api.SpecStore;
import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
+import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.callbacks.CallbackResult;
@@ -296,27 +297,44 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
*/
public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
Map<String, AddSpecResponse> responseMap = new HashMap<>();
- try {
- Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
- Preconditions.checkNotNull(spec);
+ Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
+ Preconditions.checkNotNull(spec);
+
+ log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
+ ((FlowSpec) spec).getConfigAsProperties()));
+ if (triggerListener) {
+ AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+ for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
+ responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
+ }
+ }
+ boolean compileSuccess = isCompileSuccessful(responseMap);
+
+ if (compileSuccess) {
long startTime = System.currentTimeMillis();
- log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
- ((FlowSpec) spec).getConfigAsProperties()));
- specStore.addSpec(spec);
metrics.updatePutSpecTime(startTime);
- if (triggerListener) {
- AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
- for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
- responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
- }
+ try {
+ specStore.addSpec(spec);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
}
- } catch (IOException e) {
- throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
+ responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
+ } else {
+ responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false"));
}
+
return responseMap;
}
+ private boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
+ AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
+
+ return addSpecResponse != null
+ && addSpecResponse.getValue() != null
+ && !addSpecResponse.getValue().contains("ConfigException");
+ }
+
@Override
public Map<String, AddSpecResponse> put(Spec spec) {
return put(spec, true);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 798f7a3..01f6a13 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -284,6 +284,15 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
}
boolean isExplain = ConfigUtils.getBoolean(flowSpec.getConfig(), ConfigurationKeys.FLOW_EXPLAIN_KEY, false);
String response = null;
+
+ // always try to compile the flow to verify if it is compilable
+ Dag<JobExecutionPlan> dag = this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
+ if (dag != null && !dag.isEmpty()) {
+ response = dag.toString();
+ } else if (!flowSpec.getCompilationErrors().isEmpty()) {
+ response = Arrays.toString(flowSpec.getCompilationErrors().toArray());
+ }
+
if (!isExplain) {
this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
@@ -299,13 +308,6 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
this.jobExecutor.execute(new NonScheduledJobRunner(flowSpec.getUri(), true, jobConfig, null));
}
} else {
- //Return a compiled flow.
- Dag<JobExecutionPlan> dag = this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
- if (dag != null && !dag.isEmpty()) {
- response = dag.toString();
- } else if (!flowSpec.getCompilationErrors().isEmpty()) {
- response = Arrays.toString(flowSpec.getCompilationErrors().toArray());
- }
_log.info("{} Skipping adding flow spec: {}, since it is an EXPLAIN request", this.serviceName, addedSpec);
if (this.flowCatalog.isPresent()) {
@@ -313,7 +315,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
this.flowCatalog.get().remove(flowSpec.getUri(), new Properties(), false);
}
}
- return new AddSpecResponse(response);
+ return new AddSpecResponse<>(response);
} catch (JobException je) {
_log.error("{} Failed to schedule or run FlowSpec {}", serviceName, addedSpec, je);
}