You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by wl...@apache.org on 2023/02/27 21:26:43 UTC

[gobblin] branch master updated: [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow (#3640)

This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new af2a9c196 [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow (#3640)
af2a9c196 is described below

commit af2a9c196d74b96d51dd8ee57b87cba0baceb9a5
Author: umustafi <um...@gmail.com>
AuthorDate: Mon Feb 27 13:26:35 2023 -0800

    [GOBBLIN-1783] Initialize scheduler with batch gets instead of individual get per flow (#3640)
    
    * wip
    
    * Implementation of batched get
    
    * add metrics to measure scheduler initialization and get spec time
    
    * implement methods for FSSpecStore
    
    * remove comment
    
    * refactor to consolidate pagination methods in one, fix tests
    
    * fix service tests
    
    * fix tests and specification for pagination to include nonneg value
    
    * Added test cases for edge cases of pagination, ensure consumer start before scheduler
    
    * Check for newest spec add to scheduler
    
    * Properly add new modified_time prop to Config using setProperty
    
    * handle modification time
    
    * fix tests
    
    * handle edge case, small fixes
    
    * avoid memory leak for last updated time
    
    ---------
    
    Co-authored-by: Urmi Mustafi <um...@umustafi-mn1.linkedin.biz>
---
 .../gobblin/configuration/ConfigurationKeys.java   |   2 +
 .../service/FlowConfigResourceLocalHandler.java    |   2 +-
 .../org/apache/gobblin/runtime/api/FlowSpec.java   |   3 +
 .../gobblin/runtime/api/FlowSpecSearchObject.java  |   2 +-
 .../gobblin/runtime/api/InstrumentedSpecStore.java |   6 +-
 .../org/apache/gobblin/runtime/api/SpecStore.java  |  12 +-
 .../gobblin/runtime/metrics/RuntimeMetrics.java    |   3 +
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  |  10 +-
 .../gobblin/runtime/spec_store/FSSpecStore.java    |  33 ++++-
 .../runtime/spec_store/MysqlBaseSpecStore.java     |  69 ++++++---
 .../gobblin/runtime/spec_store/MysqlSpecStore.java |  36 ++++-
 .../runtime/spec_store/MysqlSpecStoreTest.java     |  37 +++--
 .../spec_store/MysqlSpecStoreWithUpdateTest.java   |  21 +--
 .../modules/core/GobblinServiceManager.java        |  12 ++
 .../scheduler/GobblinServiceJobScheduler.java      | 165 ++++++++++++++++-----
 15 files changed, 318 insertions(+), 95 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 362016cdd..fe62d5eec 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -89,6 +89,8 @@ public class ConfigurationKeys {
   // Job retriggering
   public static final String JOB_RETRIGGERING_ENABLED = "job.retriggering.enabled";
   public static final String DEFAULT_JOB_RETRIGGERING_ENABLED = "true";
+  public static final String LOAD_SPEC_BATCH_SIZE = "load.spec.batch.size";
+  public static final int DEFAULT_LOAD_SPEC_BATCH_SIZE = 100;
 
   // Job executor thread pool size
   public static final String JOB_EXECUTOR_THREAD_POOL_SIZE_KEY = "jobexecutor.threadpool.size";
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index 5c8dc8c41..f97b1b915 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -116,7 +116,7 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
    * Get all flow configs in between start and start + count - 1
    */
   public Collection<FlowConfig> getAllFlowConfigs(int start, int count) {
-    return flowCatalog.getAllSpecs(start, count).stream().map(FlowSpec.Utils::toFlowConfig).collect(Collectors.toList());
+    return flowCatalog.getSpecsPaginated(start, count).stream().map(FlowSpec.Utils::toFlowConfig).collect(Collectors.toList());
   }
 
   /**
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index 1e007c7c6..85e7a269b 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -65,6 +65,9 @@ import org.apache.gobblin.util.ConfigUtils;
 @SuppressFBWarnings(value="SE_BAD_FIELD",
     justification = "FindBugs complains about Config not being serializable, but the implementation of Config is serializable")
 public class FlowSpec implements Configurable, Spec {
+  // Key for Property associated with modified_time
+  public static final String MODIFICATION_TIME_KEY = "modified_time";
+
   private static final long serialVersionUID = -5511988862945107734L;
 
   /** An URI identifying the flow. */
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
index dc5d6447e..226cd6cdf 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
@@ -121,7 +121,7 @@ public class FlowSpecSearchObject implements SpecSearchObject {
 
     if (this.getCount() > 0) {
       // Order by two fields to make a full order by
-      limitAndOffset.add(" ORDER BY modified_time DESC, spec_uri ASC LIMIT ?");
+      limitAndOffset.add(" ORDER BY spec_uri ASC LIMIT ?");
       if (this.getStart() > 0) {
         limitAndOffset.add(" OFFSET ?");
       }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
index d2dfd7eb8..d94547989 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
@@ -171,8 +171,8 @@ public abstract class InstrumentedSpecStore implements SpecStore {
   }
 
   @Override
-  public Collection<Spec> getSpecs(int start, int count) throws IOException {
-    return this.getTimer.invokeMayThrowIO(() -> getSpecsImpl(start, count));
+  public Collection<Spec> getSpecsPaginated(int startOffset, int batchSize) throws IOException, IllegalArgumentException {
+    return this.getTimer.invokeMayThrowIO(() -> getSpecsPaginatedImpl(startOffset, batchSize));
   }
 
   @Override
@@ -193,7 +193,7 @@ public abstract class InstrumentedSpecStore implements SpecStore {
   public abstract Iterator<URI> getSpecURIsImpl() throws IOException;
   public abstract Iterator<URI> getSpecURIsWithTagImpl(String tag) throws IOException;
   public abstract int getSizeImpl() throws IOException;
-  public abstract Collection<Spec> getSpecsImpl(int start, int count) throws IOException;
+  public abstract Collection<Spec> getSpecsPaginatedImpl(int startOffset, int batchSize) throws IOException, IllegalArgumentException;
 
   /** child classes can implement this if they want to get specs using {@link SpecSearchObject} */
   public Collection<Spec> getSpecsImpl(SpecSearchObject specUri) throws IOException {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
index 072ce2195..d2cc41e6c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
@@ -127,10 +127,14 @@ public interface SpecStore {
   Collection<Spec> getSpecs() throws IOException;
 
   /***
-   * Get all {@link Spec}s from the {@link SpecStore} with pagination input.
-   * @throws IOException Exception in retrieving {@link Spec}s.
-   */
-  Collection<Spec> getSpecs(int start, int count) throws IOException;
+   * Retrieve a batch of {@link Spec}s of at most size batchSize beginning at startOffset after creating a unique
+   * ordering of the specs based on primary key spec_uri.
+   * @param startOffset starting row to batch the specs returned from, startOffset >= 0
+   * @param batchSize max number of specs returned in the batch, batchSize >= 0
+   * @throws IOException Exception in retrieving the {@link Spec}s.
+   * @throws IllegalArgumentException if startOffset < 0 or batchSize < 0
+   */
+  Collection<Spec> getSpecsPaginated(int startOffset, int batchSize) throws IOException, IllegalArgumentException;
 
   /**
    * Return an iterator of Spec URIs(Spec identifiers)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
index 1b7bb3ae5..b24ef000d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/metrics/RuntimeMetrics.java
@@ -54,6 +54,9 @@ public class RuntimeMetrics {
   public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_QUOTA_REQUESTS_EXCEEDED = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.quotaRequests.exceeded";
   public static final String GOBBLIN_MYSQL_QUOTA_MANAGER_TIME_TO_CHECK_QUOTA = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + "gobblin.mysql.quota.manager.time.to.check.quota";
 
+  public static final String GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.getSpecSpeedDuringStartupAvgMillis";
+  public static final String GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.loadSpecBatchSize";
+  public static final String GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_MILLIS = ServiceMetricNames.GOBBLIN_SERVICE_PREFIX + ".jobScheduler.timeToInitializeSchedulerMillis";
   // Metadata keys
   public static final String TOPIC = "topic";
   public static final String GROUP_ID = "groupId";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index 2ab61e14d..19be25c51 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -317,18 +317,18 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
   }
 
   /**
-   * A function to get all specs in the {@link SpecStore} between the provided start index and (start + count - 1) index, inclusive.
+   * A function to get a batch of specs in the {@link SpecStore} between the provided start index and (start + count - 1) index, inclusive.
    * This enables pagination so getting SpecStore object will not timeout, and can be tuned to how many results is desired at any one time.
-   * The {@link Spec} in {@link SpecStore} are sorted in descending order of the modified_time while paginating.
+   * The {@link Spec} in {@link SpecStore} are sorted in ascending order of the spec_uri while paginating.
    *
    * @param start The start index.
    * @param count The total number of records to get.
    * @return A collection of specs between start and start + count - 1, inclusive.
    */
-  public Collection<Spec> getAllSpecs(int start, int count) {
+  public Collection<Spec> getSpecsPaginated(int start, int count) {
     try {
-      return specStore.getSpecs(start, count);
-    } catch (IOException e) {
+      return specStore.getSpecsPaginated(start, count);
+    } catch (IOException | IllegalArgumentException e) {
       throw new RuntimeException("Cannot retrieve specs from Spec stores between " + start + " and " + (start + count - 1), e);
     }
   }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index 6f3bbe024..422888bcd 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -19,8 +19,10 @@ package org.apache.gobblin.runtime.spec_store;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -358,8 +360,35 @@ public class FSSpecStore extends InstrumentedSpecStore {
   }
 
   @Override
-  public Collection<Spec> getSpecsImpl(int start, int count) throws UnsupportedOperationException {
-    throw new UnsupportedOperationException();
+  public Collection<Spec> getSpecsPaginatedImpl(int startOffset, int batchSize)
+      throws IOException {
+    if (startOffset < 0 || batchSize < 0) {
+      throw new IllegalArgumentException(String.format("Received negative offset or batch size value when they should be >= 0. "
+          + "Offset is %s and batch size is %s", startOffset, batchSize));
+    }
+    // Obtain sorted list of spec uris to paginate from
+    Iterator<URI> uriIterator = getSpecURIsImpl();
+    List<URI> sortedUris = new ArrayList<>();
+    while (uriIterator.hasNext()) {
+      sortedUris.add(uriIterator.next());
+    }
+    sortedUris.sort(URI::compareTo);
+
+    int numElements = 0;
+    List<Spec> batchOfSpecs = new ArrayList<>();
+    URI currentURI;
+
+    while (startOffset + numElements < sortedUris.size() && numElements < batchSize) {
+      currentURI = sortedUris.get(startOffset + numElements);
+      try {
+        batchOfSpecs.add(getSpecImpl(currentURI));
+      } catch (SpecNotFoundException e) {
+        log.warn("Unable to find spec for uri {} so proceeding to next URI. Stacktrace {}", currentURI, e);
+        continue;
+      }
+      numElements += 1;
+    }
+    return batchOfSpecs;
   }
 
   private int getSizeImpl(Path directory) throws IOException {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
index b12360466..713c2b9d8 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
@@ -28,7 +28,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import javax.sql.DataSource;
+import java.util.Properties;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
@@ -36,6 +36,7 @@ import com.google.common.io.ByteStreams;
 import com.typesafe.config.Config;
 import com.zaxxer.hikari.HikariDataSource;
 
+import javax.sql.DataSource;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
@@ -81,9 +82,10 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
   protected static final String UPDATE_STATEMENT = "UPDATE %s SET spec=?,spec_json=? WHERE spec_uri=? AND UNIX_TIMESTAMP(modified_time) < ?";
   private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE spec_uri = ?";
   private static final String GET_STATEMENT_BASE = "SELECT spec_uri, spec FROM %s WHERE ";
-  private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM %s";
+  private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec, modified_time FROM %s";
   private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
   private static final String GET_ALL_URIS_WITH_TAG_STATEMENT = "SELECT spec_uri FROM %s WHERE tag = ?";
+  private static final String GET_SPECS_BATCH_STATEMENT = "SELECT spec_uri, spec, modified_time FROM %s ORDER BY spec_uri ASC LIMIT ? OFFSET ?";
   private static final String GET_SIZE_STATEMENT = "SELECT COUNT(*) FROM %s ";
   // NOTE: using max length of a `FlowSpec` URI, as it's believed to be the longest of existing `Spec` types
   private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR(" + FlowSpec.Utils.maxFlowSpecUriLength()
@@ -103,6 +105,7 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
     public final String getAllStatement = String.format(getTablelessGetAllStatement(), MysqlBaseSpecStore.this.tableName);
     public final String getAllURIsStatement = String.format(getTablelessGetAllURIsStatement(), MysqlBaseSpecStore.this.tableName);
     public final String getAllURIsWithTagStatement = String.format(getTablelessGetAllURIsWithTagStatement(), MysqlBaseSpecStore.this.tableName);
+    public final String getBatchStatement = String.format(getTablelessGetBatchStatement(), MysqlBaseSpecStore.this.tableName);
     public final String getSizeStatement = String.format(getTablelessGetSizeStatement(), MysqlBaseSpecStore.this.tableName);
     public final String createTableStatement = String.format(getTablelessCreateTableStatement(), MysqlBaseSpecStore.this.tableName);
 
@@ -119,6 +122,26 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
       return MysqlBaseSpecStore.this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(2).getBinaryStream()));
     }
 
+    public Spec extractSpecWithModificationTime(ResultSet rs) throws SQLException, IOException {
+      Spec spec = MysqlBaseSpecStore.this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(2).getBinaryStream()));
+      // Set modified timestamp in flowSpec properties list
+      if (spec instanceof FlowSpec) {
+        long timestamp = rs.getTimestamp(FlowSpec.MODIFICATION_TIME_KEY).getTime();
+        FlowSpec flowSpec = (FlowSpec) spec;
+        Properties properties = flowSpec.getConfigAsProperties();
+        properties.setProperty(FlowSpec.MODIFICATION_TIME_KEY, String.valueOf(timestamp));
+        return flowSpec;
+      }
+      return spec;
+    }
+
+    public void completeGetBatchStatement(PreparedStatement statement, int startOffset, int batchSize)
+        throws SQLException {
+      int i = 0;
+      statement.setInt(++i, batchSize);
+      statement.setInt(++i, startOffset);
+    }
+
     protected String getTablelessExistsStatement() { return MysqlBaseSpecStore.EXISTS_STATEMENT; }
     protected String getTablelessUpdateStatement() { return MysqlBaseSpecStore.UPDATE_STATEMENT; }
     protected String getTablelessInsertStatement() { return MysqlBaseSpecStore.INSERT_STATEMENT; }
@@ -127,6 +150,7 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
     protected String getTablelessGetAllStatement() { return MysqlBaseSpecStore.GET_ALL_STATEMENT; }
     protected String getTablelessGetAllURIsStatement() { return MysqlBaseSpecStore.GET_ALL_URIS_STATEMENT; }
     protected String getTablelessGetAllURIsWithTagStatement() { return MysqlBaseSpecStore.GET_ALL_URIS_WITH_TAG_STATEMENT; }
+    protected String getTablelessGetBatchStatement() {return MysqlBaseSpecStore.GET_SPECS_BATCH_STATEMENT; }
     protected String getTablelessGetSizeStatement() { return MysqlBaseSpecStore.GET_SIZE_STATEMENT; }
     protected String getTablelessCreateTableStatement() { return MysqlBaseSpecStore.CREATE_TABLE_STATEMENT; }
   }
@@ -221,7 +245,7 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
   public Spec getSpecImpl(URI specUri) throws IOException, SpecNotFoundException {
     Iterator<Spec> resultSpecs = withPreparedStatement(this.sqlStatements.getAllStatement + " WHERE spec_uri = ?", statement -> {
       statement.setString(1, specUri.toString());
-      return retrieveSpecs(statement).iterator();
+      return retrieveSpecsWithModificationTime(statement).iterator();
     });
     if (resultSpecs.hasNext()) {
       return resultSpecs.next();
@@ -260,10 +284,23 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
     return specs;
   }
 
+  protected final Collection<Spec> retrieveSpecsWithModificationTime(PreparedStatement statement) throws IOException {
+    List<Spec> specs = new ArrayList<>();
+    try (ResultSet rs = statement.executeQuery()) {
+      while (rs.next()) {
+        specs.add(this.sqlStatements.extractSpecWithModificationTime(rs));
+      }
+    } catch (SQLException | SpecSerDeException e) {
+      log.error("Failed to deserialize spec", e);
+      throw new IOException(e);
+    }
+    return specs;
+  }
+
   @Override
   public Iterator<URI> getSpecURIsImpl() throws IOException {
     return withPreparedStatement(this.sqlStatements.getAllURIsStatement, statement -> {
-      return retreiveURIs(statement).iterator();
+      return retrieveURIs(statement).iterator();
     });
   }
 
@@ -278,20 +315,14 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
   }
 
   @Override
-  public Collection<Spec> getSpecsImpl(int start, int count) throws IOException {
-    List<String> limitAndOffset = new ArrayList<>();
-    if (count > 0) {
-      // Order by two fields to make a full order by
-      limitAndOffset.add(" ORDER BY modified_time DESC, spec_uri ASC LIMIT");
-      limitAndOffset.add(String.valueOf(count));
-      if (start > 0) {
-        limitAndOffset.add("OFFSET");
-        limitAndOffset.add(String.valueOf(start));
-      }
+  public Collection<Spec> getSpecsPaginatedImpl(int startOffset, int batchSize) throws IOException, IllegalArgumentException {
+    if (startOffset < 0 || batchSize < 0) {
+      throw new IllegalArgumentException(String.format("Received negative offset or batch size value when they should be >= 0. "
+          + "Offset is %s and batch size is %s", startOffset, batchSize));
     }
-    String finalizedStatement = this.sqlStatements.getAllStatement + String.join(" ", limitAndOffset);
-    return withPreparedStatement(finalizedStatement, statement -> {
-      return retrieveSpecs(statement);
+    return withPreparedStatement(this.sqlStatements.getBatchStatement, statement -> {
+      this.sqlStatements.completeGetBatchStatement(statement, startOffset, batchSize);
+      return retrieveSpecsWithModificationTime(statement);
     });
   }
 
@@ -299,11 +330,11 @@ public class MysqlBaseSpecStore extends InstrumentedSpecStore {
   public Iterator<URI> getSpecURIsWithTagImpl(String tag) throws IOException {
     return withPreparedStatement(this.sqlStatements.getAllURIsWithTagStatement, statement -> {
       statement.setString(1, tag);
-      return retreiveURIs(statement).iterator();
+      return retrieveURIs(statement).iterator();
     });
   }
 
-  private List<URI> retreiveURIs(PreparedStatement statement) throws SQLException {
+  private List<URI> retrieveURIs(PreparedStatement statement) throws SQLException {
     List<URI> uris = new ArrayList<>();
 
     try (ResultSet rs = statement.executeQuery()) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index 00e203da1..3e9cd4351 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -17,10 +17,6 @@
 
 package org.apache.gobblin.runtime.spec_store;
 
-import com.google.common.base.Charsets;
-import com.google.common.io.ByteStreams;
-import com.google.gson.Gson;
-import com.typesafe.config.Config;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.URI;
@@ -28,7 +24,15 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.Collection;
+import java.util.Properties;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.ByteStreams;
+import com.google.gson.Gson;
+import com.typesafe.config.Config;
+
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
@@ -38,7 +42,8 @@ import org.apache.gobblin.runtime.api.SpecStore;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
 
-import static org.apache.gobblin.service.ServiceConfigKeys.*;
+import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY;
+import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY;
 
 
 /**
@@ -60,7 +65,8 @@ public class MysqlSpecStore extends MysqlBaseSpecStore {
       + "user_to_proxy, source_identifier, destination_identifier, schedule, tag, isRunImmediately, owning_group, spec, spec_json) "
       + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec), spec_json = VALUES(spec_json)";
   private static final String SPECIFIC_GET_STATEMENT_BASE = "SELECT spec_uri, spec, spec_json FROM %s WHERE ";
-  private static final String SPECIFIC_GET_ALL_STATEMENT = "SELECT spec_uri, spec, spec_json FROM %s";
+  private static final String SPECIFIC_GET_ALL_STATEMENT = "SELECT spec_uri, spec, spec_json, modified_time FROM %s";
+  private static final String SPECIFIC_GET_SPECS_BATCH_STATEMENT = "SELECT spec_uri, spec, spec_json, modified_time FROM %s ORDER BY spec_uri ASC LIMIT ? OFFSET ?";
   private static final String SPECIFIC_CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR("
       + FlowSpec.Utils.maxFlowSpecUriLength()
       + ") NOT NULL, flow_group VARCHAR(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + "), flow_name VARCHAR("
@@ -110,6 +116,22 @@ public class MysqlSpecStore extends MysqlBaseSpecStore {
           : MysqlSpecStore.this.specSerDe.deserialize(rs.getString(3).getBytes(Charsets.UTF_8));
     }
 
+    @Override
+    public Spec extractSpecWithModificationTime(ResultSet rs) throws SQLException, IOException {
+      Spec spec = rs.getString(3) == null
+          ? MysqlSpecStore.this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(2).getBinaryStream()))
+          : MysqlSpecStore.this.specSerDe.deserialize(rs.getString(3).getBytes(Charsets.UTF_8));
+      // Set modified timestamp in flowSpec properties list
+      if (spec instanceof FlowSpec) {
+        long timestamp = rs.getTimestamp(FlowSpec.MODIFICATION_TIME_KEY).getTime();
+        FlowSpec flowSpec = (FlowSpec) spec;
+        Properties properties = flowSpec.getConfigAsProperties();
+        properties.setProperty(FlowSpec.MODIFICATION_TIME_KEY, String.valueOf(timestamp));
+        return flowSpec;
+      }
+      return spec;
+    }
+
     @Override
     protected String getTablelessInsertStatement() { return MysqlSpecStore.SPECIFIC_INSERT_STATEMENT; }
     @Override
@@ -117,6 +139,8 @@ public class MysqlSpecStore extends MysqlBaseSpecStore {
     @Override
     protected String getTablelessGetAllStatement() { return MysqlSpecStore.SPECIFIC_GET_ALL_STATEMENT; }
     @Override
+    protected String getTablelessGetBatchStatement() { return MysqlSpecStore.SPECIFIC_GET_SPECS_BATCH_STATEMENT; }
+    @Override
     protected String getTablelessCreateTableStatement() { return MysqlSpecStore.SPECIFIC_CREATE_TABLE_STATEMENT; }
   }
 
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 90659d333..73b2e052a 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -153,6 +153,7 @@ public class MysqlSpecStoreTest {
   @Test (dependsOnMethods = "testAddSpec")
   public void testGetSpec() throws Exception {
     FlowSpec result = (FlowSpec) this.specStore.getSpec(this.uri1);
+    removeModificationTimestampFromSpecs(result);
     Assert.assertEquals(result, this.flowSpec1);
 
     Collection<Spec> specs = this.specStore.getSpecs();
@@ -283,13 +284,24 @@ public class MysqlSpecStoreTest {
     Assert.assertFalse(specs.contains(this.flowSpec4));
   }
 
+  /**
+   * This method is used only for testing purposes: it removes the `modified_time` key from the FlowSpec config
+   * properties, because otherwise the equality check between a FlowSpec created for testing and the one retrieved
+   * from the store would fail. In practice, we would never encounter this issue as we only compare specs obtained
+   * from the store with the key mentioned.
+   */
+  public static void removeModificationTimestampFromSpecs(Spec spec) {
+    ((FlowSpec) spec).getConfigAsProperties().remove(FlowSpec.MODIFICATION_TIME_KEY);
+  }
+
   @Test (dependsOnMethods =  "testGetSpec")
   public void testGetAllSpecPaginate() throws Exception {
     /**
      * Sorted order of the specStore configurations is flowSpec1, flowSpec2, flowSpec4
      */
     // Return all flowSpecs from index 0 to 9. Total of 3 flowSpecs only so return all 3 flowSpecs
-    Collection<Spec> specs = this.specStore.getSpecs(0,10);
+    Collection<Spec> specs = this.specStore.getSpecsPaginated(0,10);
+    specs.forEach(spec -> removeModificationTimestampFromSpecs(spec));
     Assert.assertEquals(specs.size(), 3);
     Assert.assertTrue(specs.contains(this.flowSpec1));
     Assert.assertTrue(specs.contains(this.flowSpec2));
@@ -297,25 +309,31 @@ public class MysqlSpecStoreTest {
 
     // Return all flowSpecs using the default get all specs function. Testing default functionality of returning everything
     specs = this.specStore.getSpecs();
+    specs.forEach(spec -> removeModificationTimestampFromSpecs(spec));
     Assert.assertEquals(specs.size(), 3);
     Assert.assertTrue(specs.contains(this.flowSpec1));
     Assert.assertTrue(specs.contains(this.flowSpec2));
     Assert.assertTrue(specs.contains(this.flowSpec4));
 
     // Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only return first two.
-    specs = this.specStore.getSpecs(0,2);
+    specs = this.specStore.getSpecsPaginated(0,2);
+    specs.forEach(spec -> removeModificationTimestampFromSpecs(spec));
     Assert.assertEquals(specs.size(), 2);
     Assert.assertTrue(specs.contains(this.flowSpec1));
     Assert.assertTrue(specs.contains(this.flowSpec2));
     Assert.assertFalse(specs.contains(this.flowSpec4));
 
-    // Return all flowSpecs from index 0 to 2 - 1. Total of 3 flowSpecs, only return first two.
-    // Check that functionality for not including a start value is the same as including start value of 0
-    specs = this.specStore.getSpecs(-1, 2);
-    Assert.assertEquals(specs.size(), 2);
-    Assert.assertTrue(specs.contains(this.flowSpec1));
-    Assert.assertTrue(specs.contains(this.flowSpec2));
-    Assert.assertFalse(specs.contains(this.flowSpec4));
+    // Return 0 flowSpecs when batch size is 0.
+    specs = this.specStore.getSpecsPaginated(2,0);
+    Assert.assertEquals(specs.size(), 0);
+
+    // Return 0 flowSpecs when start offset is past the end
+    specs = this.specStore.getSpecsPaginated(3,1);
+    Assert.assertEquals(specs.size(), 0);
+
+    // Check that we throw an error for incorrect inputs
+    Assert.assertThrows(IllegalArgumentException.class, () -> this.specStore.getSpecsPaginated(-1, 2));
+    Assert.assertThrows(IllegalArgumentException.class, () -> this.specStore.getSpecsPaginated(2, -4));
   }
 
   @Test (expectedExceptions = {IOException.class})
@@ -336,6 +354,7 @@ public class MysqlSpecStoreTest {
     this.oldSpecStore.addSpec(this.flowSpec1);
 
     FlowSpec spec = (FlowSpec) this.specStore.getSpec(this.uri1);
+    removeModificationTimestampFromSpecs(spec);
     Assert.assertEquals(spec, this.flowSpec1);
   }
 
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java
index 621fa3771..bcd77eaab 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreWithUpdateTest.java
@@ -164,6 +164,7 @@ public class MysqlSpecStoreWithUpdateTest {
   @Test (dependsOnMethods = "testAddSpec")
   public void testGetSpec() throws Exception {
     FlowSpec result = (FlowSpec) this.specStore.getSpec(this.uri1);
+    MysqlSpecStoreTest.removeModificationTimestampFromSpecs(result);
     Assert.assertEquals(result, this.flowSpec1);
 
     Collection<Spec> specs = this.specStore.getSpecs();
@@ -298,7 +299,9 @@ public class MysqlSpecStoreWithUpdateTest {
   public void testUpdate() throws Exception{
     long version = System.currentTimeMillis() /1000;
     this.specStore.updateSpec(this.flowSpec4_update);
-    Assert.assertEquals(((FlowSpec) this.specStore.getSpec(this.uri4)), flowSpec4_update);
+    FlowSpec spec = (FlowSpec) this.specStore.getSpec(this.uri4);
+    MysqlSpecStoreTest.removeModificationTimestampFromSpecs(spec);
+    Assert.assertEquals(spec, flowSpec4_update);
     Assert.expectThrows(IOException.class, () -> this.specStore.updateSpec(flowSpec4, version));
   }
 
@@ -308,7 +311,8 @@ public class MysqlSpecStoreWithUpdateTest {
      * Sorted order of the specStore configurations is flowSpec1, flowSpec2, flowSpec4
      */
     // Return all flowSpecs from index 0 to 9. Total of 3 flowSpecs only so return all 3 flowSpecs
-    Collection<Spec> specs = this.specStore.getSpecs(0,10);
+    Collection<Spec> specs = this.specStore.getSpecsPaginated(0,10);
+    specs.forEach(spec -> MysqlSpecStoreTest.removeModificationTimestampFromSpecs(spec));
     for (Spec spec: specs) {
       System.out.println("test" + spec.getUri());
     }
@@ -325,19 +329,15 @@ public class MysqlSpecStoreWithUpdateTest {
     Assert.assertTrue(specs.contains(this.flowSpec4));
 
     // Return all flowSpecs of index [0, 2). Total of 3 flowSpecs, only return first two.
-    specs = this.specStore.getSpecs(0,2);
+    specs = this.specStore.getSpecsPaginated(0,2);
+    specs.forEach(spec -> MysqlSpecStoreTest.removeModificationTimestampFromSpecs(spec));
     Assert.assertEquals(specs.size(), 2);
     Assert.assertTrue(specs.contains(this.flowSpec1));
     Assert.assertTrue(specs.contains(this.flowSpec2));
     Assert.assertFalse(specs.contains(this.flowSpec4));
 
-    // Return all flowSpecs of index [0, 2). Total of 3 flowSpecs, only return first two.
-    // Check that functionality for not including a start value is the same as including start value of 0
-    specs = this.specStore.getSpecs(-1, 2);
-    Assert.assertEquals(specs.size(), 2);
-    Assert.assertTrue(specs.contains(this.flowSpec1));
-    Assert.assertTrue(specs.contains(this.flowSpec2));
-    Assert.assertFalse(specs.contains(this.flowSpec4));
+    // Check that we throw an error for incorrect inputs
+    Assert.assertThrows(IllegalArgumentException.class, () -> this.specStore.getSpecsPaginated(-1, -4));
   }
 
   @Test (expectedExceptions = {IOException.class})
@@ -358,6 +358,7 @@ public class MysqlSpecStoreWithUpdateTest {
     this.oldSpecStore.addSpec(this.flowSpec1);
 
     FlowSpec spec = (FlowSpec) this.specStore.getSpec(this.uri1);
+    MysqlSpecStoreTest.removeModificationTimestampFromSpecs(spec);
     Assert.assertEquals(spec, this.flowSpec1);
   }
 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
index 8e113909e..b8513f5a8 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java
@@ -439,6 +439,18 @@ public class GobblinServiceManager implements ApplicationLauncher, StandardMetri
     this.eventBus.register(this);
     this.serviceLauncher.start();
 
+    // Wait until spec consumer service is running to set scheduler to active
+    if (this.configuration.isWarmStandbyEnabled()) {
+      while (!this.specStoreChangeMonitor.isRunning()) {
+        try {
+          LOGGER.info("Waiting for SpecStoreChangeMonitor to be started...");
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          LOGGER.warn("Interrupted while waiting for SpecStoreChangeMonitor to be started");
+        }
+      }
+    }
+
     if (this.helixManager.isPresent()) {
       // Subscribe to leadership changes
       this.helixManager.get().addControllerListener((ControllerChangeListener) this::handleLeadershipChange);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
index 9c365e54f..54b9a6a26 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/scheduler/GobblinServiceJobScheduler.java
@@ -17,25 +17,43 @@
 
 package org.apache.gobblin.service.modules.scheduler;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.helix.HelixManager;
+import org.quartz.DisallowConcurrentExecution;
+import org.quartz.InterruptableJob;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.quartz.SchedulerException;
+import org.quartz.UnableToInterruptJobException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.MetricFilter;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
 import javax.inject.Inject;
 import javax.inject.Named;
 import javax.inject.Singleton;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang.StringUtils;
+
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.ContextAwareGauge;
 import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
@@ -44,9 +62,11 @@ import org.apache.gobblin.runtime.api.FlowSpec;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecCatalogListener;
 import org.apache.gobblin.runtime.listeners.JobListener;
+import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
 import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
+import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.scheduler.BaseGobblinJob;
 import org.apache.gobblin.scheduler.JobScheduler;
 import org.apache.gobblin.scheduler.SchedulerService;
@@ -56,20 +76,9 @@ import org.apache.gobblin.service.modules.orchestration.DagManager;
 import org.apache.gobblin.service.modules.orchestration.Orchestrator;
 import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
 import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
-import org.apache.gobblin.runtime.util.InjectionNames;
 import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
-import org.apache.helix.HelixManager;
-import org.quartz.DisallowConcurrentExecution;
-import org.quartz.InterruptableJob;
-import org.quartz.JobDataMap;
-import org.quartz.JobExecutionContext;
-import org.quartz.JobExecutionException;
-import org.quartz.SchedulerException;
-import org.quartz.UnableToInterruptJobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
 
@@ -97,12 +106,20 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
   @Getter
   protected final Map<String, Spec> scheduledFlowSpecs;
   @Getter
+  protected final Map<String, Long> lastUpdatedTimeForFlowSpec;
+  protected volatile int loadSpecsBatchSize = -1;
+  @Getter
   private volatile boolean isActive;
   private String serviceName;
+  private volatile Long averageGetSpecTimeValue = -1L;
+  private volatile Long timeToInitializeSchedulerValue = -1L;
+  private final ContextAwareGauge averageGetSpecTimeMillis = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS, () -> this.averageGetSpecTimeValue);;
+  private final ContextAwareGauge batchSize = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE, () -> this.loadSpecsBatchSize);
+  private final ContextAwareGauge timeToInitalizeSchedulerMillis = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_MILLIS, () -> this.timeToInitializeSchedulerValue);
   private static final MetricContext metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(),
       GobblinServiceJobScheduler.class);
   private static final ContextAwareMeter scheduledFlows = metricContext.contextAwareMeter(ServiceMetricNames.SCHEDULED_FLOW_METER);
-  private static final ContextAwareMeter nonScheduledFlows = metricContext.contextAwareMeter(ServiceMetricNames.NON_SCHEDULED_FLOW_METER);;
+  private static final ContextAwareMeter nonScheduledFlows = metricContext.contextAwareMeter(ServiceMetricNames.NON_SCHEDULED_FLOW_METER);
 
   /**
    * If current instances is nominated as a handler for DR traffic from down GaaS-Instance.
@@ -133,10 +150,20 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     this.helixManager = helixManager;
     this.orchestrator = orchestrator;
     this.scheduledFlowSpecs = Maps.newHashMap();
+    this.lastUpdatedTimeForFlowSpec = Maps.newHashMap();
+    this.loadSpecsBatchSize = Integer.parseInt(ConfigUtils.configToProperties(config).getProperty(ConfigurationKeys.LOAD_SPEC_BATCH_SIZE, String.valueOf(ConfigurationKeys.DEFAULT_LOAD_SPEC_BATCH_SIZE)));
     this.isNominatedDRHandler = config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED)
         && config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
     this.warmStandbyEnabled = warmStandbyEnabled;
     this.quotaManager = quotaManager;
+    // Check that these metrics do not already exist before registering them, mainly for tests, which create multiple
+    // instances of the scheduler. If one metric exists, then the others should as well.
+    MetricFilter filter = MetricFilter.contains(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_AVERAGE_GET_SPEC_SPEED_WHILE_LOADING_ALL_SPECS_MILLIS);
+    if (metricContext.getGauges(filter).isEmpty()) {
+      metricContext.register(this.averageGetSpecTimeMillis);
+      metricContext.register(this.batchSize);
+      metricContext.register(timeToInitalizeSchedulerMillis);
+    }
   }
 
   public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusGenerator flowStatusGenerator,
@@ -189,6 +216,19 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     }
   }
 
+  /** Adds a spec via onAddSpec; a FlowSpec with FLOW_RUN_IMMEDIATELY set to true is first passed through disableFlowRunImmediatelyOnStart. */
+  private void addSpecHelperMethod(Spec spec) {
+    // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
+    if (spec instanceof FlowSpec && PropertiesUtils
+        .getPropAsBoolean(((FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
+            "false")) {
+      Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
+      onAddSpec(modifiedSpec);
+    } else {
+      onAddSpec(spec);
+    }
+  }
+
   /**
    * Load all {@link FlowSpec}s from {@link FlowCatalog} as one of the initialization step,
    * and make schedulers be aware of that.
@@ -196,12 +236,20 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
    * If it is newly brought up as the DR handler, will load additional FlowSpecs and handle transition properly.
    */
   private void scheduleSpecsFromCatalog() {
-    Iterator<URI> specUris = null;
+    int numSpecs = this.flowCatalog.get().getSize();
     long startTime = System.currentTimeMillis();
-
+    Iterator<URI> uriIterator;
+    HashSet<URI> urisLeftToSchedule = new HashSet<>();
     try {
-      specUris = this.flowCatalog.get().getSpecURIs();
+      uriIterator = this.flowCatalog.get().getSpecURIs();
+      while (uriIterator.hasNext()) {
+        urisLeftToSchedule.add(uriIterator.next());
+      }
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
 
+    try {
       // If current instances nominated as DR handler, will take additional URIS from FlowCatalog.
       if (isNominatedDRHandler) {
         // Synchronously cleaning the execution state for DR-applicable FlowSpecs
@@ -210,26 +258,51 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
         clearRunningFlowState(drUris);
       }
     } catch (IOException e) {
-      throw new RuntimeException("Failed to get the iterator of all Spec URIS", e);
+      throw new RuntimeException("Failed to get Spec URIs with tag to clear running flow state", e);
     }
 
-    while (specUris.hasNext()) {
-      Spec spec = this.flowCatalog.get().getSpecWrapper(specUris.next());
-      try {
-        // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
-        if (spec instanceof FlowSpec && PropertiesUtils.getPropAsBoolean((
-            (FlowSpec) spec).getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
-          Spec modifiedSpec = disableFlowRunImmediatelyOnStart((FlowSpec) spec);
-          onAddSpec(modifiedSpec);
-        } else {
-          onAddSpec(spec);
+    int startOffset = 0;
+    long batchGetStartTime;
+    long batchGetEndTime;
+
+    while (startOffset < numSpecs) {
+      batchGetStartTime  = System.currentTimeMillis();
+      Collection<Spec> batchOfSpecs = this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
+      Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
+      batchGetEndTime = System.currentTimeMillis();
+
+      while (batchOfSpecsIterator.hasNext()) {
+        Spec spec = batchOfSpecsIterator.next();
+        try {
+          addSpecHelperMethod(spec);
+          urisLeftToSchedule.remove(spec.getUri());
+        } catch (Exception e) {
+          // If there is an uncaught error thrown during compilation, log it and continue adding flows
+          _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
         }
-      } catch (Exception e) {
-        // If there is an uncaught error thrown during compilation, log it and continue adding flows
-        _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
       }
+      startOffset += this.loadSpecsBatchSize;
+      // Average over the number of specs actually returned, because the last batch may be smaller than
+      // loadSpecsBatchSize. NOTE(review): an empty batch makes this integer division throw ArithmeticException.
+      averageGetSpecTimeValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
     }
+
+    // Ensure we did not miss any specs due to ordering changing (deletions/insertions) while loading
+    Iterator<URI> urisLeft = urisLeftToSchedule.iterator();
+    while (urisLeft.hasNext()) {
+        URI uri = urisLeft.next();
+        try {
+          Spec spec = this.flowCatalog.get().getSpecWrapper(uri);
+          addSpecHelperMethod(spec);
+        } catch (Exception e) {
+          // If there is an uncaught error thrown during compilation, log it and continue adding flows
+          _log.error("Could not schedule spec uri {} from flowCatalog due to ", uri, e);
+        }
+
+    }
+
     this.flowCatalog.get().getMetrics().updateGetSpecTime(startTime);
+    this.timeToInitializeSchedulerValue = System.currentTimeMillis() - startTime;
   }
 
   /**
@@ -343,8 +416,27 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
       }
     }
 
+    // Compare the modification timestamp of the spec being added if the scheduler is being initialized. Ideally we
+    // don't want to apply the same update twice, as it will kill the existing flow and reschedule it unnecessarily
+    Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.MODIFICATION_TIME_KEY, "0"));
+    String uriString = flowSpec.getUri().toString();
+    Boolean isRunImmediately = PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false");
+    // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
+    // enabled), the spec is not in the scheduler, or no modification time is recorded for it, assume it's the most recent
+    if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
+        && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
+      // For run-immediately flows with a schedule the modified_time would remain the same
+      if (this.lastUpdatedTimeForFlowSpec.get(uriString).compareTo(modificationTime) > 0
+          || (this.lastUpdatedTimeForFlowSpec.get(uriString).equals(modificationTime) && !isRunImmediately)) {
+        _log.warn("Ignoring the spec {} modified at time {} because we have a more updated version from time {}",
+            addedSpec, modificationTime,this.lastUpdatedTimeForFlowSpec.get(uriString));
+        return new AddSpecResponse(response);
+      }
+    }
+
     // todo : we should probably not schedule a flow if it is a runOnce flow
     this.scheduledFlowSpecs.put(flowSpecUri.toString(), addedSpec);
+    this.lastUpdatedTimeForFlowSpec.put(flowSpecUri.toString(), modificationTime);
 
     if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
       _log.info("{} Scheduling flow spec: {} ", this.serviceName, addedSpec);
@@ -353,6 +445,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
       } catch (JobException je) {
         _log.error("{} Failed to schedule or run FlowSpec {}", serviceName, addedSpec, je);
         this.scheduledFlowSpecs.remove(addedSpec.getUri().toString());
+        this.lastUpdatedTimeForFlowSpec.remove(flowSpecUri.toString());
         return null;
       }
       if (PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
@@ -378,6 +471,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
     if (this.scheduledFlowSpecs.containsKey(specURI.toString())) {
       _log.info("Unscheduling flowSpec " + specURI + "/" + specVersion);
       this.scheduledFlowSpecs.remove(specURI.toString());
+      this.lastUpdatedTimeForFlowSpec.remove(specURI.toString());
       unscheduleJob(specURI.toString());
     } else {
       throw new JobException(String.format(
@@ -523,6 +617,7 @@ public class GobblinServiceJobScheduler extends JobScheduler implements SpecCata
           }
           GobblinServiceJobScheduler.this.flowCatalog.get().remove(specUri, new Properties(), false);
           GobblinServiceJobScheduler.this.scheduledFlowSpecs.remove(specUri.toString());
+          GobblinServiceJobScheduler.this.lastUpdatedTimeForFlowSpec.remove(specUri.toString());
         }
       } catch (JobException je) {
         _log.error("Failed to run job " + this.jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);