You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/05/14 17:18:43 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1144] remove specs from gobblin service job scheduler

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 76d5a3b  [GOBBLIN-1144] remove specs from gobblin service job scheduler
76d5a3b is described below

commit 76d5a3bab26b471b722787d735f3df696beecd68
Author: Arjun <ab...@linkedin.com>
AuthorDate: Thu May 14 10:18:36 2020 -0700

    [GOBBLIN-1144] remove specs from gobblin service job scheduler
    
    Dear Gobblin maintainers,
    
    Please accept this PR. I understand that it will
    not be reviewed until I have checked off all the
    steps below!
    
    ### JIRA
    - [x] My PR addresses the following
    [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN/)
    issues and references them in the PR title. For
    example, "[GOBBLIN-XXX] My Gobblin PR"
        - https://issues.apache.org/jira/browse/GOBBLIN-1144
    
    ### Description
    - [x] Here are some details about my PR, including
    screenshots (if applicable):
    Implement option 4 mentioned in the doc:
    https://docs.google.com/document/d/1OsImllAZRnJIp2NWEOdlfw0XtqY1b-ysyKEZYLHwVbQ/edit
    
    ### Tests
    - [x] My PR adds the following unit tests __OR__
    does not need testing for this extremely good
    reason:
    trivial changes
    
    ### Commits
    - [x] My commits all reference JIRA issues in
    their subject lines, and I have squashed multiple
    commits if they address the same issue. In
    addition, my commits follow the guidelines from
    "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
        1. Subject is separated from body by a blank line
        2. Subject is limited to 50 characters
        3. Subject does not end with a period
        4. Subject uses the imperative mood ("add", not
    "adding")
        5. Body wraps at 72 characters
        6. Body explains "what" and "why", not "how"
    
    Closes #2981 from
    arjun4084346/flowCatalogRaceCondition
---
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  | 42 +++++++++++-----------
 .../scheduler/GobblinServiceJobScheduler.java      | 15 ++++++--
 2 files changed, 34 insertions(+), 23 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index d902fdf..029005c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -21,6 +21,8 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
@@ -33,6 +35,7 @@ import java.util.Map;
 import java.util.Properties;
 import javax.annotation.Nonnull;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
+
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.Tag;
@@ -293,30 +296,27 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
    */
   public Map<String, AddSpecResponse> put(Spec spec, boolean triggerListener) {
     Map<String, AddSpecResponse> responseMap = new HashMap<>();
+    FlowSpec flowSpec = (FlowSpec) spec;
     Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName()));
-    Preconditions.checkNotNull(spec);
+    Preconditions.checkNotNull(flowSpec);
+
+    log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", flowSpec.getUri(), flowSpec.getConfigAsProperties()));
+    try {
+      long startTime = System.currentTimeMillis();
+      specStore.addSpec(flowSpec);
+      metrics.updatePutSpecTime(startTime);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot add Spec to Spec store: " + flowSpec, e);
+    }
 
-    log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(),
-        ((FlowSpec) spec).getConfigAsProperties()));
     if (triggerListener) {
-      AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(spec);
+      AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(flowSpec);
       for (Map.Entry<SpecCatalogListener, CallbackResult<AddSpecResponse>> entry: response.getValue().getSuccesses().entrySet()) {
         responseMap.put(entry.getKey().getName(), entry.getValue().getResult());
       }
     }
 
-    boolean compileSuccess = isCompileSuccessful(responseMap);
-
-    if (compileSuccess) {
-      long startTime = System.currentTimeMillis();
-      metrics.updatePutSpecTime(startTime);
-      try {
-        if (!((FlowSpec) spec).isExplain()) {
-          specStore.addSpec(spec);
-        }
-      } catch (IOException e) {
-        throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e);
-      }
+    if (isCompileSuccessful(responseMap)) {
       responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
     } else {
       responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false"));
@@ -325,12 +325,14 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
     return responseMap;
   }
 
-  private boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
+  public static boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
     AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
 
-    return addSpecResponse != null
-        && addSpecResponse.getValue() != null
-        && !addSpecResponse.getValue().contains("ConfigException");
+    return isCompileSuccessful(addSpecResponse.getValue());
+  }
+
+  public static boolean isCompileSuccessful(String dag) {
+    return dag != null && !dag.contains(ConfigException.class.getSimpleName());
   }
 
   @Override
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index afc401b..4f2c9ff 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -269,6 +269,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     if (addedSpec instanceof FlowSpec) {
       try {
         FlowSpec flowSpec = (FlowSpec) addedSpec;
+        URI flowSpecUri = flowSpec.getUri();
         Properties jobConfig = new Properties();
         Properties flowSpecProperties = ((FlowSpec) addedSpec).getConfigAsProperties();
         jobConfig.putAll(this.properties);
@@ -293,7 +294,9 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
           response = Arrays.toString(flowSpec.getCompilationErrors().toArray());
         }
 
-        if (!isExplain) {
+        boolean compileSuccess = FlowCatalog.isCompileSuccessful(response);
+
+        if (!isExplain && compileSuccess) {
           this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
 
           if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
@@ -301,11 +304,17 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
             scheduleJob(jobConfig, null);
             if (PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
               _log.info("RunImmediately requested, hence executing FlowSpec: " + addedSpec);
-              this.jobExecutor.execute(new NonScheduledJobRunner(flowSpec.getUri(), false, jobConfig, null));
+              this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, false, jobConfig, null));
             }
           } else {
             _log.info("No FlowSpec schedule found, so running FlowSpec: " + addedSpec);
-            this.jobExecutor.execute(new NonScheduledJobRunner(flowSpec.getUri(), true, jobConfig, null));
+            this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true, jobConfig, null));
+          }
+        } else {
+          _log.info("Removing the flow spec: {}, isExplain: {}, compileSuccess: {}", addedSpec, isExplain, compileSuccess);
+          if (this.flowCatalog.isPresent()) {
+            _log.debug("Removing flow spec from FlowCatalog: {}", flowSpec);
+            GobblinServiceJobScheduler.this.flowCatalog.get().remove(flowSpecUri, new Properties(), false);
           }
         }
         return new AddSpecResponse<>(response);