You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2019/04/24 20:04:46 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-746] Async loading FlowSpec

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new d972f72  [GOBBLIN-746] Async loading FlowSpec
d972f72 is described below

commit d972f72edf359b1348554ba9948f4942ea0c8c69
Author: autumnust <le...@linkedin.com>
AuthorDate: Wed Apr 24 13:04:38 2019 -0700

    [GOBBLIN-746] Async loading FlowSpec
    
    Dear Gobblin maintainers,
    
    Please accept this PR. I understand that it will
    not be reviewed until I have checked off all the
    steps below!
    
    This PR contains:
    - Change `SpecStore` interface to add listing-like
    methods, returning an iterator over the URIs of all
    `Spec`s it contains.
    - Load and schedule `FlowSpec`s in
    `GobblinServiceJobScheduler` asynchronously and
    signal completion using a `CountDownLatch`.
    - Add unit tests for `FSSpecStore` and
    `GobblinServiceJobScheduler`.
    
    ### JIRA
    - [x] My PR addresses the following Gobblin JIRA:
        - https://issues.apache.org/jira/browse/GOBBLIN-746
    
    ### Description
    - [x] Here are some details about my PR, including
    screenshots (if applicable):
    
    ### Tests
    - [x] My PR adds the following unit tests __OR__
    does not need testing for this extremely good
    reason:
    
    ### Commits
    - [x] My commits all reference JIRA issues in
    their subject lines, and I have squashed multiple
    commits if they address the same issue. In
    addition, my commits follow the guidelines from
    "[How to write a good git commit
    message](http://chris.beams.io/posts/git-commit/)":
        1. Subject is separated from body by a blank line
        2. Subject is limited to 50 characters
        3. Subject does not end with a period
        4. Subject uses the imperative mood ("add", not
    "adding")
        5. Body wraps at 72 characters
        6. Body explains "what" and "why", not "how"
    
    Add a method to SpecStore that returns an iterator
    of Spec URIs
    
    Add async loading and unit test
    
    Fix findbugsMain
    
    Address comments and fix travis failure
    
    Closes #2611 from autumnust/asyncLoadingFlowSpec
---
 .../gobblin/runtime/api/MutableSpecCatalog.java    |   2 +-
 .../gobblin/runtime/api/SpecSerDeException.java    |  53 +++++++
 .../org/apache/gobblin/runtime/api/SpecStore.java  |  14 ++
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  |  11 +-
 .../gobblin/runtime/spec_store/FSSpecStore.java    |  76 +++++++---
 .../spec_catalog/FlowCatalogTest.java              | 105 +++-----------
 .../spec_catalog/TopologyCatalogTest.java          |   2 +-
 .../runtime/spec_store/FSSpecStoreTest.java        | 157 +++++++++++++++++++++
 gobblin-service/build.gradle                       |   3 +
 .../scheduler/GobblinServiceJobScheduler.java      | 123 ++++++++++------
 .../scheduler/GobblinServiceJobSchedulerTest.java  |  88 +++++++++++-
 11 files changed, 473 insertions(+), 161 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
index b64fb17..3c1573a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java
@@ -48,7 +48,7 @@ public interface MutableSpecCatalog extends SpecCatalog {
    * on adding a {@link Spec} to the {@link SpecCatalog}. The key for each entry is the name of the {@link SpecCatalogListener}
    * and the value is the result of the the action taken by the listener returned as an instance of {@link AddSpecResponse}.
    * */
-  public Map<String, AddSpecResponse> put(Spec spec);
+  Map<String, AddSpecResponse> put(Spec spec);
 
   /**
    * Removes an existing {@link Spec} with the given URI.
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDeException.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDeException.java
new file mode 100644
index 0000000..776ff6f
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSerDeException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.net.URI;
+
+
+/**
+ * An exception when {@link Spec} cannot be correctly serialized/deserialized from underlying storage.
+ */
+public class SpecSerDeException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The URI that triggered SerDe error.
+   * Could be a single Spec's URI or parent-level URI.
+   */
+  private final URI errorUri;
+
+  public SpecSerDeException(URI errorUri) {
+    super("Error occurred in loading of Spec with URI " + errorUri);
+    this.errorUri = errorUri;
+  }
+
+  public SpecSerDeException(URI errorUri, Throwable cause) {
+    super("Error occurred in loading URI " + errorUri, cause);
+    this.errorUri = errorUri;
+  }
+
+  public SpecSerDeException(String errorMsg, URI errorUri, Throwable cause) {
+    super("Error occurred in loading URI " + errorUri + " with message:" + errorMsg, cause);
+    this.errorUri = errorUri;
+  }
+
+  public URI getErrorUri() {
+    return errorUri;
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
index dff8321..5bfeba9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
@@ -17,9 +17,11 @@
 
 package org.apache.gobblin.runtime.api;
 
+import com.google.common.base.Optional;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
+import java.util.Iterator;
 
 
 public interface SpecStore {
@@ -105,4 +107,16 @@ public interface SpecStore {
    * @throws IOException Exception in retrieving {@link Spec}s.
    */
   Collection<Spec> getSpecs() throws IOException;
+
+  /**
+   * Return an iterator of Spec URIs (Spec identifiers).
+   */
+  Iterator<URI> getSpecURIs() throws IOException;
+
+  /**
+   * @return A URI to identify the SpecStore itself.
+   * e.g. For File-System based implementation of {@link SpecStore}, the URI will be associated
+   * with root-level FileSystem directory.
+   */
+  Optional<URI> getSpecStoreURI();
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index ddf8cbe..71e0f9f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -23,12 +23,14 @@ import java.net.URI;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -205,6 +207,14 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
   /* Catalog core functionality                     *
   /**************************************************/
 
+  /** Lists Spec URIs from the {@link SpecStore}; an IOException becomes a SpecSerDeException. */
+  public Iterator<URI> getSpecURIs() throws SpecSerDeException {
+    try {
+      return specStore.getSpecURIs();
+    } catch (IOException ioe) {
+      throw new SpecSerDeException("Cannot retrieve Specs' URI from Spec Store", specStore.getSpecStoreURI().orNull(), ioe);
+    }
+  }
   /**
    * Get all specs from {@link SpecStore}
    */
@@ -213,7 +223,6 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
     try {
       return specStore.getSpecs();
       // TODO: Have kind of metrics keeping track of specs that failed to be deserialized.
-
     } catch (IOException e) {
       throw new RuntimeException("Cannot retrieve Specs from Spec store", e);
     }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index 0e2c482..e93464d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -17,38 +17,35 @@
 
 package org.apache.gobblin.runtime.spec_store;
 
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import com.google.common.io.ByteStreams;
-
-import java.io.FileNotFoundException;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
-
-import org.apache.commons.lang3.StringUtils;
+import java.util.Iterator;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecStore;
+import org.apache.gobblin.util.PathUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-import com.typesafe.config.Config;
-
-import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment;
-import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.api.SpecSerDe;
-import org.apache.gobblin.runtime.api.SpecStore;
-import org.apache.gobblin.util.PathUtils;
-
 
 /**
  * The Spec Store for file system to persist the Spec information.
@@ -104,7 +101,7 @@ public class FSSpecStore implements SpecStore {
 
   /**
    * @param specUri path of the spec
-   * @return empty string for topology spec, as topolgies do not have a group,
+   * @return empty string for topology spec, as topologies do not have a group,
    *         group name for flow spec
    */
   public static String getSpecGroup(Path specUri) {
@@ -238,6 +235,35 @@ public class FSSpecStore implements SpecStore {
     return specs;
   }
 
+  @Override
+  public Iterator<URI> getSpecURIs() throws IOException {
+    final RemoteIterator<LocatedFileStatus> it = fs.listFiles(this.fsSpecStoreDirPath, true);
+    return new Iterator<URI>() {
+      @Override
+      public boolean hasNext() {
+        try {
+          return it.hasNext();
+        } catch (IOException ioe) {
+          throw new RuntimeException("Failed to determine if there's next element available due to:", ioe);
+        }
+      }
+
+      @Override
+      public URI next() {
+        try {
+          return getURIFromPath(it.next().getPath(), fsSpecStoreDirPath);
+        } catch (IOException ioe) {
+          throw new RuntimeException("Failed to fetch next element due to:", ioe);
+        }
+      }
+    };
+  }
+
+  @Override
+  public Optional<URI> getSpecStoreURI() {
+    return Optional.of(this.fsSpecStoreDirPath.toUri());
+  }
+
   /**
    * For multiple {@link FlowSpec}s to be loaded, catch Exceptions when one of them failed to be loaded and
    * continue with the rest.
@@ -293,6 +319,7 @@ public class FSSpecStore implements SpecStore {
   }
 
   /**
+   * Construct a file path given URI and version of a spec.
    *
    * @param fsSpecStoreDirPath The directory path for specs.
    * @param uri Uri as the identifier of JobSpec
@@ -301,4 +328,15 @@ public class FSSpecStore implements SpecStore {
   protected Path getPathForURI(Path fsSpecStoreDirPath, URI uri, String version) {
     return PathUtils.addExtension(PathUtils.mergePaths(fsSpecStoreDirPath, new Path(uri)), version);
   }
+
+  /**
+   * Recover {@link Spec}'s URI from a file path.
+   * Note that there's no version awareness of this method, as Spec's version is currently not supported.
+   *
+   * @param fsPath The given file path to get URI from.
+   * @return The exact URI of a Spec.
+   */
+  protected URI getURIFromPath(Path fsPath, Path fsSpecStoreDirPath) {
+    return PathUtils.relativizePath(fsPath, fsSpecStoreDirPath).toUri();
+  }
 }
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
similarity index 63%
rename from gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
rename to gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
index 2d73290..3d73281 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalogTest.java
@@ -15,39 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.spec_catalog;
+package org.apache.gobblin.runtime.spec_catalog;
 
 import com.google.common.base.Optional;
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.typesafe.config.Config;
 import java.io.File;
-import java.io.IOException;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Collection;
 import java.util.Properties;
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecExecutor;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
-import org.apache.gobblin.runtime.api.SpecSerDe;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
-import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
-import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PathUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -90,27 +77,33 @@ public class FlowCatalogTest {
     this.serviceLauncher.start();
 
     // Create Spec to play with
-    this.flowSpec = initFlowSpec();
+    this.flowSpec = initFlowSpec(SPEC_STORE_DIR);
   }
 
-  private FlowSpec initFlowSpec() {
+  /**
+   * Create FlowSpec with default URI
+   */
+  public static FlowSpec initFlowSpec(String specStore) {
+    return initFlowSpec(specStore, computeFlowSpecURI());
+  }
+
+  /**
+   * Create FlowSpec with specified URI and SpecStore location.
+   */
+  public static FlowSpec initFlowSpec(String specStore, URI uri){
     Properties properties = new Properties();
-    properties.put("specStore.fs.dir", SPEC_STORE_DIR);
+    properties.put("specStore.fs.dir", specStore);
     properties.put("specExecInstance.capabilities", "source:destination");
     Config config = ConfigUtils.propertiesToConfig(properties);
 
     SpecExecutor specExecutorInstanceProducer = new InMemorySpecExecutor(config);
 
     FlowSpec.Builder flowSpecBuilder = null;
-    try {
-      flowSpecBuilder = FlowSpec.builder(computeFlowSpecURI())
-          .withConfig(config)
-          .withDescription(SPEC_DESCRIPTION)
-          .withVersion(SPEC_VERSION)
-          .withTemplate(new URI("templateURI"));
-    } catch (URISyntaxException e) {
-      throw new RuntimeException(e);
-    }
+    flowSpecBuilder = FlowSpec.builder(uri)
+        .withConfig(config)
+        .withDescription(SPEC_DESCRIPTION)
+        .withVersion(SPEC_VERSION)
+        .withTemplate(URI.create("templateURI"));
     return flowSpecBuilder.build();
   }
 
@@ -125,64 +118,6 @@ public class FlowCatalogTest {
     }
   }
 
-  /**
-   * Make sure that when there's on spec failed to be deserialized, the rest of spec in specStore can
-   * still be taken care of.
-   */
-  @Test
-  public void testGetSpecRobustness() throws Exception {
-
-    File specDir = Files.createTempDir();
-    Properties properties = new Properties();
-    properties.setProperty(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, specDir.getAbsolutePath());
-    SpecSerDe serde = Mockito.mock(SpecSerDe.class);
-    TestFsSpecStore fsSpecStore = new TestFsSpecStore(ConfigUtils.propertiesToConfig(properties), serde);
-
-    // Version is specified as 0,1,2
-    File specFileFail = new File(specDir, "spec_fail");
-    Assert.assertTrue(specFileFail.createNewFile());
-    File specFile1 = new File(specDir, "spec0");
-    Assert.assertTrue(specFile1.createNewFile());
-    File specFile2 = new File(specDir, "spec1");
-    Assert.assertTrue(specFile2.createNewFile());
-    File specFile3 = new File(specDir, "serDeFail");
-    Assert.assertTrue(specFile3.createNewFile());
-
-    FileSystem fs = FileSystem.getLocal(new Configuration());
-    Assert.assertEquals(fs.getFileStatus(new Path(specFile3.getAbsolutePath())).getLen(), 0);
-
-    Collection<Spec> specList = fsSpecStore.getSpecs();
-    // The fail and serDe datasets wouldn't survive
-    Assert.assertEquals(specList.size(), 2);
-    for (Spec spec: specList) {
-      Assert.assertTrue(!spec.getDescription().contains("spec_fail"));
-      Assert.assertTrue(!spec.getDescription().contains("serDeFail"));
-    }
-  }
-
-  class TestFsSpecStore extends FSSpecStore {
-    public TestFsSpecStore(Config sysConfig, SpecSerDe specSerDe) throws IOException {
-      super(sysConfig, specSerDe);
-    }
-
-    @Override
-    protected Spec readSpecFromFile(Path path) throws IOException {
-      if (path.getName().contains("fail")) {
-        throw new IOException("Mean to fail in the test");
-      } else if (path.getName().contains("serDeFail")) {
-
-        // Simulate the way that a serDe exception
-        FSDataInputStream fis = fs.open(path);
-        SerializationUtils.deserialize(ByteStreams.toByteArray(fis));
-
-        // This line should never be reached since we generate SerDe Exception on purpose.
-        Assert.assertTrue(false);
-        return null;
-      }
-      else return initFlowSpec();
-    }
-  }
-
   @Test
   public void createFlowSpec() {
     // List Current Specs
@@ -239,7 +174,7 @@ public class FlowCatalogTest {
     Assert.assertTrue(specs.size() == 0, "Spec store should be empty after deletion");
   }
 
-  public URI computeFlowSpecURI() {
+  public static URI computeFlowSpecURI() {
     // Make sure this is relative
     URI uri = PathUtils.relativizePath(new Path(SPEC_GROUP_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri();
     return uri;
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalogTest.java
similarity index 99%
rename from gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java
rename to gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalogTest.java
index a3490ff..4254b04 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalogTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.spec_catalog;
+package org.apache.gobblin.runtime.spec_catalog;
 
 import java.io.File;
 import java.net.URI;
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java
new file mode 100644
index 0000000..e0e6ebb
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/FSSpecStoreTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_store;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.apache.gobblin.runtime.spec_catalog.FlowCatalogTest.*;
+
+
+public class FSSpecStoreTest {
+
+  @Test
+  public void testPathConversion() throws Exception {
+    Properties properties = new Properties();
+    File tmpDir = Files.createTempDir();
+    properties.setProperty(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, tmpDir.getAbsolutePath());
+    SpecSerDe specSerDe = Mockito.mock(SpecSerDe.class);
+    FSSpecStore fsSpecStore = new FSSpecStore(ConfigUtils.propertiesToConfig(properties), specSerDe);
+
+    Path rootPath = new Path("/a/b/c");
+    URI uri = URI.create("ddd");
+    Assert.assertEquals(fsSpecStore.getURIFromPath(fsSpecStore.getPathForURI(rootPath, uri, ""), rootPath), uri);
+  }
+
+  /**
+   * Make sure that when one spec fails to be deserialized, the rest of the specs in specStore can
+   * still be taken care of.
+   */
+  @Test
+  public void testGetSpecRobustness() throws Exception {
+
+    File specDir = Files.createTempDir();
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, specDir.getAbsolutePath());
+    SpecSerDe serde = Mockito.mock(SpecSerDe.class);
+    TestFsSpecStore fsSpecStore = new TestFsSpecStore(ConfigUtils.propertiesToConfig(properties), serde);
+
+    // Version is specified as 0,1,2
+    File specFileFail = new File(specDir, "spec_fail");
+    Assert.assertTrue(specFileFail.createNewFile());
+    File specFile1 = new File(specDir, "spec0");
+    Assert.assertTrue(specFile1.createNewFile());
+    File specFile2 = new File(specDir, "spec1");
+    Assert.assertTrue(specFile2.createNewFile());
+    File specFile3 = new File(specDir, "serDeFail");
+    Assert.assertTrue(specFile3.createNewFile());
+
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    Assert.assertEquals(fs.getFileStatus(new Path(specFile3.getAbsolutePath())).getLen(), 0);
+
+    Collection<Spec> specList = fsSpecStore.getSpecs();
+    // The fail and serDe datasets wouldn't survive
+    Assert.assertEquals(specList.size(), 2);
+    for (Spec spec: specList) {
+      Assert.assertTrue(!spec.getDescription().contains("spec_fail"));
+      Assert.assertTrue(!spec.getDescription().contains("serDeFail"));
+    }
+  }
+
+  class TestFsSpecStore extends FSSpecStore {
+    public TestFsSpecStore(Config sysConfig, SpecSerDe specSerDe) throws IOException {
+      super(sysConfig, specSerDe);
+    }
+
+    @Override
+    protected Spec readSpecFromFile(Path path) throws IOException {
+      if (path.getName().contains("fail")) {
+        throw new IOException("Mean to fail in the test");
+      } else if (path.getName().contains("serDeFail")) {
+
+        // Simulate the way that a serDe exception
+        FSDataInputStream fis = fs.open(path);
+        SerializationUtils.deserialize(ByteStreams.toByteArray(fis));
+
+        // This line should never be reached since we generate SerDe Exception on purpose.
+        Assert.assertTrue(false);
+        return null;
+      }
+      else return initFlowSpec(Files.createTempDir().getAbsolutePath());
+    }
+  }
+
+
+  @Test
+  public void testGetSpecURI() throws Exception {
+    File specDir = Files.createTempDir();
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.SPECSTORE_FS_DIR_KEY, specDir.getAbsolutePath());
+    SpecSerDe serde = Mockito.mock(SpecSerDe.class);
+    FSSpecStore fsSpecStore = new FSSpecStore(ConfigUtils.propertiesToConfig(properties), serde);
+
+    URI specURI0 = URI.create("spec0");
+    URI specURI1 = URI.create("spec1");
+    URI specURI2 = URI.create("spec2");
+
+    File specFile1 = new File(specDir, "spec0");
+    Assert.assertTrue(specFile1.createNewFile());
+    File specFile2 = new File(specDir, "spec1");
+    Assert.assertTrue(specFile2.createNewFile());
+    File specFile3 = new File(specDir, "spec2");
+    Assert.assertTrue(specFile3.createNewFile());
+
+    fsSpecStore.exists(specURI0);
+    fsSpecStore.exists(specURI1);
+    fsSpecStore.exists(specURI2);
+
+    Iterator<URI> it = fsSpecStore.getSpecURIs();
+    int count = 0;
+    List<URI> result = new ArrayList<>();
+    while (it.hasNext()) {
+      count += 1 ;
+      result.add(it.next());
+    }
+
+    Assert.assertEquals(count, 3);
+    Assert.assertTrue(result.contains(specURI0));
+    Assert.assertTrue(result.contains(specURI1));
+    Assert.assertTrue(result.contains(specURI2));
+  }
+}
\ No newline at end of file
diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle
index ab56a3e..2701022 100644
--- a/gobblin-service/build.gradle
+++ b/gobblin-service/build.gradle
@@ -71,6 +71,9 @@ dependencies {
   compile externalDependency.zkClient
 
   testCompile project(":gobblin-example")
+
+  // Required for adding Test class into classpath
+  testCompile project(":gobblin-runtime").sourceSets.test.output
   testCompile project(path: ":gobblin-modules:gobblin-kafka-08:", configuration: "tests")
   testCompile project(path: ":gobblin-metastore", configuration: "testFixtures")
   testCompile project(":gobblin-test-utils")
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index e5dc41e..fde21bf 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -17,37 +17,26 @@
 
 package org.apache.gobblin.service.modules.scheduler;
 
-import java.net.URI;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.helix.HelixManager;
-import org.quartz.DisallowConcurrentExecution;
-import org.quartz.InterruptableJob;
-import org.quartz.JobDataMap;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.UnableToInterruptJobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-
+import java.net.URI;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-
+import org.apache.commons.lang.StringUtils;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.JobException;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
 import org.apache.gobblin.runtime.listeners.JobListener;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
@@ -62,6 +51,15 @@ import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.helix.HelixManager;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.InterruptableJob;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.UnableToInterruptJobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -84,8 +82,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
 
   public GobblinServiceJobScheduler(String serviceName, Config config, Optional<HelixManager> helixManager,
       Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator,
-      SchedulerService schedulerService, Optional<Logger> log)
-      throws Exception {
+      SchedulerService schedulerService, Optional<Logger> log) throws Exception {
     super(ConfigUtils.configToProperties(config), schedulerService);
 
     _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
@@ -98,10 +95,9 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
 
+  /**
+   * Convenience constructor: builds a new {@link Orchestrator} from {@code config}, {@code topologyCatalog},
+   * {@code dagManager} and {@code log}, then delegates to the constructor that accepts an {@link Orchestrator}.
+   */
   public GobblinServiceJobScheduler(String serviceName, Config config, Optional<HelixManager> helixManager,
       Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Optional<DagManager> dagManager,
-      SchedulerService schedulerService, Optional<Logger> log)
-      throws Exception {
-    this(serviceName, config, helixManager, flowCatalog, topologyCatalog, new Orchestrator(config, topologyCatalog, dagManager, log),
-        schedulerService, log);
+      SchedulerService schedulerService, Optional<Logger> log) throws Exception {
+    this(serviceName, config, helixManager, flowCatalog, topologyCatalog,
+        new Orchestrator(config, topologyCatalog, dagManager, log), schedulerService, log);
   }
 
   public synchronized void setActive(boolean isActive) {
@@ -114,17 +110,16 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     if (isActive) {
       // Need to set active=true first; otherwise in the onAddSpec(), node will forward specs to active node, which is itself.
       this.isActive = isActive;
+
       if (this.flowCatalog.isPresent()) {
-        Collection<Spec> specs = this.flowCatalog.get().getSpecsWithTimeUpdate();
-        for (Spec spec : specs) {
-          //Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change
-          if (spec instanceof FlowSpec) {
-            Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
-            onAddSpec(modifiedSpec);
-          } else {
-            onAddSpec(spec);
+        // Load spec asynchronously and make scheduler be aware of that.
+        Thread scheduleSpec = new Thread(new Runnable() {
+          @Override
+          public void run() {
+            scheduleSpecsFromCatalog();
           }
-        }
+        });
+        scheduleSpec.start();
       }
     } else {
       // Since we are going to change status to isActive=false, unschedule all flows
@@ -136,6 +131,45 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     }
   }
 
+  /**
+   * Loads every {@link Spec} known to the {@link FlowCatalog} and hands it to {@link #onAddSpec(Spec)}.
+   * {@link FlowSpec}s are first passed through {@link #disableFlowRunImmediatelyOnStart(FlowSpec)}, because
+   * this runs on service startup or leadership change.
+   *
+   * <p>A URI listed by the spec store that the catalog can no longer resolve ({@link SpecNotFoundException})
+   * is logged and skipped; loading continues with the remaining URIs.
+   *
+   * @throws RuntimeException if the iterator of spec URIs cannot be obtained from the catalog
+   */
+  private void scheduleSpecsFromCatalog() {
+    long startTime = System.currentTimeMillis();
+
+    Iterator<URI> specUris;
+    try {
+      specUris = this.flowCatalog.get().getSpecURIs();
+    } catch (SpecSerDeException ssde) {
+      throw new RuntimeException("Failed to get the iterator of all Spec URIS", ssde);
+    }
+
+    try {
+      while (specUris.hasNext()) {
+        // Advance the iterator exactly once per iteration; the same URI is used for lookup and for logging.
+        URI specUri = specUris.next();
+        Spec spec;
+        try {
+          spec = this.flowCatalog.get().getSpec(specUri);
+        } catch (SpecNotFoundException snfe) {
+          _log.error(String.format("The URI %s discovered in SpecStore is missing in FlowCatalog"
+              + ", suspecting concurrent modification on SpecStore", specUri), snfe);
+          // Nothing to schedule for a spec that cannot be resolved; do not pass null to onAddSpec.
+          continue;
+        }
+
+        // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change
+        if (spec instanceof FlowSpec) {
+          onAddSpec(disableFlowRunImmediatelyOnStart((FlowSpec) spec));
+        } else {
+          onAddSpec(spec);
+        }
+      }
+    } finally {
+      this.flowCatalog.get().getMetrics().updateGetSpecTime(startTime);
+    }
+  }
+
   @VisibleForTesting
   protected static Spec disableFlowRunImmediatelyOnStart(FlowSpec spec) {
     Properties properties = spec.getConfigAsProperties();
@@ -147,8 +181,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
   }
 
+  /**
+   * Starts the scheduler service; currently this only delegates to the {@code JobScheduler} superclass.
+   */
   @Override
-  protected void startUp()
-      throws Exception {
+  protected void startUp() throws Exception {
     super.startUp();
   }
 
@@ -156,8 +189,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
    * Synchronize the job scheduling because the same flowSpec can be scheduled by different threads.
    */
   @Override
-  public synchronized void scheduleJob(Properties jobProps, JobListener jobListener)
-      throws JobException {
+  public synchronized void scheduleJob(Properties jobProps, JobListener jobListener) throws JobException {
     Map<String, Object> additionalJobDataMap = Maps.newHashMap();
     additionalJobDataMap.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWSPEC,
         this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)));
@@ -170,8 +202,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
   }
 
   @Override
-  public void runJob(Properties jobProps, JobListener jobListener)
-      throws JobException {
+  public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
     try {
       Spec flowSpec = this.scheduledFlowSpecs.get(jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
       this.orchestrator.orchestrate(flowSpec);
@@ -194,7 +225,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
 
     if (addedSpec instanceof FlowSpec) {
       try {
-        FlowSpec flowSpec  = (FlowSpec) addedSpec;
+        FlowSpec flowSpec = (FlowSpec) addedSpec;
         Properties jobConfig = new Properties();
         Properties flowSpecProperties = ((FlowSpec) addedSpec).getConfigAsProperties();
         jobConfig.putAll(this.properties);
@@ -203,8 +234,8 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
             flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY).toString());
         jobConfig.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
             ConfigUtils.getString((flowSpec).getConfig(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"));
-        if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && StringUtils
-            .isNotBlank(flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
+        if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && StringUtils.isNotBlank(
+            flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
           jobConfig.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY,
               flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
         }
@@ -244,7 +275,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
         }
         return new AddSpecResponse(compiledFlow);
       } catch (JobException je) {
-        _log.error("{} Failed to schedule or run FlowSpec {}", serviceName,  addedSpec, je);
+        _log.error("{} Failed to schedule or run FlowSpec {}", serviceName, addedSpec, je);
       }
     }
     return null;
@@ -318,8 +349,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     private static final Logger _log = LoggerFactory.getLogger(GobblinServiceJob.class);
 
     @Override
-    public void executeImpl(JobExecutionContext context)
-        throws JobExecutionException {
+    public void executeImpl(JobExecutionContext context) throws JobExecutionException {
       _log.info("Starting FlowSpec " + context.getJobDetail().getKey());
 
       JobDataMap dataMap = context.getJobDetail().getJobDataMap();
@@ -335,8 +365,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     }
 
+    /**
+     * Handles an interrupt request by logging it only; this method does not itself stop the running flow.
+     * NOTE(review): this logs through {@code log}, not this class's {@code _log}; presumably the outer
+     * class's Lombok-generated logger — confirm, and consider using {@code _log} for consistency.
+     */
     @Override
-    public void interrupt()
-        throws UnableToInterruptJobException {
+    public void interrupt() throws UnableToInterruptJobException {
       log.info("Job was interrupted");
     }
   }
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
index a6e1bc5..e289cbd 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobSchedulerTest.java
@@ -16,17 +16,29 @@
  */
 package org.apache.gobblin.service.modules.scheduler;
 
-import java.net.URI;
-import java.util.Properties;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.io.Files;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-
+import java.io.File;
+import java.net.URI;
+import java.util.Map;
+import java.util.Properties;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
+import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.scheduler.SchedulerService;
+import org.apache.gobblin.service.modules.orchestration.Orchestrator;
+import org.apache.gobblin.runtime.spec_catalog.FlowCatalogTest;
+import org.apache.gobblin.testing.AssertWithBackoff;
+import org.apache.gobblin.util.ConfigUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
 
 
 public class GobblinServiceJobSchedulerTest {
@@ -35,6 +47,51 @@ public class GobblinServiceJobSchedulerTest {
   private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
   private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template";
 
+  /**
+   * Verifies that once {@code setActive(true)} is called, every {@link FlowSpec} in the {@link FlowCatalog}
+   * is (eventually) loaded into {@code scheduledFlowSpecs}. Loading happens on a background thread, so the
+   * check polls with backoff instead of asserting immediately.
+   */
+  @Test
+  public void testJobSchedulerInit() throws Exception {
+    // Back a real FlowCatalog with a throw-away spec store directory.
+    File specDir = Files.createTempDir();
+
+    Properties properties = new Properties();
+    properties.setProperty(ConfigurationKeys.FLOWSPEC_STORE_DIR_KEY, specDir.getAbsolutePath());
+    FlowCatalog flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(properties));
+    ServiceBasedAppLauncher serviceLauncher = new ServiceBasedAppLauncher(properties, "GaaSJobSchedulerTest");
+
+    serviceLauncher.addService(flowCatalog);
+    serviceLauncher.start();
+
+    try {
+      FlowSpec flowSpec0 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec0"));
+      FlowSpec flowSpec1 = FlowCatalogTest.initFlowSpec(specDir.getAbsolutePath(), URI.create("spec1"));
+
+      flowCatalog.put(flowSpec0, false);
+      flowCatalog.put(flowSpec1, false);
+
+      Assert.assertEquals(flowCatalog.getSpecs().size(), 2);
+
+      // Scheduler whose onAddSpec only records specs (see TestGobblinServiceJobScheduler).
+      final TestGobblinServiceJobScheduler scheduler = new TestGobblinServiceJobScheduler("testscheduler",
+          ConfigFactory.empty(), Optional.of(flowCatalog), null, null, null);
+
+      scheduler.setActive(true);
+
+      AssertWithBackoff.create().timeoutMs(6000).maxSleepMs(2000).backoffFactor(2)
+          .assertTrue(new Predicate<Void>() {
+            @Override
+            public boolean apply(Void input) {
+              Map<String, Spec> scheduledFlowSpecs = scheduler.scheduledFlowSpecs;
+              return scheduledFlowSpecs != null && scheduledFlowSpecs.size() == 2
+                  && scheduledFlowSpecs.containsKey("spec0") && scheduledFlowSpecs.containsKey("spec1");
+            }
+          }, "Waiting all flowSpecs to be scheduled");
+    } finally {
+      // Stop the services started above so the FlowCatalog does not outlive this test.
+      serviceLauncher.stop();
+    }
+  }
+
   @Test
   public void testDisableFlowRunImmediatelyOnStart()
       throws Exception {
@@ -56,4 +113,21 @@ public class GobblinServiceJobSchedulerTest {
     Assert.assertEquals(modifiedConfig.getString(ConfigurationKeys.JOB_GROUP_KEY), TEST_GROUP_NAME);
     Assert.assertEquals(modifiedConfig.getString(ConfigurationKeys.JOB_NAME_KEY), TEST_FLOW_NAME);
   }
+
+  /**
+   * {@link GobblinServiceJobScheduler} whose {@link #onAddSpec(Spec)} only records specs in
+   * {@code scheduledFlowSpecs} instead of compiling and scheduling them, to keep tests lightweight.
+   * Declared {@code static} so instances do not hold a hidden reference to the enclosing test object.
+   */
+  static class TestGobblinServiceJobScheduler extends GobblinServiceJobScheduler {
+    public TestGobblinServiceJobScheduler(String serviceName, Config config,
+        Optional<FlowCatalog> flowCatalog, Optional<TopologyCatalog> topologyCatalog, Orchestrator orchestrator,
+        SchedulerService schedulerService) throws Exception {
+      super(serviceName, config, Optional.absent(), flowCatalog, topologyCatalog, orchestrator, schedulerService,
+          Optional.absent());
+    }
+
+    /**
+     * Records the spec keyed by its URI string and returns a response carrying its description;
+     * nothing is scheduled.
+     */
+    @Override
+    public AddSpecResponse onAddSpec(Spec addedSpec) {
+      this.scheduledFlowSpecs.put(addedSpec.getUri().toString(), addedSpec);
+      return new AddSpecResponse(addedSpec.getDescription());
+    }
+  }
 }
\ No newline at end of file