You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@gobblin.apache.org by GitBox <gi...@apache.org> on 2020/03/12 17:34:29 UTC

[GitHub] [incubator-gobblin] arjun4084346 commented on a change in pull request #2921: compile a flow before storing it in spec catalog

arjun4084346 commented on a change in pull request #2921: compile a flow before storing it in spec catalog
URL: https://github.com/apache/incubator-gobblin/pull/2921#discussion_r391782393
 
 

 ##########
 File path: gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
 ##########
 @@ -296,27 +297,44 @@ public Spec getSpecWrapper(URI uri) {
    */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
     Map<String, AddSpecResponse> responseMap = new HashMap<>();
-    try {
-      Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
-      Preconditions.checkNotNull(spec);
+    Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
+    Preconditions.checkNotNull(spec);
+
+    log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
+        ((FlowSpec) spec).getConfigAsProperties()));
+    if (triggerListener) {
+      AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+      for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
+        responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
+      }
+    }
 
+    boolean compileSuccess = isCompileSuccessful(responseMap);
+
+    if (compileSuccess) {
       long startTime = System.currentTimeMillis();
-      log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
-          ((FlowSpec) spec).getConfigAsProperties()));
-      specStore.addSpec(spec);
       metrics.updatePutSpecTime(startTime);
-      if (triggerListener) {
-        AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
-        for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
-          responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
-        }
+      try {
+        specStore.addSpec(spec);
+      } catch (IOException e) {
+        throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
       }
-    } catch (IOException e) {
-      throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
+    } else {
+      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false"));
     }
+
     return responseMap;
   }
 
+  private boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
+    AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
 
 Review comment:
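   FlowCatalog::put inserts each listener's response into the map like this:
   responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
   and entry.getKey().getName() is, by default,
   default String getName() {
       return getClass().getName();
     }
   so the key in responseMap is the listener's class name.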

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services