You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/03/12 18:01:16 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1082] compile a flow before storing it in spec catalog

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5796bb4  [GOBBLIN-1082] compile a flow before storing it in spec catalog
5796bb4 is described below

commit 5796bb4c890cf4c7e48c061907232289cfb08ab9
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu Mar 12 11:01:09 2020 -0700

    [GOBBLIN-1082] compile a flow before storing it in spec catalog
    
    Closes #2921 from arjun4084346/storeSpecAfterCompile
---
 .../apache/gobblin/service/ServiceConfigKeys.java  |  2 +
 .../service/FlowConfigV2ResourceLocalHandler.java  | 11 ++++--
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  | 44 +++++++++++++++-------
 .../scheduler/GobblinServiceJobScheduler.java      | 18 +++++----
 4 files changed, 50 insertions(+), 25 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
index daa26c4..e589b4a 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java
@@ -23,6 +23,7 @@ import org.apache.gobblin.annotation.Alpha;
 public class ServiceConfigKeys {
 
   public static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service.";
+  public static final String GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS = "org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler";
 
   // Gobblin Service Manager Keys
   public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
@@ -49,6 +50,7 @@ public class ServiceConfigKeys {
 
   // Flow Compiler Keys
   public static final String GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY = GOBBLIN_SERVICE_PREFIX + "flowCompiler.class";
+  public static final String COMPILATION_SUCCESSFUL = "compilation.successful";
 
   // Flow Catalog Keys
   public static final String GOBBLIN_SERVICE_FLOW_CATALOG_LOCAL_COMMIT = GOBBLIN_SERVICE_PREFIX + "flowCatalog.localCommit";
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
index edbcf8b..05a31a0 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigV2ResourceLocalHandler.java
@@ -34,7 +34,6 @@ import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 @Slf4j
 public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHandler implements FlowConfigsResourceHandler {
-  public static final String GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS = "org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler";
 
   public FlowConfigV2ResourceLocalHandler(FlowCatalog flowCatalog) {
     super(flowCatalog);
@@ -68,19 +67,23 @@ public class FlowConfigV2ResourceLocalHandler extends FlowConfigResourceLocalHan
     }
 
     Map<String, AddSpecResponse> responseMap = this.flowCatalog.put(flowSpec, triggerListener);
-    HttpStatus httpStatus = HttpStatus.S_201_CREATED;
+    HttpStatus httpStatus;
 
     if (flowConfig.hasExplain() && flowConfig.isExplain()) {
       //This is an Explain request. So no resource is actually created.
       //Enrich original FlowConfig entity by adding the compiledFlow to the properties map.
       StringMap props = flowConfig.getProperties();
-      AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, null);
+      AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, null);
       props.put("gobblin.flow.compiled",
           addSpecResponse != null && addSpecResponse.getValue() != null ? StringEscapeUtils.escapeJson(addSpecResponse.getValue()) : "");
       flowConfig.setProperties(props);
-      //Return response with 200 status code, since no resource is actually created.
       httpStatus = HttpStatus.S_200_OK;
+    } else if (Boolean.parseBoolean(responseMap.getOrDefault(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false")).getValue().toString())) {
+      httpStatus = HttpStatus.S_201_CREATED;
+    } else {
+      httpStatus = HttpStatus.S_400_BAD_REQUEST;
     }
+
     return new CreateKVResponse(new ComplexResourceKey<>(flowConfig.getId(), flowStatusId), flowConfig, httpStatus);
   }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 1124137..6919cfc 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -53,6 +53,7 @@ import org.apache.gobblin.runtime.api.SpecSerDe;
 import org.apache.gobblin.runtime.api.SpecStore;
 import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
 import org.apache.gobblin.runtime.spec_store.FSSpecStore;
+import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.callbacks.CallbackResult;
@@ -296,27 +297,44 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
    */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
     Map<String, AddSpecResponse> responseMap = new HashMap<>();
-    try {
-      Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
-      Preconditions.checkNotNull(spec);
+    Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
+    Preconditions.checkNotNull(spec);
+
+    log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
+        ((FlowSpec) spec).getConfigAsProperties()));
+    if (triggerListener) {
+      AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+      for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
+        responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
+      }
+    }
 
+    boolean compileSuccess = isCompileSuccessful(responseMap);
+
+    if (compileSuccess) {
       long startTime = System.currentTimeMillis();
-      log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
-          ((FlowSpec) spec).getConfigAsProperties()));
-      specStore.addSpec(spec);
       metrics.updatePutSpecTime(startTime);
-      if (triggerListener) {
-        AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
-        for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
-          responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
-        }
+      try {
+        specStore.addSpec(spec);
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
       }
-    } catch (IOException e) {
-      throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
+    } else {
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false"));
     }
+
     return responseMap;
   }
 
+  private boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
+    AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
+
+    return addSpecResponse != null
+        && addSpecResponse.getValue() != null
+        && !addSpecResponse.getValue().contains("ConfigException");
+  }
+
   @Override
   public Map<String, AddSpecResponse> put(Spec spec) {
     return put(spec, true);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 798f7a3..01f6a13 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -284,6 +284,15 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
         }
         boolean isExplain = ConfigUtils.getBoolean(flowSpec.getConfig(), ConfigurationKeys.FLOW_EXPLAIN_KEY, false);
         String response = null;
+
+        // always try to compile the flow to verify if it is compilable
+        Dag<JobExecutionPlan> dag = this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
+        if (dag != null && !dag.isEmpty()) {
+          response = dag.toString();
+        } else if (!flowSpec.getCompilationErrors().isEmpty()) {
+          response = Arrays.toString(flowSpec.getCompilationErrors().toArray());
+        }
+
         if (!isExplain) {
           this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
 
@@ -299,13 +308,6 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
             this.jobExecutor.execute(new NonScheduledJobRunner(flowSpec.getUri(), true, jobConfig, null));
           }
         } else {
-          //Return a compiled flow.
-          Dag<JobExecutionPlan> dag = this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
-          if (dag != null && !dag.isEmpty()) {
-            response = dag.toString();
-          } else if (!flowSpec.getCompilationErrors().isEmpty()) {
-            response = Arrays.toString(flowSpec.getCompilationErrors().toArray());
-          }
           _log.info("{} Skipping adding flow spec: {}, since it is an EXPLAIN request", this.serviceName, addedSpec);
 
           if (this.flowCatalog.isPresent()) {
@@ -313,7 +315,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
             this.flowCatalog.get().remove(flowSpec.getUri(), new Properties(), false);
           }
         }
-        return new AddSpecResponse(response);
+        return new AddSpecResponse<>(response);
       } catch (JobException je) {
         _log.error("{} Failed to schedule or run FlowSpec {}", serviceName, addedSpec, je);
       }