You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by zi...@apache.org on 2021/10/25 20:26:19 UTC

[gobblin] branch master updated: Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s (#3414)

This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 255bdc1  Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s (#3414)
255bdc1 is described below

commit 255bdc15f4e8dd7dbf5fb94e9742d8430f2506f3
Author: Kip Kohn <ck...@linkedin.com>
AuthorDate: Mon Oct 25 13:26:11 2021 -0700

    Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s (#3414)
    
    * Refactor `MysqlSpecStore` into a generalization, `MysqlNonFlowSpecStore` (not limited to `FlowSpec`s), also useable for `TopologySpec`s
    
    * Add missing file, `MysqlNonFlowSpecStoreTest`
    
    * Fixup `MysqlNonFlowSpecStoreTest`
    
    * Simplify implementaiton of `MysqlSpecStore.getSpecsImpl`.
    
    * Rename `MysqlNonFlowSpecStore` to `MysqlBaseFlowSpecStore`.
    
    * Aid maintainers with additional code comments
---
 .../gobblin/runtime/api/FlowSpecSearchObject.java  | 141 ++++++
 .../gobblin/runtime/api/InstrumentedSpecStore.java |   8 +
 .../gobblin/runtime/api/SpecSearchObject.java      |  11 +
 .../gobblin/runtime/spec_store/FSSpecStore.java    |   2 +-
 .../runtime/spec_store/MysqlBaseSpecStore.java     | 318 ++++++++++++++
 .../gobblin/runtime/spec_store/MysqlSpecStore.java | 473 ++++-----------------
 .../runtime/spec_store/MysqlBaseSpecStoreTest.java | 159 +++++++
 .../runtime/spec_store/MysqlSpecStoreTest.java     |   8 +-
 8 files changed, 728 insertions(+), 392 deletions(-)

diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
index ef6737c..644d5f4 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
@@ -17,12 +17,21 @@
 
 package org.apache.gobblin.runtime.api;
 
+import java.io.IOException;
 import java.net.URI;
 
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Splitter;
+
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.ToString;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.service.FlowId;
 
@@ -34,6 +43,7 @@ import org.apache.gobblin.service.FlowId;
 @Builder
 @ToString
 @AllArgsConstructor
+@Slf4j
 public class FlowSpecSearchObject implements SpecSearchObject {
   private final URI flowSpecUri;
   private final String flowGroup;
@@ -51,4 +61,135 @@ public class FlowSpecSearchObject implements SpecSearchObject {
   public static FlowSpecSearchObject fromFlowId(FlowId flowId) {
     return FlowSpecSearchObject.builder().flowGroup(flowId.getFlowGroup()).flowName(flowId.getFlowName()).build();
   }
+
+  /** This expects at least one parameter of `this` to be not null */
+  @Override
+  public String augmentBaseGetStatement(String baseStatement)
+      throws IOException {
+    List<String> conditions = new ArrayList<>();
+
+    /*
+     * IMPORTANT: the order of `conditions` added must align with the order of parameter binding later in `completePreparedStatement`!
+     */
+
+    if (this.getFlowSpecUri() != null) {
+      conditions.add("spec_uri = ?");
+    }
+
+    if (this.getFlowGroup() != null) {
+      conditions.add("flow_group = ?");
+    }
+
+    if (this.getFlowName() != null) {
+      conditions.add("flow_name = ?");
+    }
+
+    if (this.getTemplateURI() != null) {
+      conditions.add("template_uri = ?");
+    }
+
+    if (this.getUserToProxy() != null) {
+      conditions.add("user_to_proxy = ?");
+    }
+
+    if (this.getSourceIdentifier() != null) {
+      conditions.add("source_identifier = ?");
+    }
+
+    if (this.getDestinationIdentifier() != null) {
+      conditions.add("destination_identifier = ?");
+    }
+
+    if (this.getSchedule() != null) {
+      conditions.add("schedule = ?");
+    }
+
+    if (this.getModifiedTimestamp() != null) {
+      conditions.add("modified_time = ?");
+    }
+
+    if (this.getIsRunImmediately() != null) {
+      conditions.add("isRunImmediately = ?");
+    }
+
+    if (this.getOwningGroup() != null) {
+      conditions.add("owning_group = ?");
+    }
+
+    // If the propertyFilter is myKey=myValue, it looks for a config where key is `myKey` and value contains string `myValue`.
+    // If the propertyFilter string does not have `=`, it considers the string as a key and just looks for its existence.
+    // Multiple occurrences of `=` in  propertyFilter are not supported and ignored completely.
+    if (this.getPropertyFilter() != null) {
+      String propertyFilter = this.getPropertyFilter();
+      Splitter commaSplitter = Splitter.on(",").trimResults().omitEmptyStrings();
+      for (String property : commaSplitter.splitToList(propertyFilter)) {
+        if (property.contains("=")) {
+          String[] keyValue = property.split("=");
+          if (keyValue.length != 2) {
+            log.error("Incorrect flow config search query");
+            continue;
+          }
+          conditions.add("spec_json->'$.configAsProperties.\"" + keyValue[0] + "\"' like " + "'%" + keyValue[1] + "%'");
+        } else {
+          conditions.add("spec_json->'$.configAsProperties.\"" + property + "\"' is not null");
+        }
+      }
+    }
+
+    if (conditions.size() == 0) {
+      throw new IOException("At least one condition is required to query flow configs.");
+    }
+
+    return baseStatement + String.join(" AND ", conditions);
+  }
+
+  @Override
+  public void completePreparedStatement(PreparedStatement statement)
+      throws SQLException {
+    int i = 0;
+
+    /*
+     * IMPORTANT: the order of binding params must align with the order of building the conditions earlier in `augmentBaseGetStatement`!
+     */
+
+    if (this.getFlowSpecUri() != null) {
+      statement.setString(++i, this.getFlowSpecUri().toString());
+    }
+
+    if (this.getFlowGroup() != null) {
+      statement.setString(++i, this.getFlowGroup());
+    }
+
+    if (this.getFlowName() != null) {
+      statement.setString(++i, this.getFlowName());
+    }
+
+    if (this.getTemplateURI() != null) {
+      statement.setString(++i, this.getTemplateURI());
+    }
+
+    if (this.getUserToProxy() != null) {
+      statement.setString(++i, this.getUserToProxy());
+    }
+
+    if (this.getSourceIdentifier() != null) {
+      statement.setString(++i, this.getSourceIdentifier());
+    }
+
+    if (this.getDestinationIdentifier() != null) {
+      statement.setString(++i, this.getDestinationIdentifier());
+    }
+
+    if (this.getSchedule() != null) {
+      statement.setString(++i, this.getModifiedTimestamp());
+    }
+
+    if (this.getIsRunImmediately() != null) {
+      statement.setBoolean(++i, this.getIsRunImmediately());
+    }
+
+    if (this.getOwningGroup() != null) {
+      statement.setString(++i, this.getOwningGroup());
+    }
+  }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
index 901ef88..1f2be84 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
@@ -91,6 +91,7 @@ public abstract class InstrumentedSpecStore implements SpecStore {
   private OptionallyTimingInvoker getAllTimer;
   private OptionallyTimingInvoker getSizeTimer;
   private OptionallyTimingInvoker getURIsTimer;
+  private OptionallyTimingInvoker getURIsWithTagTimer;
   private MetricContext metricContext;
   private final boolean instrumentationEnabled;
 
@@ -105,6 +106,7 @@ public abstract class InstrumentedSpecStore implements SpecStore {
     this.getAllTimer = createTimingInvoker("-GETALL");
     this.getSizeTimer = createTimingInvoker("-GETCOUNT");
     this.getURIsTimer = createTimingInvoker("-GETURIS");
+    this.getURIsWithTagTimer = createTimingInvoker("-GETURISWITHTAG");
   }
 
   private OptionallyTimingInvoker createTimingInvoker(String suffix) {
@@ -159,6 +161,11 @@ public abstract class InstrumentedSpecStore implements SpecStore {
   }
 
   @Override
+  public Iterator<URI> getSpecURIsWithTag(String tag) throws IOException {
+    return this.getURIsWithTagTimer.invokeMayThrowIO(() -> getSpecURIsWithTagImpl(tag));
+  }
+
+  @Override
   public int getSize() throws IOException {
     return this.getSizeTimer.invokeMayThrowIO(() -> getSizeImpl());
   }
@@ -170,6 +177,7 @@ public abstract class InstrumentedSpecStore implements SpecStore {
   public abstract boolean deleteSpecImpl(URI specUri) throws IOException;
   public abstract Collection<Spec> getSpecsImpl() throws IOException;
   public abstract Iterator<URI> getSpecURIsImpl() throws IOException;
+  public abstract Iterator<URI> getSpecURIsWithTagImpl(String tag) throws IOException;
   public abstract int getSizeImpl() throws IOException;
 
   /** child classes can implement this if they want to get specs using {@link SpecSearchObject} */
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSearchObject.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSearchObject.java
index febf4c9..8d756dc 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSearchObject.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSearchObject.java
@@ -17,8 +17,19 @@
 
 package org.apache.gobblin.runtime.api;
 
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+
 /**
  * This is an interface to package all the parameters that should be used to search {@link Spec} in a {@link SpecStore}
  */
 public interface SpecSearchObject {
+
+  /** @returns `baseStatement`, further constrained by the search object's restrictions (e.g. through (unbound) `WHERE` clause conditions) */
+  String augmentBaseGetStatement(String baseStatement) throws IOException;
+
+  /** Bind all placeholders in `statement`, which must have been prepared from the result of {@link this.augmentBaseGetStatment()} */
+  public void completePreparedStatement(PreparedStatement statement) throws SQLException;
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
index 63e829a..921138f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java
@@ -268,7 +268,7 @@ public class FSSpecStore extends InstrumentedSpecStore {
   }
 
   @Override
-  public Iterator<URI> getSpecURIsWithTag(String tag) throws IOException {
+  public Iterator<URI> getSpecURIsWithTagImpl(String tag) throws IOException {
     throw new UnsupportedOperationException("Loading specs with tag is not supported in FS-Implementation of SpecStore");
   }
 
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
new file mode 100644
index 0000000..c328a2f
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStore.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_store;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteStreams;
+import com.typesafe.config.Config;
+
+import javax.sql.DataSource;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.MysqlDataSourceFactory;
+import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.InstrumentedSpecStore;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecSerDe;
+import org.apache.gobblin.runtime.api.SpecSerDeException;
+import org.apache.gobblin.runtime.api.SpecStore;
+
+
+/**
+ * Implementation of {@link SpecStore} that stores specs in MySQL as a serialized BLOB, per the provided {@link SpecSerDe}.
+ * Note: versions are unsupported, so the version parameter is ignored in methods that have it.
+ *
+ * A tag column is added into implementation to serve certain filtering purposes in MySQL-based SpecStore.
+ * For example, in DR mode of GaaS, we would only want certain {@link Spec}s to be eligible for orchestrated
+ * by alternative GaaS instances. Another example is allow-listing/deny-listing {@link Spec}s temporarily
+ * but not removing it from {@link SpecStore}.
+ *
+ * The {@link MysqlSpecStore} is a specialization enhanced for {@link FlowSpec} search and retrieval.
+ */
+@Slf4j
+public class MysqlBaseSpecStore extends InstrumentedSpecStore {
+
+  /** `j.u.Function` variant for an operation that may @throw IOException or SQLException: preserves method signature checked exceptions */
+  @FunctionalInterface
+  protected interface CheckedFunction<T, R> {
+    R apply(T t) throws IOException, SQLException;
+  }
+
+  public static final String CONFIG_PREFIX = "mysqlBaseSpecStore";
+  public static final String DEFAULT_TAG_VALUE = "";
+
+  private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE spec_uri = ?)";
+  protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, tag, spec) "
+      + "VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec)";
+  private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE spec_uri = ?";
+  private static final String GET_STATEMENT_BASE = "SELECT spec_uri, spec FROM %s WHERE ";
+  private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec FROM %s";
+  private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
+  private static final String GET_ALL_URIS_WITH_TAG_STATEMENT = "SELECT spec_uri FROM %s WHERE tag = ?";
+  private static final String GET_SIZE_STATEMENT = "SELECT COUNT(*) FROM %s ";
+  // NOTE: using max length of a `FlowSpec` URI, as it's believed to be the longest of existing `Spec` types
+  private static final String CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR(" + FlowSpec.Utils.maxFlowSpecUriLength()
+        + ") NOT NULL, tag VARCHAR(128) NOT NULL, modified_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE "
+        + "CURRENT_TIMESTAMP, spec LONGBLOB, PRIMARY KEY (spec_uri))";
+
+  /**
+   * The main point of difference with base classes is the DB schema and hence certain of the DML and queries.  Given the interrelation
+   * between statements, collect them within this inner class that enables selective, per-statement override, and delivers them as a unit.
+   */
+  protected class SqlStatements {
+    public final String existsStatement = String.format(getTablelessExistsStatement(), MysqlBaseSpecStore.this.tableName);
+    public final String insertStatement = String.format(getTablelessInsertStatement(), MysqlBaseSpecStore.this.tableName);
+    public final String deleteStatement = String.format(getTablelessDeleteStatement(), MysqlBaseSpecStore.this.tableName);
+    public final String getStatementBase = String.format(getTablelessGetStatementBase(), MysqlBaseSpecStore.this.tableName);
+    public final String getAllStatement = String.format(getTablelessGetAllStatement(), MysqlBaseSpecStore.this.tableName);
+    public final String getAllURIsStatement = String.format(getTablelessGetAllURIsStatement(), MysqlBaseSpecStore.this.tableName);
+    public final String getAllURIsWithTagStatement = String.format(getTablelessGetAllURIsWithTagStatement(), MysqlBaseSpecStore.this.tableName);
+    public final String getSizeStatement = String.format(getTablelessGetSizeStatement(), MysqlBaseSpecStore.this.tableName);
+    public final String createTableStatement = String.format(getTablelessCreateTableStatement(), MysqlBaseSpecStore.this.tableName);
+
+    public void completeInsertPreparedStatement(PreparedStatement statement, Spec spec, String tagValue) throws SQLException {
+      URI specUri = spec.getUri();
+
+      int i = 0;
+      statement.setString(++i, specUri.toString());
+      statement.setString(++i, tagValue);
+      statement.setBlob(++i, new ByteArrayInputStream(MysqlBaseSpecStore.this.specSerDe.serialize(spec)));
+    }
+
+    public Spec extractSpec(ResultSet rs) throws SQLException, IOException {
+      return MysqlBaseSpecStore.this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(2).getBinaryStream()));
+    }
+
+    protected String getTablelessExistsStatement() { return MysqlBaseSpecStore.EXISTS_STATEMENT; }
+    protected String getTablelessInsertStatement() { return MysqlBaseSpecStore.INSERT_STATEMENT; }
+    protected String getTablelessDeleteStatement() { return MysqlBaseSpecStore.DELETE_STATEMENT; }
+    protected String getTablelessGetStatementBase() { return MysqlBaseSpecStore.GET_STATEMENT_BASE; }
+    protected String getTablelessGetAllStatement() { return MysqlBaseSpecStore.GET_ALL_STATEMENT; }
+    protected String getTablelessGetAllURIsStatement() { return MysqlBaseSpecStore.GET_ALL_URIS_STATEMENT; }
+    protected String getTablelessGetAllURIsWithTagStatement() { return MysqlBaseSpecStore.GET_ALL_URIS_WITH_TAG_STATEMENT; }
+    protected String getTablelessGetSizeStatement() { return MysqlBaseSpecStore.GET_SIZE_STATEMENT; }
+    protected String getTablelessCreateTableStatement() { return MysqlBaseSpecStore.CREATE_TABLE_STATEMENT; }
+  }
+
+
+  protected final DataSource dataSource;
+  protected final String tableName;
+  private final URI specStoreURI;
+  protected final SpecSerDe specSerDe;
+  protected final SqlStatements sqlStatements;
+
+  public MysqlBaseSpecStore(Config config, SpecSerDe specSerDe) throws IOException {
+    super(config, specSerDe);
+    String configPrefix = getConfigPrefix();
+    if (config.hasPath(configPrefix)) {
+      config = config.getConfig(configPrefix).withFallback(config);
+    }
+
+    this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
+    this.tableName = config.getString(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY);
+    this.specStoreURI = URI.create(config.getString(ConfigurationKeys.STATE_STORE_DB_URL_KEY));
+    this.specSerDe = specSerDe;
+    this.sqlStatements = createSqlStatements();
+
+    withPreparedStatement(this.sqlStatements.createTableStatement, statement ->
+      statement.executeUpdate()
+    );
+  }
+
+  protected String getConfigPrefix() {
+    return MysqlBaseSpecStore.CONFIG_PREFIX;
+  }
+
+  protected SqlStatements createSqlStatements() {
+    return new SqlStatements();
+  }
+
+  @Override
+  public boolean existsImpl(URI specUri) throws IOException {
+    return withPreparedStatement(this.sqlStatements.existsStatement, statement -> {
+      statement.setString(1, specUri.toString());
+      try (ResultSet rs = statement.executeQuery()) {
+        rs.next();
+        return rs.getBoolean(1);
+      }
+    });
+  }
+
+  @Override
+  public void addSpecImpl(Spec spec) throws IOException {
+    this.addSpec(spec, DEFAULT_TAG_VALUE);
+  }
+
+  /**
+   * Temporarily only used for testing since tag it not exposed in endpoint of {@link org.apache.gobblin.runtime.api.FlowSpec}
+   */
+  public void addSpec(Spec spec, String tagValue) throws IOException {
+    withPreparedStatement(this.sqlStatements.insertStatement, statement -> {
+      this.sqlStatements.completeInsertPreparedStatement(statement, spec, tagValue);
+      statement.executeUpdate();
+      return null; // (type: `Void`)
+    }, true);
+  }
+
+  @Override
+  public boolean deleteSpec(Spec spec) throws IOException {
+    return deleteSpec(spec.getUri());
+  }
+
+  @Override
+  public boolean deleteSpecImpl(URI specUri) throws IOException {
+    return withPreparedStatement(this.sqlStatements.deleteStatement, statement -> {
+      statement.setString(1, specUri.toString());
+      int result = statement.executeUpdate();
+      return result != 0;
+    }, true);
+  }
+
+  @Override
+  public boolean deleteSpec(URI specUri, String version) throws IOException {
+    return deleteSpec(specUri);
+  }
+
+  @Override
+  // TODO: fix to obey the `SpecStore` contract of returning the *updated* `Spec`
+  public Spec updateSpecImpl(Spec spec) throws IOException {
+    addSpec(spec);
+    return spec;
+  }
+
+  @Override
+  public Spec getSpecImpl(URI specUri) throws IOException, SpecNotFoundException {
+    Iterator<Spec> resultSpecs = withPreparedStatement(this.sqlStatements.getAllStatement + " WHERE spec_uri = ?", statement -> {
+      statement.setString(1, specUri.toString());
+      return retrieveSpecs(statement).iterator();
+    });
+    if (resultSpecs.hasNext()) {
+      return resultSpecs.next();
+    } else {
+      throw new SpecNotFoundException(specUri);
+    }
+  }
+
+  @Override
+  public Spec getSpec(URI specUri, String version) throws IOException, SpecNotFoundException {
+    return getSpec(specUri); // `version` ignored, as mentioned in javadoc
+  }
+
+  @Override
+  public Collection<Spec> getAllVersionsOfSpec(URI specUri) throws IOException, SpecNotFoundException {
+    return Lists.newArrayList(getSpec(specUri));
+  }
+
+  @Override
+  public Collection<Spec> getSpecsImpl() throws IOException {
+    return withPreparedStatement(this.sqlStatements.getAllStatement, statement -> {
+      return retrieveSpecs(statement);
+    });
+  }
+
+  protected final Collection<Spec> retrieveSpecs(PreparedStatement statement) throws IOException {
+    List<Spec> specs = new ArrayList<>();
+    try (ResultSet rs = statement.executeQuery()) {
+      while (rs.next()) {
+        specs.add(this.sqlStatements.extractSpec(rs));
+      }
+    } catch (SQLException | SpecSerDeException e) {
+      log.error("Failed to deserialize spec", e);
+      throw new IOException(e);
+    }
+    return specs;
+  }
+
+  @Override
+  public Iterator<URI> getSpecURIsImpl() throws IOException {
+    return withPreparedStatement(this.sqlStatements.getAllURIsStatement, statement -> {
+      return retreiveURIs(statement).iterator();
+    });
+  }
+
+  @Override
+  public int getSizeImpl() throws IOException {
+    return withPreparedStatement(this.sqlStatements.getSizeStatement, statement -> {
+      try (ResultSet resultSet = statement.executeQuery()) {
+        resultSet.next();
+        return resultSet.getInt(1);
+      }
+    });
+  }
+
+  @Override
+  public Iterator<URI> getSpecURIsWithTagImpl(String tag) throws IOException {
+    return withPreparedStatement(this.sqlStatements.getAllURIsWithTagStatement, statement -> {
+      statement.setString(1, tag);
+      return retreiveURIs(statement).iterator();
+    });
+  }
+
+  private List<URI> retreiveURIs(PreparedStatement statement) throws SQLException {
+    List<URI> uris = new ArrayList<>();
+
+    try (ResultSet rs = statement.executeQuery()) {
+      while (rs.next()) {
+        URI specURI = URI.create(rs.getString(1));
+        uris.add(specURI);
+      }
+    }
+
+    return uris;
+  }
+
+  @Override
+  public Optional<URI> getSpecStoreURI() {
+    return Optional.of(this.specStoreURI);
+  }
+
+  /** Abstracts recurring pattern around resource management and exception re-mapping. */
+  protected <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatement, T> f, boolean shouldCommit) throws IOException {
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement = connection.prepareStatement(sql)) {
+      T result = f.apply(statement);
+      if (shouldCommit) {
+        connection.commit();
+      }
+      return result;
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /** Abstracts recurring pattern, while not presuming to DB `commit()`. */
+  protected final <T> T withPreparedStatement(String sql, CheckedFunction<PreparedStatement, T> f) throws IOException {
+    return withPreparedStatement(sql, f, false);
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index 032da32..00e203d 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -17,431 +17,130 @@
 
 package org.apache.gobblin.runtime.spec_store;
 
+import com.google.common.base.Charsets;
+import com.google.common.io.ByteStreams;
+import com.google.gson.Gson;
+import com.typesafe.config.Config;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.URI;
-import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import com.google.common.base.Charsets;
-import com.google.common.base.Optional;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.io.ByteStreams;
-import com.google.gson.Gson;
-import com.typesafe.config.Config;
-
-import javax.sql.DataSource;
 import lombok.extern.slf4j.Slf4j;
-
-import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
-import org.apache.gobblin.metastore.MysqlDataSourceFactory;
 import org.apache.gobblin.runtime.api.FlowSpec;
-import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
-import org.apache.gobblin.runtime.api.InstrumentedSpecStore;
 import org.apache.gobblin.runtime.api.Spec;
-import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.SpecSearchObject;
 import org.apache.gobblin.runtime.api.SpecSerDe;
-import org.apache.gobblin.runtime.api.SpecSerDeException;
 import org.apache.gobblin.runtime.api.SpecStore;
 import org.apache.gobblin.service.ServiceConfigKeys;
 import org.apache.gobblin.util.ConfigUtils;
 
-import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY;
-import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY;
+import static org.apache.gobblin.service.ServiceConfigKeys.*;
 
 
 /**
- * Implementation of {@link SpecStore} that stores specs as serialized java objects in MySQL. Note that versions are not
- * supported, so the version parameter will be ignored in methods that have it.
+ * Implementation of {@link SpecStore} that stores specs in MySQL both as a serialized BLOB and as a JSON string, and extends
+ * {@link MysqlBaseSpecStore} with enhanced FlowSpec search/retrieval capabilities.  The {@link SpecSerDe}'s serialized output
+ * is presumed suitable for a MySql `JSON` column.  As in the base, versions are unsupported and ignored.
  *
- * A tag column is added into implementation to serve certain filtering purposes in MySQL-based SpecStore.
- * For example, in DR mode of GaaS, we would only want certain {@link Spec}s to be eligible for orchestrated
- * by alternative GaaS instances. Another example is whitelisting/blacklisting {@link Spec}s temporarily
- * but not removing it from {@link SpecStore}.
+ * ETYMOLOGY: this class might better be named `MysqlFlowSpecStore` while the base be `MysqlSpecStore`, but the `MysqlSpecStore` name's
+ * association with this class's semantics predates the refactoring/generalization into `MysqlBaseSpecStore`.  Thus, to maintain
+ * b/w-compatibility and avoid surprising legacy users who already refer to it (e.g. by name, in configuration), that original name remains intact.
  */
 @Slf4j
-// todo : This should be renamed to MysqlFlowSpecStore, because this implementation only stores FlowSpec, not a TopologySpec
-public class MysqlSpecStore extends InstrumentedSpecStore {
+public class MysqlSpecStore extends MysqlBaseSpecStore {
   public static final String CONFIG_PREFIX = "mysqlSpecStore";
-  public static final String DEFAULT_TAG_VALUE = "";
-  private static final String NEW_COLUMN = "spec_json";
 
-  private static final String EXISTS_STATEMENT = "SELECT EXISTS(SELECT * FROM %s WHERE spec_uri = ?)";
-  protected static final String INSERT_STATEMENT = "INSERT INTO %s (spec_uri, flow_group, flow_name, template_uri, "
-      + "user_to_proxy, source_identifier, destination_identifier, schedule, tag, isRunImmediately, owning_group, spec, " + NEW_COLUMN + ") "
-      + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec), " + NEW_COLUMN + " = VALUES(" + NEW_COLUMN + ")";
-  private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE spec_uri = ?";
-  private static final String GET_STATEMENT = "SELECT spec_uri, spec, " + NEW_COLUMN + " FROM %s WHERE ";
-  private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec, " + NEW_COLUMN + " FROM %s";
-  private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
-  private static final String GET_ALL_STATEMENT_WITH_TAG = "SELECT spec_uri FROM %s WHERE tag = ?";
-  private static final String GET_SIZE_STATEMENT = "SELECT COUNT(*) FROM %s ";
+  // Historical Note: the `spec_json` column didn't always exist and was introduced for GOBBLIN-1150; the impl. thus allows that not every
+  // record may contain data there... though in practice, all should, given the passage of time (amidst the usual retention expiry).
+  protected static final String SPECIFIC_INSERT_STATEMENT = "INSERT INTO %s (spec_uri, flow_group, flow_name, template_uri, "
+      + "user_to_proxy, source_identifier, destination_identifier, schedule, tag, isRunImmediately, owning_group, spec, spec_json) "
+      + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec), spec_json = VALUES(spec_json)";
+  private static final String SPECIFIC_GET_STATEMENT_BASE = "SELECT spec_uri, spec, spec_json FROM %s WHERE ";
+  private static final String SPECIFIC_GET_ALL_STATEMENT = "SELECT spec_uri, spec, spec_json FROM %s";
+  private static final String SPECIFIC_CREATE_TABLE_STATEMENT = "CREATE TABLE IF NOT EXISTS %s (spec_uri VARCHAR("
+      + FlowSpec.Utils.maxFlowSpecUriLength()
+      + ") NOT NULL, flow_group VARCHAR(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + "), flow_name VARCHAR("
+      + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + "), " + "template_uri VARCHAR(128), user_to_proxy VARCHAR(128), "
+      + "source_identifier VARCHAR(128), " + "destination_identifier VARCHAR(128), schedule VARCHAR(128), "
+      + "tag VARCHAR(128) NOT NULL, modified_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE "
+      + "CURRENT_TIMESTAMP, isRunImmediately BOOLEAN, timezone VARCHAR(128), owning_group VARCHAR(128), spec LONGBLOB, "
+      + "spec_json JSON, PRIMARY KEY (spec_uri))";
+
+  /** Bundle all changes following from schema differences against the base class. */
+  protected class SpecificSqlStatements extends SqlStatements {
+    @Override
+    public void completeInsertPreparedStatement(PreparedStatement statement, Spec spec, String tagValue) throws SQLException {
+      FlowSpec flowSpec = (FlowSpec) spec;
+      URI specUri = flowSpec.getUri();
+      Config flowConfig = flowSpec.getConfig();
+      String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
+      String templateURI = new Gson().toJson(flowSpec.getTemplateURIs());
+      String userToProxy = ConfigUtils.getString(flowSpec.getConfig(), "user.to.proxy", null);
+      String sourceIdentifier = flowConfig.getString(FLOW_SOURCE_IDENTIFIER_KEY);
+      String destinationIdentifier = flowConfig.getString(FLOW_DESTINATION_IDENTIFIER_KEY);
+      String schedule = ConfigUtils.getString(flowConfig, ConfigurationKeys.JOB_SCHEDULE_KEY, null);
+      String owningGroup = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_OWNING_GROUP_KEY, null);
+      boolean isRunImmediately = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, false);
+
+      int i = 0;
+      statement.setString(++i, specUri.toString());
+      statement.setString(++i, flowGroup);
+      statement.setString(++i, flowName);
+      statement.setString(++i, templateURI);
+      statement.setString(++i, userToProxy);
+      statement.setString(++i, sourceIdentifier);
+      statement.setString(++i, destinationIdentifier);
+      statement.setString(++i, schedule);
+      statement.setString(++i, tagValue);
+      statement.setBoolean(++i, isRunImmediately);
+      statement.setString(++i, owningGroup);
+      statement.setBlob(++i, new ByteArrayInputStream(MysqlSpecStore.this.specSerDe.serialize(flowSpec)));
+      statement.setString(++i, new String(MysqlSpecStore.this.specSerDe.serialize(flowSpec), Charsets.UTF_8));
+    }
+
+    @Override
+    public Spec extractSpec(ResultSet rs) throws SQLException, IOException {
+      return rs.getString(3) == null
+          ? MysqlSpecStore.this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(2).getBinaryStream()))
+          : MysqlSpecStore.this.specSerDe.deserialize(rs.getString(3).getBytes(Charsets.UTF_8));
+    }
+
+    @Override
+    protected String getTablelessInsertStatement() { return MysqlSpecStore.SPECIFIC_INSERT_STATEMENT; }
+    @Override
+    protected String getTablelessGetStatementBase() { return MysqlSpecStore.SPECIFIC_GET_STATEMENT_BASE; }
+    @Override
+    protected String getTablelessGetAllStatement() { return MysqlSpecStore.SPECIFIC_GET_ALL_STATEMENT; }
+    @Override
+    protected String getTablelessCreateTableStatement() { return MysqlSpecStore.SPECIFIC_CREATE_TABLE_STATEMENT; }
+  }
 
-  protected final DataSource dataSource;
-  protected final String tableName;
-  private final URI specStoreURI;
-  protected final SpecSerDe specSerDe;
 
   public MysqlSpecStore(Config config, SpecSerDe specSerDe) throws IOException {
     super(config, specSerDe);
-    if (config.hasPath(CONFIG_PREFIX)) {
-      config = config.getConfig(CONFIG_PREFIX).withFallback(config);
-    }
-
-    this.dataSource = MysqlDataSourceFactory.get(config, SharedResourcesBrokerFactory.getImplicitBroker());
-    this.tableName = config.getString(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY);
-    this.specStoreURI = URI.create(config.getString(ConfigurationKeys.STATE_STORE_DB_URL_KEY));
-    this.specSerDe = specSerDe;
-
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(getCreateJobStateTableTemplate())) {
-      statement.executeUpdate();
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
-  }
-
-  protected String getCreateJobStateTableTemplate() {
-    return "CREATE TABLE IF NOT EXISTS " + this.tableName + " (spec_uri VARCHAR(" + FlowSpec.Utils.maxFlowSpecUriLength()
-        + ") NOT NULL, flow_group VARCHAR(" + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + "), flow_name VARCHAR("
-        + ServiceConfigKeys.MAX_FLOW_GROUP_LENGTH + "), " + "template_uri VARCHAR(128), user_to_proxy VARCHAR(128), "
-        + "source_identifier VARCHAR(128), " + "destination_identifier VARCHAR(128), schedule VARCHAR(128), "
-        + "tag VARCHAR(128) NOT NULL, modified_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE "
-        + "CURRENT_TIMESTAMP, isRunImmediately BOOLEAN, timezone VARCHAR(128), owning_group VARCHAR(128), spec LONGBLOB, "
-        + NEW_COLUMN + " JSON, PRIMARY KEY (spec_uri))";
-  }
-
-  @Override
-  public boolean existsImpl(URI specUri) throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(EXISTS_STATEMENT, this.tableName))) {
-      statement.setString(1, specUri.toString());
-      try (ResultSet rs = statement.executeQuery()) {
-        rs.next();
-        return rs.getBoolean(1);
-      }
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public void addSpecImpl(Spec spec) throws IOException {
-    this.addSpec(spec, DEFAULT_TAG_VALUE);
-  }
-
-  /**
-   * Temporarily only used for testing since tag it not exposed in endpoint of {@link org.apache.gobblin.runtime.api.FlowSpec}
-   */
-  public void addSpec(Spec spec, String tagValue) throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
-      setAddPreparedStatement(statement, spec, tagValue);
-      statement.executeUpdate();
-      connection.commit();
-    } catch (SQLException | SpecSerDeException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public boolean deleteSpec(Spec spec) throws IOException {
-    return deleteSpec(spec.getUri());
-  }
-
-  @Override
-  public boolean deleteSpecImpl(URI specUri) throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(DELETE_STATEMENT, this.tableName))) {
-      statement.setString(1, specUri.toString());
-      int result = statement.executeUpdate();
-      connection.commit();
-      return result != 0;
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
   }
 
   @Override
-  public boolean deleteSpec(URI specUri, String version) throws IOException {
-    return deleteSpec(specUri);
+  protected String getConfigPrefix() {
+    return MysqlSpecStore.CONFIG_PREFIX;
   }
 
   @Override
-  // TODO : this method is not doing what the contract is in the SpecStore interface
-  public Spec updateSpecImpl(Spec spec) throws IOException {
-    addSpec(spec);
-    return spec;
+  protected SqlStatements createSqlStatements() {
+    return new SpecificSqlStatements();
   }
 
+  /** Support search, unlike base class (presumably via a {@link org.apache.gobblin.runtime.api.FlowSpecSearchObject}). */
   @Override
-  public Spec getSpecImpl(URI specUri) throws IOException, SpecNotFoundException {
-    Iterator<Spec> specsIterator = getSpecsImpl(FlowSpecSearchObject.builder().flowSpecUri(specUri).build()).iterator();
-    if (specsIterator.hasNext()) {
-      return specsIterator.next();
-    } else {
-      throw new SpecNotFoundException(specUri);
-    }
-  }
-
   public Collection<Spec> getSpecsImpl(SpecSearchObject specSearchObject) throws IOException {
-    FlowSpecSearchObject flowSpecSearchObject = (FlowSpecSearchObject) specSearchObject;
-
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(createGetPreparedStatement(flowSpecSearchObject, this.tableName))) {
-      setGetPreparedStatement(statement, flowSpecSearchObject);
-      return getSpecsInternal(statement);
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public Spec getSpec(URI specUri, String version) throws IOException, SpecNotFoundException {
-    return getSpec(specUri);
-  }
-
-  @Override
-  public Collection<Spec> getAllVersionsOfSpec(URI specUri) throws IOException, SpecNotFoundException {
-    return Lists.newArrayList(getSpec(specUri));
-  }
-
-  @Override
-  public Collection<Spec> getSpecsImpl() throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_STATEMENT, this.tableName))) {
-      return getSpecsInternal(statement);
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
-  }
-
-  private Collection<Spec> getSpecsInternal(PreparedStatement statement) throws IOException {
-    List<Spec> specs = new ArrayList<>();
-    try (ResultSet rs = statement.executeQuery()) {
-      while (rs.next()) {
-        specs.add(
-            rs.getString(3) == null
-                ? this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(2).getBinaryStream()))
-                : this.specSerDe.deserialize(rs.getString(2).getBytes(Charsets.UTF_8))
-        );
-      }
-    } catch (SQLException | SpecSerDeException e) {
-      log.error("Failed to deserialize spec", e);
-      throw new IOException(e);
-    }
-    return specs;
-  }
-
-  @Override
-  public Iterator<URI> getSpecURIsImpl() throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_URIS_STATEMENT, this.tableName))) {
-      return getURIIteratorByQuery(statement);
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public int getSizeImpl() throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(GET_SIZE_STATEMENT, this.tableName));
-        ResultSet resultSet = statement.executeQuery()) {
-      resultSet.next();
-      return resultSet.getInt(1);
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
-  }
-
-  @Override
-  public Iterator<URI> getSpecURIsWithTag(String tag) throws IOException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_STATEMENT_WITH_TAG, this.tableName))) {
-      statement.setString(1, tag);
-      return getURIIteratorByQuery(statement);
-    } catch (SQLException e) {
-      throw new IOException(e);
-    }
-  }
-
-  private Iterator<URI> getURIIteratorByQuery(PreparedStatement statement) throws SQLException {
-    List<URI> specs = new ArrayList<>();
-
-    try (ResultSet rs = statement.executeQuery()) {
-      while (rs.next()) {
-        URI specURI = URI.create(rs.getString(1));
-        specs.add(specURI);
-      }
-    }
-
-    return specs.iterator();
-  }
-
-  @Override
-  public Optional<URI> getSpecStoreURI() {
-    return Optional.of(this.specStoreURI);
-  }
-
-  /** This expects at least one parameter in {@link FlowSpecSearchObject} to be not null */
-  static String createGetPreparedStatement(FlowSpecSearchObject flowSpecSearchObject, String tableName)
-      throws IOException {
-    String baseStatement = String.format(GET_STATEMENT, tableName);
-    List<String> conditions = new ArrayList<>();
-
-    if (flowSpecSearchObject.getFlowSpecUri() != null) {
-      conditions.add("spec_uri = ?");
-    }
-
-    if (flowSpecSearchObject.getFlowGroup() != null) {
-      conditions.add("flow_group = ?");
-    }
-
-    if (flowSpecSearchObject.getFlowName() != null) {
-      conditions.add("flow_name = ?");
-    }
-
-    if (flowSpecSearchObject.getTemplateURI() != null) {
-      conditions.add("template_uri = ?");
-    }
-
-    if (flowSpecSearchObject.getUserToProxy() != null) {
-      conditions.add("user_to_proxy = ?");
-    }
-
-    if (flowSpecSearchObject.getSourceIdentifier() != null) {
-      conditions.add("source_identifier = ?");
-    }
-
-    if (flowSpecSearchObject.getDestinationIdentifier() != null) {
-      conditions.add("destination_identifier = ?");
-    }
-
-    if (flowSpecSearchObject.getSchedule() != null) {
-      conditions.add("schedule = ?");
-    }
-
-    if (flowSpecSearchObject.getModifiedTimestamp() != null) {
-      conditions.add("modified_time = ?");
-    }
-
-    if (flowSpecSearchObject.getIsRunImmediately() != null) {
-      conditions.add("isRunImmediately = ?");
-    }
-
-    if (flowSpecSearchObject.getOwningGroup() != null) {
-      conditions.add("owning_group = ?");
-    }
-
-    // If the propertyFilter is myKey=myValue, it looks for a config where key is `myKey` and value contains string `myValue`.
-    // If the propertyFilter string does not have `=`, it considers the string as a key and just looks for its existence.
-    // Multiple occurrences of `=` in  propertyFilter are not supported and ignored completely.
-    if (flowSpecSearchObject.getPropertyFilter() != null) {
-      String propertyFilter = flowSpecSearchObject.getPropertyFilter();
-      Splitter commaSplitter = Splitter.on(",").trimResults().omitEmptyStrings();
-      for (String property : commaSplitter.splitToList(propertyFilter)) {
-        if (property.contains("=")) {
-          String[] keyValue = property.split("=");
-          if (keyValue.length != 2) {
-            log.error("Incorrect flow config search query");
-            continue;
-          }
-          conditions.add("spec_json->'$.configAsProperties.\"" + keyValue[0] + "\"' like " + "'%" + keyValue[1] + "%'");
-        } else {
-          conditions.add("spec_json->'$.configAsProperties.\"" + property + "\"' is not null");
-        }
-      }
-    }
-
-    if (conditions.size() == 0) {
-      throw new IOException("At least one condition is required to query flow configs.");
-    }
-
-    return baseStatement + String.join(" AND ", conditions);
-  }
-
-  private static void setGetPreparedStatement(PreparedStatement statement, FlowSpecSearchObject flowSpecSearchObject)
-      throws SQLException {
-    int i = 0;
-
-    if (flowSpecSearchObject.getFlowSpecUri() != null) {
-      statement.setString(++i, flowSpecSearchObject.getFlowSpecUri().toString());
-    }
-
-    if (flowSpecSearchObject.getFlowGroup() != null) {
-      statement.setString(++i, flowSpecSearchObject.getFlowGroup());
-    }
-
-    if (flowSpecSearchObject.getFlowName() != null) {
-      statement.setString(++i, flowSpecSearchObject.getFlowName());
-    }
-
-    if (flowSpecSearchObject.getTemplateURI() != null) {
-      statement.setString(++i, flowSpecSearchObject.getTemplateURI());
-    }
-
-    if (flowSpecSearchObject.getUserToProxy() != null) {
-      statement.setString(++i, flowSpecSearchObject.getUserToProxy());
-    }
-
-    if (flowSpecSearchObject.getSourceIdentifier() != null) {
-      statement.setString(++i, flowSpecSearchObject.getSourceIdentifier());
-    }
-
-    if (flowSpecSearchObject.getDestinationIdentifier() != null) {
-      statement.setString(++i, flowSpecSearchObject.getDestinationIdentifier());
-    }
-
-    if (flowSpecSearchObject.getSchedule() != null) {
-      statement.setString(++i, flowSpecSearchObject.getModifiedTimestamp());
-    }
-
-    if (flowSpecSearchObject.getIsRunImmediately() != null) {
-      statement.setBoolean(++i, flowSpecSearchObject.getIsRunImmediately());
-    }
-
-    if (flowSpecSearchObject.getOwningGroup() != null) {
-      statement.setString(++i, flowSpecSearchObject.getOwningGroup());
-    }
-  }
-
-  protected void setAddPreparedStatement(PreparedStatement statement, Spec spec, String tagValue) throws SQLException {
-    FlowSpec flowSpec = (FlowSpec) spec;
-    URI specUri = flowSpec.getUri();
-    Config flowConfig = flowSpec.getConfig();
-    String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY);
-    String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY);
-    String templateURI = new Gson().toJson(flowSpec.getTemplateURIs());
-    String userToProxy = ConfigUtils.getString(flowSpec.getConfig(), "user.to.proxy", null);
-    String sourceIdentifier = flowConfig.getString(FLOW_SOURCE_IDENTIFIER_KEY);
-    String destinationIdentifier = flowConfig.getString(FLOW_DESTINATION_IDENTIFIER_KEY);
-    String schedule = ConfigUtils.getString(flowConfig, ConfigurationKeys.JOB_SCHEDULE_KEY, null);
-    String owningGroup = ConfigUtils.getString(flowConfig, ConfigurationKeys.FLOW_OWNING_GROUP_KEY, null);
-    boolean isRunImmediately = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, false);
-
-    int i = 0;
-    statement.setString(++i, specUri.toString());
-    statement.setString(++i, flowGroup);
-    statement.setString(++i, flowName);
-    statement.setString(++i, templateURI);
-    statement.setString(++i, userToProxy);
-    statement.setString(++i, sourceIdentifier);
-    statement.setString(++i, destinationIdentifier);
-    statement.setString(++i, schedule);
-    statement.setString(++i, tagValue);
-    statement.setBoolean(++i, isRunImmediately);
-    statement.setString(++i, owningGroup);
-    statement.setBlob(++i, new ByteArrayInputStream(this.specSerDe.serialize(flowSpec)));
-    statement.setString(++i, new String(this.specSerDe.serialize(flowSpec), Charsets.UTF_8));
+    return withPreparedStatement(specSearchObject.augmentBaseGetStatement(this.sqlStatements.getStatementBase), statement -> {
+      specSearchObject.completePreparedStatement(statement);
+      return retrieveSpecs(statement);
+    });
   }
 }
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStoreTest.java
new file mode 100644
index 0000000..1a68be2
--- /dev/null
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlBaseSpecStoreTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.spec_store;
+
+import com.google.common.collect.Iterators;
+import com.typesafe.config.Config;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.gobblin.config.ConfigBuilder;
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
+import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
+import org.apache.gobblin.runtime.api.TopologySpec;
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
+import org.apache.gobblin.runtime.api.Spec;
+import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
+import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class MysqlBaseSpecStoreTest {
+  private static final String USER = "testUser";
+  private static final String PASSWORD = "testPassword";
+  private static final String TABLE = "base_spec_store";
+
+  private MysqlBaseSpecStore specStore;
+  private final URI uri1 = new URI(new TopologySpec.Builder().getDefaultTopologyCatalogURI().toString() + "1");
+  private final URI uri2 = new URI(new TopologySpec.Builder().getDefaultTopologyCatalogURI().toString() + "2");
+  private TopologySpec topoSpec1, topoSpec2;
+
+  public MysqlBaseSpecStoreTest()
+      throws URISyntaxException { // (based on `uri1` and other initializations just above)
+  }
+
+  @BeforeClass
+  public void setUp() throws Exception {
+    ITestMetastoreDatabase testDb = TestMetastoreDatabaseFactory.get();
+
+    // prefix keys to demonstrate disambiguation mechanism used to side-step intentially-sabatoged non-prefixed, 'fallback'
+    Config config = ConfigBuilder.create()
+        .addPrimitive(ConfigurationKeys.STATE_STORE_DB_URL_KEY, " SABATOGE! !" + testDb.getJdbcUrl())
+        .addPrimitive(MysqlBaseSpecStore.CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_URL_KEY, testDb.getJdbcUrl())
+        .addPrimitive(MysqlBaseSpecStore.CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_USER_KEY, USER)
+        .addPrimitive(MysqlBaseSpecStore.CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY, PASSWORD)
+        .addPrimitive(MysqlBaseSpecStore.CONFIG_PREFIX + "." + ConfigurationKeys.STATE_STORE_DB_TABLE_KEY, TABLE)
+        .build();
+
+    this.specStore = new MysqlBaseSpecStore(config, new JavaSpecSerDe());
+
+    topoSpec1 = new TopologySpec.Builder(this.uri1)
+        .withConfig(ConfigBuilder.create()
+            .addPrimitive("key", "value")
+            .addPrimitive("key3", "value3")
+            .addPrimitive("config.with.dot", "value4").build())
+        .withDescription("Test1")
+        .withVersion("Test version")
+        .withSpecExecutor(MockedSpecExecutor.createDummySpecExecutor(new URI("execA")))
+        .build();
+    topoSpec2 = new TopologySpec.Builder(this.uri2)
+        .withConfig(ConfigBuilder.create().addPrimitive("converter", "value1,value2,value3")
+            .addPrimitive("key3", "value3").build())
+        .withDescription("Test2")
+        .withVersion("Test version 2")
+        .withSpecExecutor(MockedSpecExecutor.createDummySpecExecutor(new URI("execB")))
+        .build();
+  }
+
+  @Test(expectedExceptions = UnsupportedOperationException.class)
+  public void testSpecSearchUnsupported() throws Exception {
+    FlowSpecSearchObject flowSpecSearchObject = FlowSpecSearchObject.builder().build();
+    Collection<Spec> specs = this.specStore.getSpecs(flowSpecSearchObject);
+  }
+
+  @Test
+  public void testAddSpec() throws Exception {
+    this.specStore.addSpec(this.topoSpec1);
+    this.specStore.addSpec(this.topoSpec2);
+
+    Assert.assertEquals(this.specStore.getSize(), 2);
+    Assert.assertTrue(this.specStore.exists(this.uri1));
+    Assert.assertTrue(this.specStore.exists(this.uri2));
+    Assert.assertFalse(this.specStore.exists(URI.create("dummy")));
+  }
+
+  @Test (dependsOnMethods = "testAddSpec")
+  public void testGetSpec() throws Exception {
+    TopologySpec result = (TopologySpec) this.specStore.getSpec(this.uri1);
+    Assert.assertEquals(result, this.topoSpec1);
+
+    Collection<Spec> specs = this.specStore.getSpecs();
+    Assert.assertEquals(specs.size(), 2);
+    Assert.assertTrue(specs.contains(this.topoSpec1));
+    Assert.assertTrue(specs.contains(this.topoSpec1));
+
+    Iterator<URI> uris = this.specStore.getSpecURIs();
+    Assert.assertTrue(Iterators.contains(uris, this.uri1));
+    Assert.assertTrue(Iterators.contains(uris, this.uri2));
+  }
+
+  @Test (dependsOnMethods = "testGetSpec")
+  public void testGetSpecWithTag() throws Exception {
+    //Creating and inserting specs with tags
+    URI uri5 = URI.create("topospec5");
+    TopologySpec topoSpec5 = new TopologySpec.Builder(uri5)
+        .withConfig(ConfigBuilder.create()
+            .addPrimitive("key5", "value5").build())
+        .withDescription("Test5")
+        .withVersion("Test version 5")
+        .withSpecExecutor(MockedSpecExecutor.createDummySpecExecutor(new URI("execE")))
+        .build();
+
+    URI uri6 = URI.create("topospec6");
+    TopologySpec topoSpec6 = new TopologySpec.Builder(uri6)
+        .withConfig(ConfigBuilder.create()
+            .addPrimitive("key6", "value6").build())
+        .withDescription("Test6")
+        .withVersion("Test version 6")
+        .withSpecExecutor(MockedSpecExecutor.createDummySpecExecutor(new URI("execF")))
+        .build();
+
+    this.specStore.addSpec(topoSpec5, "dr");
+    this.specStore.addSpec(topoSpec6, "dr");
+
+    Assert.assertTrue(this.specStore.exists(uri5));
+    Assert.assertTrue(this.specStore.exists(uri6));
+    List<URI> result = new ArrayList<>();
+    this.specStore.getSpecURIsWithTag("dr").forEachRemaining(result::add);
+    Assert.assertEquals(result.size(), 2);
+  }
+
+  @Test (dependsOnMethods = "testGetSpecWithTag")
+  public void testDeleteSpec() throws Exception {
+    Assert.assertEquals(this.specStore.getSize(), 4);
+    this.specStore.deleteSpec(this.uri1);
+    Assert.assertEquals(this.specStore.getSize(), 3);
+    Assert.assertFalse(this.specStore.exists(this.uri1));
+  }
+}
\ No newline at end of file
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index b382e76..b7e9df5 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -66,7 +66,7 @@ public class MysqlSpecStoreTest {
   private FlowSpec flowSpec1, flowSpec2, flowSpec3, flowSpec4;
 
   public MysqlSpecStoreTest()
-      throws URISyntaxException {
+      throws URISyntaxException { // (based on `uri1` and other initializations just above)
   }
 
   @BeforeClass
@@ -131,7 +131,7 @@ public class MysqlSpecStoreTest {
   public void testSpecSearch() throws Exception {
     // empty FlowSpecSearchObject should throw an error
     FlowSpecSearchObject flowSpecSearchObject = FlowSpecSearchObject.builder().build();
-    MysqlSpecStore.createGetPreparedStatement(flowSpecSearchObject, "table");
+    flowSpecSearchObject.augmentBaseGetStatement("SELECT * FROM Dummy WHERE ");
   }
 
   @Test
@@ -274,8 +274,8 @@ public class MysqlSpecStoreTest {
     @Override
     public void addSpec(Spec spec, String tagValue) throws IOException {
       try (Connection connection = this.dataSource.getConnection();
-          PreparedStatement statement = connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
-        setAddPreparedStatement(statement, spec, tagValue);
+          PreparedStatement statement = connection.prepareStatement(this.sqlStatements.insertStatement)) {
+        this.sqlStatements.completeInsertPreparedStatement(statement, spec, tagValue);
         statement.setString(4, null);
         statement.executeUpdate();
         connection.commit();