You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/06/01 21:44:28 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1160] No spec delete on gobblin service start

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bc395c  [GOBBLIN-1160] No spec delete on gobblin service start
6bc395c is described below

commit 6bc395c3c1428852a30b5add8db7402be7b4b1ae
Author: Arjun <ab...@linkedin.com>
AuthorDate: Mon Jun 1 14:44:20 2020 -0700

    [GOBBLIN-1160] No spec delete on gobblin service start
    
    Closes #3011 from arjun4084346/noSpecRemoveOnStart
---
 .../org/apache/gobblin/runtime/api/FlowSpec.java   |   7 +-
 .../runtime/job_monitor/KafkaJobMonitor.java       |  11 +-
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  |  66 +++++---
 .../modules/core/GobblinServiceManager.java        |   2 +-
 .../service/modules/flow/MockedSpecCompiler.java   |  11 +-
 .../scheduler/GobblinServiceJobScheduler.java      | 133 ++++++++++------
 .../gobblin/service/GobblinServiceManagerTest.java | 175 +++++++++++++++++----
 7 files changed, 293 insertions(+), 112 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index fb27380..bec0445 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -362,6 +362,10 @@ public class FlowSpec implements Configurable, Spec {
     return ConfigUtils.getBoolean(getConfig(), ConfigurationKeys.FLOW_EXPLAIN_KEY, false);
   }
 
+  public boolean isScheduled() {
+    return getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY);
+  }
+
   @Slf4j
   public static class Utils {
     private final static String URI_SCHEME = "gobblin-flow";
@@ -371,7 +375,8 @@ public class FlowSpec implements Configurable, Spec {
     private final static String URI_FRAGMENT = null;
     private final static int EXPECTED_NUM_URI_PATH_TOKENS = 3;
 
-    public static URI createFlowSpecUri(FlowId flowId) throws URISyntaxException {
+    public static URI createFlowSpecUri(FlowId flowId)
+        throws URISyntaxException {
       return new URI(URI_SCHEME, URI_AUTHORITY, createUriPath(flowId), URI_QUERY, URI_FRAGMENT);
     }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
index 0f47625..ba9f06f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/job_monitor/KafkaJobMonitor.java
@@ -24,12 +24,14 @@ import java.util.Collection;
 import com.codahale.metrics.Counter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
 import com.typesafe.config.Config;
 
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
-import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
 import org.apache.gobblin.metastore.DatasetStateStore;
+import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.JobSpec;
 import org.apache.gobblin.runtime.api.JobSpecMonitor;
 import org.apache.gobblin.runtime.api.MutableJobCatalog;
@@ -38,9 +40,6 @@ import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.Either;
 
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-
 
 /**
  * Abstract {@link JobSpecMonitor} that reads {@link JobSpec}s from a Kafka stream. Subclasses should implement
@@ -124,7 +123,7 @@ public abstract class KafkaJobMonitor extends HighLevelConsumer<byte[], byte[]>
    * It fetches the job name from the given jobSpecUri
    * and deletes its corresponding state store
    * @param jobSpecUri jobSpecUri as created by
-   *                   {@link FlowConfigResourceLocalHandler.FlowUriUtils.createFlowSpecUri}
+   *                   {@link FlowSpec.Utils#createFlowSpecUri}
    * @throws IOException
    */
   private void deleteStateStore(URI jobSpecUri) throws IOException {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 029005c..b64ebd7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -17,12 +17,6 @@
 
 package org.apache.gobblin.runtime.spec_catalog;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.AbstractIdleService;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigException;
-
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.URI;
@@ -33,8 +27,19 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import javax.annotation.Nonnull;
+
 import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractIdleService;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+
+import javax.annotation.Nonnull;
+import lombok.Getter;
 
 import org.apache.gobblin.instrumented.Instrumented;
 import org.apache.gobblin.metrics.MetricContext;
@@ -55,8 +60,6 @@ import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.callbacks.CallbackResult;
 import org.apache.gobblin.util.callbacks.CallbacksDispatcher;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -78,7 +81,11 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
   protected final Logger log;
   protected final MetricContext metricContext;
   protected final MutableStandardMetrics metrics;
+  @Getter
   protected final SpecStore specStore;
+  // maps the URI of each spec currently being added to this catalog to a monitor object, used to synchronize
+  // persisting a flow spec here with its deletion by the scheduler's run-once job runner
+  private final Map<String, Object> specSyncObjects = new HashMap<>();
 
   private final ClassAliasResolver<SpecStore> aliasResolver;
 
@@ -280,7 +287,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
     try {
       spec = getSpec(uri);
     } catch (SpecNotFoundException snfe) {
-      log.error(String.format("The URI %s discovered in SpecStore is missing in FlowCatlog"
+      log.error(String.format("The URI %s discovered in SpecStore is missing in FlowCatalog"
           + ", suspecting current modification on SpecStore", uri), snfe);
     }
     return spec;
@@ -289,6 +296,12 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
   /**
    * Persist {@link Spec} into {@link SpecStore} and notify {@link SpecCatalogListener} if triggerListener
    * is set to true.
+   * If the {@link Spec} is a {@link FlowSpec}, it is persisted only if it compiles at the time this method receives
+   * the spec; `explain` specs are never persisted. The logic of this method is tightly coupled with the logic of
+   * {@link GobblinServiceJobScheduler#onAddSpec}, which is one of the listeners of {@link FlowCatalog}.
+   * We use the monitor objects in {@link #specSyncObjects} to synchronize the
+   * {@link GobblinServiceJobScheduler.NonScheduledJobRunner} thread with this thread, ensuring that a run-once
+   * {@link FlowSpec} is deleted only after it has been submitted to the orchestrator and persisted by this method.
    *
    * @param spec The Spec to be added
    * @param triggerListener True if listeners should be notified.
@@ -301,13 +314,9 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
     Preconditions.checkNotNull(flowSpec);
 
     log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", flowSpec.getUri(), flowSpec.getConfigAsProperties()));
-    try {
-      long startTime = System.currentTimeMillis();
-      specStore.addSpec(flowSpec);
-      metrics.updatePutSpecTime(startTime);
-    } catch (IOException e) {
-      throw new RuntimeException("Cannot add Spec to Spec store: " + flowSpec, e);
-    }
+
+    Object syncObject = new Object();
+    specSyncObjects.put(flowSpec.getUri().toString(), syncObject);
 
     if (triggerListener) {
       AddSpecResponse<CallbacksDispatcher.CallbackResults<SpecCatalogListener, AddSpecResponse>> response = this.listeners.onAddSpec(flowSpec);
@@ -317,7 +326,21 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
     }
 
     if (isCompileSuccessful(responseMap)) {
-      responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
+      synchronized (syncObject) {
+        try {
+          if (!flowSpec.isExplain()) {
+            long startTime = System.currentTimeMillis();
+            specStore.addSpec(spec);
+            metrics.updatePutSpecTime(startTime);
+          }
+          responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("true"));
+        } catch (IOException e) {
+          throw new RuntimeException("Cannot add Spec to Spec store: " + flowSpec, e);
+        } finally {
+          syncObject.notifyAll();
+          this.specSyncObjects.remove(flowSpec.getUri().toString());
+        }
+      }
     } else {
       responseMap.put(ServiceConfigKeys.COMPILATION_SUCCESSFUL, new AddSpecResponse<>("false"));
     }
@@ -326,7 +349,8 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
   }
 
   public static boolean isCompileSuccessful(Map<String, AddSpecResponse> responseMap) {
-    AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
+    AddSpecResponse<String> addSpecResponse = responseMap.getOrDefault(
+        ServiceConfigKeys.GOBBLIN_SERVICE_JOB_SCHEDULER_LISTENER_CLASS, new AddSpecResponse<>(""));
 
     return isCompileSuccessful(addSpecResponse.getValue());
   }
@@ -364,4 +388,8 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
       throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e);
     }
   }
+
+  public Object getSyncObject(String specUri) {
+    return this.specSyncObjects.getOrDefault(specUri, null);
+  }
 }
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index fefbfa2..cb9e161 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -310,7 +310,7 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
         }
       });
       this.restliServer = EmbeddedRestliServer.builder()
-          .resources(Lists.<Class<? extends BaseResource>>newArrayList(FlowConfigsResource.class))
+          .resources(Lists.newArrayList(FlowConfigsResource.class, FlowConfigsV2Resource.class))
           .injector(injector)
           .build();
       if (config.hasPath(ServiceConfigKeys.SERVICE_PORT)) {
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java
index 7a9e5db..09e3504 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/MockedSpecCompiler.java
@@ -36,12 +36,14 @@ import org.apache.gobblin.util.ConfigUtils;
 
 
 /**
- * This mocked SpecCompiler class creates 3 dummy job specs to emulate multi hop flow spec compiler.
+ * This mocked SpecCompiler class creates 3 dummy job specs to emulate flow spec compiler.
+ * Flows named {@link #UNCOMPILABLE_FLOW} are deliberately not compiled, which supports writing negative test cases.
  * It uses {@link InMemorySpecExecutor} for these dummy specs.
  */
 public class MockedSpecCompiler extends IdentityFlowToJobSpecCompiler {
 
   private static final int NUMBER_OF_JOBS = 3;
+  public static final String UNCOMPILABLE_FLOW = "uncompilableFlow";
 
   public MockedSpecCompiler(Config config) {
     super(config);
@@ -49,6 +51,11 @@ public class MockedSpecCompiler extends IdentityFlowToJobSpecCompiler {
 
   @Override
   public Dag<JobExecutionPlan> compileFlow(Spec spec) {
+    String flowName = (String) ((FlowSpec) spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_NAME_KEY);
+    if (flowName.equalsIgnoreCase(UNCOMPILABLE_FLOW)) {
+      return null;
+    }
+
     List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
 
     long flowExecutionId = System.currentTimeMillis();
@@ -57,7 +64,7 @@ public class MockedSpecCompiler extends IdentityFlowToJobSpecCompiler {
     while(i++ < NUMBER_OF_JOBS) {
       String specUri = "/foo/bar/spec/" + i;
       Properties properties = new Properties();
-      properties.put(ConfigurationKeys.FLOW_NAME_KEY, ((FlowSpec)spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_NAME_KEY));
+      properties.put(ConfigurationKeys.FLOW_NAME_KEY, flowName);
       properties.put(ConfigurationKeys.FLOW_GROUP_KEY, ((FlowSpec)spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_GROUP_KEY));
       properties.put(ConfigurationKeys.JOB_NAME_KEY, ((FlowSpec)spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_NAME_KEY) + "_" + i);
       properties.put(ConfigurationKeys.JOB_GROUP_KEY, ((FlowSpec)spec).getConfigAsProperties().get(ConfigurationKeys.FLOW_GROUP_KEY) + "_" + i);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 4f2c9ff..5c70807 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -254,7 +254,11 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     }
   }
 
-  /** {@inheritDoc} */
+  /**
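+   * Compiles the added {@link FlowSpec}; unless it is an explain spec or fails to compile, schedules or runs it.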
+   * @param addedSpec spec to be added
+   * @return add spec response
+   */
   @Override
   public AddSpecResponse onAddSpec(Spec addedSpec) {
     if (this.helixManager.isPresent() && !this.helixManager.get().isConnected()) {
@@ -266,63 +270,55 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
 
     _log.info("New Flow Spec detected: " + addedSpec);
 
-    if (addedSpec instanceof FlowSpec) {
-      try {
-        FlowSpec flowSpec = (FlowSpec) addedSpec;
-        URI flowSpecUri = flowSpec.getUri();
-        Properties jobConfig = new Properties();
-        Properties flowSpecProperties = ((FlowSpec) addedSpec).getConfigAsProperties();
-        jobConfig.putAll(this.properties);
-        jobConfig.setProperty(ConfigurationKeys.JOB_NAME_KEY, addedSpec.getUri().toString());
-        jobConfig.setProperty(ConfigurationKeys.JOB_GROUP_KEY,
-            flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY).toString());
-        jobConfig.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
-            ConfigUtils.getString((flowSpec).getConfig(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"));
-        if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && StringUtils.isNotBlank(
-            flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
-          jobConfig.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY,
-              flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
-        }
-        boolean isExplain = ConfigUtils.getBoolean(flowSpec.getConfig(), ConfigurationKeys.FLOW_EXPLAIN_KEY, false);
-        String response = null;
-
-        // always try to compile the flow to verify if it is compilable
-        Dag<JobExecutionPlan> dag = this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
-        if (dag != null && !dag.isEmpty()) {
-          response = dag.toString();
-        } else if (!flowSpec.getCompilationErrors().isEmpty()) {
-          response = Arrays.toString(flowSpec.getCompilationErrors().toArray());
-        }
+    if (!(addedSpec instanceof FlowSpec)) {
+      return null;
+    }
 
-        boolean compileSuccess = FlowCatalog.isCompileSuccessful(response);
+    FlowSpec flowSpec = (FlowSpec) addedSpec;
+    URI flowSpecUri = flowSpec.getUri();
+    Properties jobConfig = createJobConfig(flowSpec);
+    boolean isExplain = flowSpec.isExplain();
+    String response = null;
+
+    // always try to compile the flow to verify if it is compilable
+    Dag<JobExecutionPlan> dag = this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
+    if (dag != null && !dag.isEmpty()) {
+      response = dag.toString();
+    } else if (!flowSpec.getCompilationErrors().isEmpty()) {
+      response = Arrays.toString(flowSpec.getCompilationErrors().toArray());
+    }
 
-        if (!isExplain && compileSuccess) {
-          this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
+    boolean compileSuccess = FlowCatalog.isCompileSuccessful(response);
 
-          if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
-            _log.info("{} Scheduling flow spec: {} ", this.serviceName, addedSpec);
-            scheduleJob(jobConfig, null);
-            if (PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
-              _log.info("RunImmediately requested, hence executing FlowSpec: " + addedSpec);
-              this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, false, jobConfig, null));
-            }
-          } else {
-            _log.info("No FlowSpec schedule found, so running FlowSpec: " + addedSpec);
-            this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true, jobConfig, null));
-          }
-        } else {
-          _log.info("Removing the flow spec: {}, isExplain: {}, compileSuccess: {}", addedSpec, isExplain, compileSuccess);
-          if (this.flowCatalog.isPresent()) {
-            _log.debug("Removing flow spec from FlowCatalog: {}", flowSpec);
-            GobblinServiceJobScheduler.this.flowCatalog.get().remove(flowSpecUri, new Properties(), false);
-          }
-        }
-        return new AddSpecResponse<>(response);
+    if (isExplain || !compileSuccess) {
+      // todo: in case of a scheduled job, we should also check if the job schedule is a valid cron schedule
+      //  so it can be scheduled
+      _log.info("Ignoring the spec {}. isExplain: {}, compileSuccess: {}", addedSpec, isExplain, compileSuccess);
+      return new AddSpecResponse<>(response);
+    }
+
+    // todo : we should probably not schedule a flow if it is a runOnce flow
+    this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec);
+
+    if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+      _log.info("{} Scheduling flow spec: {} ", this.serviceName, addedSpec);
+      try {
+        scheduleJob(jobConfig, null);
       } catch (JobException je) {
         _log.error("{} Failed to schedule or run FlowSpec {}", serviceName, addedSpec, je);
+        this.scheduledFlowSpecs.remove(addedSpec.getUri().toString());
+        return null;
+      }
+      if (PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
+        _log.info("RunImmediately requested, hence executing FlowSpec: " + addedSpec);
+        this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, false, jobConfig, null));
       }
+    } else {
+      _log.info("No FlowSpec schedule found, so running FlowSpec: " + addedSpec);
+      this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true, jobConfig, null));
     }
-    return null;
+
+    return new AddSpecResponse<>(response);
   }
 
   public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
@@ -348,7 +344,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
         unscheduleJob(deletedSpecURI.toString());
       } else {
         _log.warn(String.format(
-            "Spec with URI: %s was not found in cache. May be it was cleaned, if not please " + "clean it manually",
+            "Spec with URI: %s was not found in cache. May be it was cleaned, if not please clean it manually",
             deletedSpecURI));
       }
     } catch (JobException | IOException e) {
@@ -384,6 +380,27 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     }
   }
 
+  private Properties createJobConfig(FlowSpec flowSpec) {
+    Properties jobConfig = new Properties();
+    Properties flowSpecProperties = flowSpec.getConfigAsProperties();
+
+    jobConfig.putAll(this.properties);
+    jobConfig.setProperty(ConfigurationKeys.JOB_NAME_KEY, flowSpec.getUri().toString());
+    jobConfig.setProperty(ConfigurationKeys.JOB_GROUP_KEY,
+        flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY).toString());
+    jobConfig.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
+        ConfigUtils.getString((flowSpec).getConfig(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"));
+
+    // todo : we should check if the job schedule is a valid cron schedule
+    if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && StringUtils.isNotBlank(
+        flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+      jobConfig.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY,
+          flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+    }
+
+    return jobConfig;
+  }
+
   /**
    * A Gobblin job to be scheduled.
    */
@@ -435,10 +452,22 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
       try {
         GobblinServiceJobScheduler.this.runJob(this.jobConfig, this.jobListener);
         if (flowCatalog.isPresent() && removeSpec) {
+          Object syncObject = GobblinServiceJobScheduler.this.flowCatalog.get().getSyncObject(specUri.toString());
+          if (syncObject != null) {
+            // no sync object means the spec is not being added right now, e.g. this run was triggered at service restart
+            synchronized (syncObject) {
+              while (!GobblinServiceJobScheduler.this.flowCatalog.get().exists(specUri)) {
+                syncObject.wait();
+              }
+            }
+          }
           GobblinServiceJobScheduler.this.flowCatalog.get().remove(specUri, new Properties(), false);
+          GobblinServiceJobScheduler.this.scheduledFlowSpecs.remove(specUri.toString());
         }
       } catch (JobException je) {
         _log.error("Failed to run job " + this.jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
+      } catch (InterruptedException e) {
+      _log.error("Failed to delete the spec " + specUri, e);
       }
     }
   }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index 282ede6..d1e793f 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.gobblin.service;
 import java.io.File;
 import java.net.URI;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
@@ -59,7 +60,9 @@ import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.service.modules.core.GitConfigMonitor;
 import org.apache.gobblin.service.modules.core.GobblinServiceManager;
+import org.apache.gobblin.service.modules.flow.MockedSpecCompiler;
 import org.apache.gobblin.service.monitoring.FsJobStatusRetriever;
+import org.apache.gobblin.testing.AssertWithBackoff;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -80,6 +83,10 @@ public class GobblinServiceManagerTest {
 
   private static final String TEST_GROUP_NAME = "testGroup";
   private static final String TEST_FLOW_NAME = "testFlow";
+  private static final FlowId TEST_FLOW_ID = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
+  private static final FlowId UNCOMPILABLE_FLOW_ID = new FlowId().setFlowGroup(TEST_GROUP_NAME)
+      .setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW);
+
   private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
   private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
   private static final String TEST_DUMMY_GROUP_NAME = "dummyGroup";
@@ -88,11 +95,18 @@ public class GobblinServiceManagerTest {
   private static final String TEST_SOURCE_NAME = "testSource";
   private static final String TEST_SINK_NAME = "testSink";
 
+  private final URI TEST_URI = FlowSpec.Utils.createFlowSpecUri(TEST_FLOW_ID);
+
   private MockGobblinServiceManager gobblinServiceManager;
-  private FlowConfigClient flowConfigClient;
+  private FlowConfigV2Client flowConfigClient;
 
   private Git gitForPush;
   private TestingServer testingServer;
+  Properties serviceCoreProperties = new Properties();
+  Map<String, String> flowProperties = Maps.newHashMap();
+
+  public GobblinServiceManagerTest() throws Exception {
+  }
 
   @BeforeClass
   public void setup() throws Exception {
@@ -101,7 +115,11 @@ public class GobblinServiceManagerTest {
     ITestMetastoreDatabase testMetastoreDatabase = TestMetastoreDatabaseFactory.get();
 
     testingServer = new TestingServer(true);
-    Properties serviceCoreProperties = new Properties();
+
+    flowProperties.put("param1", "value1");
+    flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME);
+    flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME);
+
     serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_USER_KEY, "testUser");
     serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, "testPassword");
     serviceCoreProperties.put(ConfigurationKeys.STATE_STORE_DB_URL_KEY, testMetastoreDatabase.getJdbcUrl());
@@ -131,6 +149,8 @@ public class GobblinServiceManagerTest {
 
     serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_JOB_STATUS_MONITOR_ENABLED_KEY, false);
 
+    serviceCoreProperties.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY, MockedSpecCompiler.class.getCanonicalName());
+
     // Create a bare repository
     RepositoryCache.FileKey fileKey = RepositoryCache.FileKey.exact(new File(GIT_REMOTE_REPO_DIR), FS.DETECTED);
     fileKey.open(false).create(true);
@@ -145,8 +165,8 @@ public class GobblinServiceManagerTest {
         ConfigUtils.propertiesToConfig(serviceCoreProperties), Optional.of(new Path(SERVICE_WORK_DIR)));
     this.gobblinServiceManager.start();
 
-    this.flowConfigClient = new FlowConfigClient(String.format("http://localhost:%s/",
-        this.gobblinServiceManager.getRestLiServer().getPort()));
+    this.flowConfigClient = new FlowConfigV2Client(String.format("http://127.0.0.1:%s/",
+        this.gobblinServiceManager.getRestLiServer().getListeningURI().getPort()));
   }
 
   private void cleanUpDir(String dir) throws Exception {
@@ -183,50 +203,135 @@ public class GobblinServiceManagerTest {
     }
   }
 
+  /**
+   * Tests that a flow already in the spec store is not deleted merely because it cannot be compiled when the
+   * service restarts, while a run-once flow found in the store still runs and is then removed.
+   */
   @Test
-  public void testCreate() throws Exception {
-    Map<String, String> flowProperties = Maps.newHashMap();
-    flowProperties.put("param1", "value1");
-    flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME);
-    flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME);
+  public void testRestart() throws Exception {
+    FlowConfig uncompilableFlowConfig = new FlowConfig().setId(UNCOMPILABLE_FLOW_ID).setTemplateUris(TEST_TEMPLATE_URI)
+        .setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(true))
+        .setProperties(new StringMap(flowProperties));
+    FlowSpec uncompilableSpec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(uncompilableFlowConfig);
+    FlowConfig runOnceFlowConfig = new FlowConfig().setId(TEST_FLOW_ID)
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties));
+    FlowSpec runOnceSpec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(runOnceFlowConfig);
+
+    // Write both specs straight to the spec store, bypassing the FlowCatalog, which would reject the uncompilable one.
+    this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(uncompilableSpec);
+    this.gobblinServiceManager.getFlowCatalog().getSpecStore().addSpec(runOnceSpec);
+
+    // getSpecs() is typed as a Collection with no promised order, so check membership instead of casting to List.
+    Collection<Spec> specs = this.gobblinServiceManager.getFlowCatalog().getSpecs();
+
+    Assert.assertEquals(specs.size(), 2);
+    Assert.assertTrue(specs.stream().anyMatch(spec -> spec.getUri().equals(uncompilableSpec.getUri())),
+        "Uncompilable spec is missing from the flow catalog");
+    Assert.assertTrue(specs.stream().anyMatch(spec -> spec.getUri().equals(runOnceSpec.getUri())),
+        "Run-once spec is missing from the flow catalog");
+
+    // restart the service
+    serviceReboot();
+
+    // After the restart the run-once flow runs and is deleted from the spec store; the uncompilable flow must remain.
+    AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(20000L).backoffFactor(1)
+        .assertTrue(input -> this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 1,
+            "Waiting for the runOnce job to finish");
+
+    // Inspect the spec read back from the catalog (not the local object) so these assertions actually test the store.
+    FlowSpec remainingSpec =
+        (FlowSpec) this.gobblinServiceManager.getFlowCatalog().getSpecs().iterator().next();
+    Assert.assertEquals(remainingSpec.getUri(), uncompilableSpec.getUri());
+    Assert.assertTrue(remainingSpec.getConfig().getBoolean(ConfigurationKeys.FLOW_RUN_IMMEDIATELY));
+
+    // clean it
+    this.gobblinServiceManager.getFlowCatalog().remove(uncompilableSpec.getUri());
+    Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 0);
+  }
+
+  @Test (dependsOnMethods = "testRestart")
+  public void testUncompilableJob() throws Exception {
+    FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW);
+    URI uri = FlowSpec.Utils.createFlowSpecUri(flowId);
+    FlowConfig flowConfig = new FlowConfig().setId(flowId)
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties));
+    RestLiResponseException exception = null;
+    try {
+      this.flowConfigClient.createFlowConfig(flowConfig);
+    } catch (RestLiResponseException e) {
+      exception = e;
+    }
+    Assert.assertNotNull(exception, "Creating a flow that MockedSpecCompiler cannot compile should have failed");
+    Assert.assertEquals(exception.getStatus(), HttpStatus.BAD_REQUEST_400);
+    // An uncompilable flow must be neither persisted in the flow catalog nor registered with the scheduler.
+    Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 0);
+    Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(uri.toString()));
+  }
+
+  @Test (dependsOnMethods = "testUncompilableJob")
+  public void testRunOnceJob() throws Exception {
+    FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID)
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties));
+
+    this.flowConfigClient.createFlowConfig(flowConfig);
+
+    // A flow without a schedule runs once: it leaves both the catalog and the scheduler soon after it is orchestrated.
+    AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(2000L).backoffFactor(1)
+        .assertTrue(input -> this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 0,
+            "Waiting for the runOnce job to be removed from the flow catalog...");
+    AssertWithBackoff.create().maxSleepMs(100L).timeoutMs(1000L).backoffFactor(1)
+        .assertTrue(input -> !this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()),
+            "Waiting for the runOnce job to be removed from the scheduler...");
+  }
 
+  @Test (dependsOnMethods = "testRunOnceJob")
+  public void testExplainJob() throws Exception {
     FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+        .setTemplateUris(TEST_TEMPLATE_URI).setProperties(new StringMap(flowProperties)).setExplain(true);
+
+    this.flowConfigClient.createFlowConfig(flowConfig);
+
+    // An explain request must be neither persisted in the flow catalog nor registered with the scheduler.
+    Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 0);
+    Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()));
+  }
+
+  @Test (dependsOnMethods = "testExplainJob") // scheduled flow: must be persisted AND scheduled; dependent tests reuse it
+  public void testCreate() throws Exception {
+    FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID)
         .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(true))
         .setProperties(new StringMap(flowProperties));
 
     this.flowConfigClient.createFlowConfig(flowConfig);
-    Assert.assertTrue(this.gobblinServiceManager.getFlowCatalog().getSpecs().size() == 1, "Flow that was created is not "
-        + "reflecting in FlowCatalog");
+    Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 1);
+    Assert.assertTrue(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()));
   }
 
   @Test (dependsOnMethods = "testCreate")
   public void testCreateAgain() throws Exception {
-    Map<String, String> flowProperties = Maps.newHashMap();
-    flowProperties.put("param1", "value1");
-    flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME);
-    flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME);
-
-    FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+    FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID)
         .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE))
         .setProperties(new StringMap(flowProperties));
 
+    RestLiResponseException exception = null;
     try {
       this.flowConfigClient.createFlowConfig(flowConfig);
     } catch (RestLiResponseException e) {
-      Assert.fail("Create Again should pass without complaining that the spec already exists.");
+      exception = e;
     }
+    Assert.assertNotNull(exception); // creating a flow whose id already exists must now fail instead of succeeding
+    Assert.assertEquals(exception.getStatus(), HttpStatus.CONFLICT_409);
   }
 
   @Test (dependsOnMethods = "testCreateAgain")
   public void testGet() throws Exception {
-    FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
-    FlowConfig flowConfig = this.flowConfigClient.getFlowConfig(flowId);
+    FlowConfig flowConfig = this.flowConfigClient.getFlowConfig(TEST_FLOW_ID);
 
     Assert.assertEquals(flowConfig.getId().getFlowGroup(), TEST_GROUP_NAME);
     Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME);
     Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE);
     Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI);
-    Assert.assertFalse(flowConfig.getSchedule().isRunImmediately());
+    Assert.assertTrue(flowConfig.getSchedule().isRunImmediately());
     // Add this assert back when getFlowSpec() is changed to return the raw flow spec
     //Assert.assertEquals(flowConfig.getProperties().size(), 1);
     Assert.assertEquals(flowConfig.getProperties().get("param1"), "value1");
@@ -250,6 +355,7 @@ public class GobblinServiceManagerTest {
 
     FlowConfig retrievedFlowConfig = this.flowConfigClient.getFlowConfig(flowId);
 
+    Assert.assertTrue(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()));
     Assert.assertEquals(retrievedFlowConfig.getId().getFlowGroup(), TEST_GROUP_NAME);
     Assert.assertEquals(retrievedFlowConfig.getId().getFlowName(), TEST_FLOW_NAME);
     Assert.assertEquals(retrievedFlowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE);
@@ -263,6 +369,7 @@ public class GobblinServiceManagerTest {
   @Test (dependsOnMethods = "testUpdate")
   public void testDelete() throws Exception {
     FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
+    URI uri = FlowSpec.Utils.createFlowSpecUri(flowId);
 
     // make sure flow config exists
     FlowConfig flowConfig = this.flowConfigClient.getFlowConfig(flowId);
@@ -275,6 +382,7 @@ public class GobblinServiceManagerTest {
       this.flowConfigClient.getFlowConfig(flowId);
     } catch (RestLiResponseException e) {
       Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
+      Assert.assertFalse(this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(uri.toString()));
       return;
     }
 
@@ -290,7 +398,7 @@ public class GobblinServiceManagerTest {
     Files.write("flow.name=testFlow\nflow.group=testGroup\nparam1=value20\n", testFlowFile, Charsets.UTF_8);
 
     Collection<Spec> specs = this.gobblinServiceManager.getFlowCatalog().getSpecs();
-    Assert.assertTrue(specs.size() == 0);
+    Assert.assertEquals(specs.size(), 0);
 
     // add, commit, push
     this.gitForPush.add().addFilepattern("gobblin-config/testGroup/testFlow.pull").call();
@@ -300,14 +408,12 @@ public class GobblinServiceManagerTest {
     // polling is every 5 seconds, so wait twice as long and check
     TimeUnit.SECONDS.sleep(10);
 
-    specs = this.gobblinServiceManager.getFlowCatalog().getSpecs();
-    Assert.assertTrue(specs.size() == 1);
+    // spec generated using git monitor do not have schedule, so their life cycle should be similar to runOnce jobs
+    Assert.assertEquals(this.gobblinServiceManager.getFlowCatalog().getSpecs().size(), 0);
 
-    FlowSpec spec = (FlowSpec) (specs.iterator().next());
-    Assert.assertEquals(spec.getUri(), new URI("gobblin-flow:/testGroup/testFlow"));
-    Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), "testFlow");
-    Assert.assertEquals(spec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), "testGroup");
-    Assert.assertEquals(spec.getConfig().getString("param1"), "value20");
+    AssertWithBackoff.create().maxSleepMs(200L).timeoutMs(2000L).backoffFactor(1)
+        .assertTrue(input -> !this.gobblinServiceManager.getScheduler().getScheduledFlowSpecs().containsKey(TEST_URI.toString()),
+            "Waiting for job to get orchestrated...");
   }
 
   @Test
@@ -354,7 +460,15 @@ public class GobblinServiceManagerTest {
     } catch (RestLiResponseException e) {
       Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404);
     }
-    cleanUpDir(FLOW_SPEC_STORE_DIR);
+  }
+
+  private void serviceReboot() throws Exception {
+    this.gobblinServiceManager.stop();
+    this.gobblinServiceManager = new MockGobblinServiceManager("CoreService", "1",
+        ConfigUtils.propertiesToConfig(serviceCoreProperties), Optional.of(new Path(SERVICE_WORK_DIR)));
+    this.gobblinServiceManager.start(); // start first, then read the port from the listening URI, as setup() does
+    this.flowConfigClient = new FlowConfigV2Client(String.format("http://127.0.0.1:%s/",
+        this.gobblinServiceManager.getRestLiServer().getListeningURI().getPort()));
   }
 
   class MockGobblinServiceManager extends GobblinServiceManager {
@@ -368,6 +482,5 @@ public class GobblinServiceManagerTest {
     protected EmbeddedRestliServer getRestLiServer() {
       return this.restliServer;
     }
-
   }
 }
\ No newline at end of file