You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/06/22 18:41:47 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1196] search flow configs using flow properties and/or other parameters

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 60f33c5  [GOBBLIN-1196] search flow configs using flow properties and/or other parameters
60f33c5 is described below

commit 60f33c5720e4e8647a1bdc2d99c516f44bebcc50
Author: Arjun <ab...@linkedin.com>
AuthorDate: Mon Jun 22 11:41:40 2020 -0700

    [GOBBLIN-1196] search flow configs using flow properties and/or other parameters
    
    Closes #3042 from arjun4084346/configQuery
---
 ...che.gobblin.service.flowconfigsV2.restspec.json |  50 ++++-
 ...che.gobblin.service.flowconfigsV2.snapshot.json |  50 ++++-
 .../apache/gobblin/service/FlowConfigClient.java   |   3 +-
 .../apache/gobblin/service/FlowConfigV2Client.java |  42 +++-
 .../org/apache/gobblin/service/FlowConfigTest.java |   2 +-
 .../service/FlowConfigResourceLocalHandler.java    |  59 +++---
 .../service/FlowConfigsResourceHandler.java        |  12 ++
 .../gobblin/service/FlowConfigsV2Resource.java     |  44 ++++-
 .../FlowConfigResourceLocalHandlerTest.java        |  22 +--
 .../org/apache/gobblin/runtime/api/FlowSpec.java   |  66 ++++++-
 .../gobblin/runtime/api/FlowSpecSearchObject.java  |  54 ++++++
 .../gobblin/runtime/api/InstrumentedSpecStore.java |  17 ++
 .../apache/gobblin/runtime/api/SpecCatalog.java    |   9 +-
 .../gobblin/runtime/api/SpecSearchObject.java      |  24 +++
 .../org/apache/gobblin/runtime/api/SpecStore.java  |  10 +-
 .../gobblin/runtime/spec_catalog/FlowCatalog.java  |  24 ++-
 .../runtime/spec_catalog/TopologyCatalog.java      |   2 +-
 .../runtime/spec_serde/FlowSpecDeserializer.java   |   9 +-
 .../runtime/spec_serde/FlowSpecSerializer.java     |  11 +-
 .../gobblin/runtime/spec_store/MysqlSpecStore.java | 216 +++++++++++++++++----
 .../runtime/spec_store/MysqlSpecStoreTest.java     |  63 ++++--
 .../GobblinServiceFlowConfigResourceHandler.java   |  12 ++
 .../gobblin/service/GobblinServiceManagerTest.java |  34 +++-
 23 files changed, 683 insertions(+), 152 deletions(-)

diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
index ce0714c..5a7ccb8 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
@@ -10,7 +10,7 @@
       "type" : "org.apache.gobblin.service.FlowId",
       "params" : "org.apache.gobblin.service.FlowStatusId"
     },
-    "supports" : [ "create", "delete", "get", "partial_update", "update" ],
+    "supports" : [ "create", "delete", "get", "get_all", "partial_update", "update" ],
     "methods" : [ {
       "annotations" : {
         "returnEntity" : { }
@@ -29,6 +29,54 @@
     }, {
       "method" : "delete",
       "doc" : "Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows."
+    }, {
+      "method" : "get_all",
+      "doc" : "Retrieve all the flow configurations"
+    } ],
+    "finders" : [ {
+      "name" : "filterFlows",
+      "doc" : "Get all {@link FlowConfig}s that matches the provided parameters. All the parameters are optional.\n If a parameter is null, it is ignored. {@see FlowConfigV2Resource#getFilteredFlows}",
+      "parameters" : [ {
+        "name" : "flowGroup",
+        "type" : "string",
+        "optional" : true
+      }, {
+        "name" : "flowName",
+        "type" : "string",
+        "optional" : true
+      }, {
+        "name" : "templateUri",
+        "type" : "string",
+        "optional" : true
+      }, {
+        "name" : "userToProxy",
+        "type" : "string",
+        "optional" : true
+      }, {
+        "name" : "sourceIdentifier",
+        "type" : "string",
+        "optional" : true
+      }, {
+        "name" : "destinationIdentifier",
+        "type" : "string",
+        "optional" : true
+      }, {
+        "name" : "schedule",
+        "type" : "string",
+        "optional" : true
+      }, {
+        "name" : "isRunImmediately",
+        "type" : "boolean",
+        "optional" : true
+      }, {
+        "name" : "owningGroup",
+        "type" : "string",
+        "optional" : true
+      }, {
+        "name" : "propertyFilter",
+        "type" : "string",
+        "optional" : true
+      } ]
     } ],
     "entity" : {
       "path" : "/flowconfigsV2/{id}"
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
index 50064ca..fa8e0cd 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
@@ -108,7 +108,7 @@
         "type" : "org.apache.gobblin.service.FlowId",
         "params" : "org.apache.gobblin.service.FlowStatusId"
       },
-      "supports" : [ "create", "delete", "get", "partial_update", "update" ],
+      "supports" : [ "create", "delete", "get", "get_all", "partial_update", "update" ],
       "methods" : [ {
         "annotations" : {
           "returnEntity" : { }
@@ -127,6 +127,54 @@
       }, {
         "method" : "delete",
         "doc" : "Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows."
+      }, {
+        "method" : "get_all",
+        "doc" : "Retrieve all the flow configurations"
+      } ],
+      "finders" : [ {
+        "name" : "filterFlows",
+        "doc" : "Get all {@link FlowConfig}s that matches the provided parameters. All the parameters are optional.\n If a parameter is null, it is ignored. {@see FlowConfigV2Resource#getFilteredFlows}",
+        "parameters" : [ {
+          "name" : "flowGroup",
+          "type" : "string",
+          "optional" : true
+        }, {
+          "name" : "flowName",
+          "type" : "string",
+          "optional" : true
+        }, {
+          "name" : "templateUri",
+          "type" : "string",
+          "optional" : true
+        }, {
+          "name" : "userToProxy",
+          "type" : "string",
+          "optional" : true
+        }, {
+          "name" : "sourceIdentifier",
+          "type" : "string",
+          "optional" : true
+        }, {
+          "name" : "destinationIdentifier",
+          "type" : "string",
+          "optional" : true
+        }, {
+          "name" : "schedule",
+          "type" : "string",
+          "optional" : true
+        }, {
+          "name" : "isRunImmediately",
+          "type" : "boolean",
+          "optional" : true
+        }, {
+          "name" : "owningGroup",
+          "type" : "string",
+          "optional" : true
+        }, {
+          "name" : "propertyFilter",
+          "type" : "string",
+          "optional" : true
+        } ]
       } ],
       "entity" : {
         "path" : "/flowconfigsV2/{id}"
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
index 7fed2d1..7e78e29 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
@@ -135,8 +135,7 @@ public class FlowConfigClient implements Closeable {
    */
   public FlowConfig getFlowConfig(FlowId flowId)
       throws RemoteInvocationException {
-    LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
-        flowId.getFlowName());
+    LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName());
 
     GetRequest<FlowConfig> getRequest = _flowconfigsRequestBuilders.get()
         .id(new ComplexResourceKey<>(flowId, new EmptyRecord())).build();
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
index 9835245..56130f6 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -37,12 +38,15 @@ import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
 import com.linkedin.r2.transport.http.client.HttpClientFactory;
 import com.linkedin.restli.client.CreateIdEntityRequest;
 import com.linkedin.restli.client.DeleteRequest;
+import com.linkedin.restli.client.FindRequest;
+import com.linkedin.restli.client.GetAllRequest;
 import com.linkedin.restli.client.GetRequest;
 import com.linkedin.restli.client.PartialUpdateRequest;
 import com.linkedin.restli.client.Response;
 import com.linkedin.restli.client.ResponseFuture;
 import com.linkedin.restli.client.RestClient;
 import com.linkedin.restli.client.UpdateRequest;
+import com.linkedin.restli.common.CollectionResponse;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.IdEntityResponse;
@@ -180,18 +184,48 @@ public class FlowConfigV2Client implements Closeable {
    */
   public FlowConfig getFlowConfig(FlowId flowId)
       throws RemoteInvocationException {
-    LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
-        flowId.getFlowName());
+    LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName());
 
     GetRequest<FlowConfig> getRequest = _flowconfigsV2RequestBuilders.get()
         .id(new ComplexResourceKey<>(flowId, new FlowStatusId())).build();
 
-    Response<FlowConfig> response =
-        _restClient.get().sendRequest(getRequest).getResponse();
+    Response<FlowConfig> response = _restClient.get().sendRequest(getRequest).getResponse();
     return response.getEntity();
   }
 
   /**
+   * Get all {@link FlowConfig}s
+   * @return all {@link FlowConfig}s
+   * @throws RemoteInvocationException
+   */
+  public Collection<FlowConfig> getAllFlowConfigs() throws RemoteInvocationException {
+    LOG.debug("getAllFlowConfigs called");
+
+    GetAllRequest<FlowConfig> getRequest = _flowconfigsV2RequestBuilders.getAll().build();
+    Response<CollectionResponse<FlowConfig>> response = _restClient.get().sendRequest(getRequest).getResponse();
+    return response.getEntity().getElements();
+  }
+
+  /**
+   * Get all {@link FlowConfig}s that matches the provided parameters. All the parameters are optional.
+   * If a parameter is null, it is ignored. {@see FlowConfigV2Resource#getFilteredFlows}
+   */
+  public Collection<FlowConfig> getFlowConfigs(String flowGroup, String flowName, String templateUri, String userToProxy,
+      String sourceIdentifier, String destinationIdentifier, String schedule, Boolean isRunImmediately, String owningGroup,
+      String propertyFilter) throws RemoteInvocationException {
+    LOG.debug("getAllFlowConfigs called");
+
+    FindRequest<FlowConfig> getRequest = _flowconfigsV2RequestBuilders.findByFilterFlows()
+        .flowGroupParam(flowGroup).flowNameParam(flowName).templateUriParam(templateUri).userToProxyParam(userToProxy)
+        .sourceIdentifierParam(sourceIdentifier).destinationIdentifierParam(destinationIdentifier).scheduleParam(schedule)
+        .isRunImmediatelyParam(isRunImmediately).owningGroupParam(owningGroup).propertyFilterParam(propertyFilter).build();
+
+    Response<CollectionResponse<FlowConfig>> response = _restClient.get().sendRequest(getRequest).getResponse();
+
+    return response.getEntity().getElements();
+  }
+
+  /**
    * Delete a flow configuration
    * @param flowId identifier of flow configuration to delete
    * @throws RemoteInvocationException
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index 697ed8d..38bc2d5 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -48,6 +47,7 @@ import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.restli.EmbeddedRestliServer;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 
 
 @Test(groups = { "gobblin.service" })
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index eaa62a5..2404eff 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -19,13 +19,13 @@ package org.apache.gobblin.service;
 
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collection;
 import java.util.Properties;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang.StringUtils;
 
 import com.codahale.metrics.MetricRegistry;
-import com.google.common.collect.Maps;
-import com.linkedin.data.template.StringMap;
 import com.linkedin.restli.common.ComplexResourceKey;
 import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
@@ -46,6 +46,7 @@ import org.apache.gobblin.metrics.ContextAwareMeter;
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.ServiceMetricNames;
 import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
 import org.apache.gobblin.util.ConfigUtils;
@@ -75,46 +76,15 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
   }
 
   /**
-   * Get flow config
+   * Get flow config given a {@link FlowId}
    */
   public FlowConfig getFlowConfig(FlowId flowId) throws FlowConfigLoggedException {
     log.info("[GAAS-REST] Get called with flowGroup {} flowName {}", flowId.getFlowGroup(), flowId.getFlowName());
 
     try {
       URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
-      FlowSpec spec = (FlowSpec) flowCatalog.getSpec(flowUri);
-      FlowConfig flowConfig = new FlowConfig();
-      Properties flowProps = spec.getConfigAsProperties();
-      Schedule schedule = null;
-
-      if (flowProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
-        schedule = new Schedule();
-        schedule.setCronSchedule(flowProps.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
-      }
-      if (flowProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) {
-        flowConfig.setTemplateUris(flowProps.getProperty(ConfigurationKeys.JOB_TEMPLATE_PATH));
-      } else if (spec.getTemplateURIs().isPresent()) {
-        flowConfig.setTemplateUris(StringUtils.join(spec.getTemplateURIs().get(), ","));
-      } else {
-        flowConfig.setTemplateUris("NA");
-      }
-      if (schedule != null) {
-        if (flowProps.containsKey(ConfigurationKeys.FLOW_RUN_IMMEDIATELY)) {
-          schedule.setRunImmediately(Boolean.valueOf(flowProps.getProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY)));
-        }
-
-        flowConfig.setSchedule(schedule);
-      }
-
-      // remove keys that were injected as part of flowSpec creation
-      flowProps.remove(ConfigurationKeys.JOB_SCHEDULE_KEY);
-      flowProps.remove(ConfigurationKeys.JOB_TEMPLATE_PATH);
-
-      StringMap flowPropsAsStringMap = new StringMap();
-      flowPropsAsStringMap.putAll(Maps.fromProperties(flowProps));
-
-      return flowConfig.setId(new FlowId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName()))
-          .setProperties(flowPropsAsStringMap);
+      FlowSpec spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+      return FlowSpec.Utils.toFlowConfig(spec);
     } catch (URISyntaxException e) {
       throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowId.getFlowName(), e);
     } catch (SpecNotFoundException e) {
@@ -123,6 +93,23 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
   }
 
   /**
+   * Get flow config given a {@link FlowSpecSearchObject}
+   * @return all the {@link FlowConfig}s that satisfy the {@link FlowSpecSearchObject}
+   */
+  public Collection<FlowConfig> getFlowConfig(FlowSpecSearchObject flowSpecSearchObject) throws FlowConfigLoggedException {
+    log.info("[GAAS-REST] Get called with flowSpecSearchObject {}", flowSpecSearchObject);
+    return flowCatalog.getSpecs(flowSpecSearchObject).stream().map(FlowSpec.Utils::toFlowConfig).collect(Collectors.toList());
+  }
+
+  /**
+   * Get all flow configs
+   */
+  public Collection<FlowConfig> getAllFlowConfigs() {
+    log.info("[GAAS-REST] GetAll called");
+    return flowCatalog.getAllSpecs().stream().map(FlowSpec.Utils::toFlowConfig).collect(Collectors.toList());
+  }
+
+  /**
    * Add flowConfig locally and trigger all listeners iff @param triggerListener is set to true
    */
   public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean triggerListener) throws FlowConfigLoggedException {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
index ccab8ff..cc224eb 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
@@ -17,18 +17,30 @@
 
 package org.apache.gobblin.service;
 
+import java.util.Collection;
 import java.util.Properties;
 
 import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.server.CreateResponse;
 import com.linkedin.restli.server.UpdateResponse;
 
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
+
 
 public interface FlowConfigsResourceHandler {
   /**
    * Get {@link FlowConfig}
    */
   FlowConfig getFlowConfig(FlowId flowId) throws FlowConfigLoggedException;
+  /**
+   * Get {@link FlowConfig}
+   * @return
+   */
+  Collection<FlowConfig> getFlowConfig(FlowSpecSearchObject flowSpecSearchObject) throws FlowConfigLoggedException;
+  /**
+   * Get all {@link FlowConfig}
+   */
+  Collection<FlowConfig> getAllFlowConfigs();
 
   /**
    * Add {@link FlowConfig}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index 77d64fc..3bbfca0 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -15,6 +15,7 @@
  * limitations under the License.
  */
 package org.apache.gobblin.service;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -26,12 +27,16 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.ImmutableSet;
 import com.linkedin.restli.common.ComplexResourceKey;
-import com.linkedin.restli.common.EmptyRecord;
 import com.linkedin.restli.common.HttpStatus;
 import com.linkedin.restli.common.PatchRequest;
 import com.linkedin.restli.server.CreateKVResponse;
 import com.linkedin.restli.server.CreateResponse;
+import com.linkedin.restli.server.PagingContext;
 import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.annotations.Context;
+import com.linkedin.restli.server.annotations.Finder;
+import com.linkedin.restli.server.annotations.Optional;
+import com.linkedin.restli.server.annotations.QueryParam;
 import com.linkedin.restli.server.annotations.RestLiCollection;
 import com.linkedin.restli.server.annotations.ReturnEntity;
 import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
@@ -39,6 +44,8 @@ import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
 import javax.inject.Inject;
 import javax.inject.Named;
 
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
+
 
 /**
  * Resource for handling flow configuration requests
@@ -80,10 +87,37 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
    */
   @Override
   public FlowConfig get(ComplexResourceKey<FlowId, FlowStatusId> key) {
-    String flowGroup = key.getKey().getFlowGroup();
-    String flowName = key.getKey().getFlowName();
-    FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
-    return this.getFlowConfigResourceHandler().getFlowConfig(flowId);
+    return this.getFlowConfigResourceHandler().getFlowConfig(key.getKey());
+  }
+
+  /**
+   * Retrieve all the flow configurations
+   */
+  @Override
+  public List<FlowConfig> getAll(@Context PagingContext pagingContext) {
+    return (List) this.getFlowConfigResourceHandler().getAllFlowConfigs();
+  }
+
+  /**
+   * Get all {@link FlowConfig}s that matches the provided parameters. All the parameters are optional.
+   * If a parameter is null, it is ignored. {@see FlowConfigV2Resource#getFilteredFlows}
+   */
+  @Finder("filterFlows")
+  public List<FlowConfig> getFilteredFlows(@Context PagingContext context,
+      @Optional @QueryParam("flowGroup") String flowGroup,
+      @Optional @QueryParam("flowName") String flowName,
+      @Optional @QueryParam("templateUri") String templateUri,
+      @Optional @QueryParam("userToProxy") String userToProxy,
+      @Optional @QueryParam("sourceIdentifier") String sourceIdentifier,
+      @Optional @QueryParam("destinationIdentifier") String destinationIdentifier,
+      @Optional @QueryParam("schedule") String schedule,
+      @Optional @QueryParam("isRunImmediately") Boolean isRunImmediately,
+      @Optional @QueryParam("owningGroup") String owningGroup,
+      @Optional @QueryParam("propertyFilter") String propertyFilter) {
+    FlowSpecSearchObject flowSpecSearchObject = new FlowSpecSearchObject(null, flowGroup, flowName,
+        templateUri, userToProxy, sourceIdentifier, destinationIdentifier, schedule, null,
+        isRunImmediately, owningGroup, propertyFilter);
+    return (List) this.getFlowConfigResourceHandler().getFlowConfig(flowSpecSearchObject);
   }
 
   /**
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java
index 4f4784f..6a28686 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java
@@ -27,15 +27,11 @@ import org.testng.annotations.Test;
 import com.google.common.collect.Maps;
 import com.linkedin.data.template.StringMap;
 
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.runtime.api.FlowSpec;
 
 
 public class FlowConfigResourceLocalHandlerTest {
-  private static final String FLOW_GROUP_KEY = "flow.group";
-  private static final String FLOW_NAME_KEY = "flow.name";
-  private static final String SCHEDULE_KEY = "job.schedule";
-  private static final String RUN_IMMEDIATELY_KEY = "flow.runImmediately";
-
   private static final String TEST_GROUP_NAME = "testGroup1";
   private static final String TEST_FLOW_NAME = "testFlow1";
   private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
@@ -52,10 +48,10 @@ public class FlowConfigResourceLocalHandlerTest {
         .setProperties(new StringMap(flowProperties));
 
     FlowSpec flowSpec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(flowConfig);
-    Assert.assertEquals(flowSpec.getConfig().getString(FLOW_GROUP_KEY), TEST_GROUP_NAME);
-    Assert.assertEquals(flowSpec.getConfig().getString(FLOW_NAME_KEY), TEST_FLOW_NAME);
-    Assert.assertEquals(flowSpec.getConfig().getString(SCHEDULE_KEY), TEST_SCHEDULE);
-    Assert.assertEquals(flowSpec.getConfig().getBoolean(RUN_IMMEDIATELY_KEY), true);
+    Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), TEST_GROUP_NAME);
+    Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), TEST_FLOW_NAME);
+    Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.JOB_SCHEDULE_KEY), TEST_SCHEDULE);
+    Assert.assertEquals(flowSpec.getConfig().getBoolean(ConfigurationKeys.FLOW_RUN_IMMEDIATELY), true);
     Assert.assertEquals(flowSpec.getConfig().getString("param1"), "a:b:c*.d");
     Assert.assertEquals(flowSpec.getTemplateURIs().get().size(), 1);
     Assert.assertTrue(flowSpec.getTemplateURIs().get().contains(new URI(TEST_TEMPLATE_URI)));
@@ -70,10 +66,10 @@ public class FlowConfigResourceLocalHandlerTest {
         .setProperties(new StringMap(flowProperties));
     flowSpec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(flowConfig);
 
-    Assert.assertEquals(flowSpec.getConfig().getString(FLOW_GROUP_KEY), TEST_GROUP_NAME);
-    Assert.assertEquals(flowSpec.getConfig().getString(FLOW_NAME_KEY), TEST_FLOW_NAME);
-    Assert.assertEquals(flowSpec.getConfig().getString(SCHEDULE_KEY), TEST_SCHEDULE);
-    Assert.assertEquals(flowSpec.getConfig().getBoolean(RUN_IMMEDIATELY_KEY), true);
+    Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), TEST_GROUP_NAME);
+    Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), TEST_FLOW_NAME);
+    Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.JOB_SCHEDULE_KEY), TEST_SCHEDULE);
+    Assert.assertEquals(flowSpec.getConfig().getBoolean(ConfigurationKeys.FLOW_RUN_IMMEDIATELY), true);
     Assert.assertEquals(flowSpec.getConfig().getString("param1"),"value1");
     Assert.assertEquals(flowSpec.getConfig().getString("param2"),"value1-123");
     Assert.assertEquals(flowSpec.getConfig().getString("param3"), "a:b:c*.d");
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index bec0445..a7df2e0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -17,28 +17,36 @@
 
 package org.apache.gobblin.runtime.api;
 
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.linkedin.data.template.StringMap;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.Schedule;
 import org.apache.gobblin.util.ConfigUtils;
 
 
@@ -411,5 +419,49 @@ public class FlowSpec implements Configurable, Spec {
       }
       return uriTokens[EXPECTED_NUM_URI_PATH_TOKENS - 2];
     }
+
+    /**
+     * Create a {@link FlowConfig} from a {@link Spec}.
+     * The {@link Spec} must have {@link ConfigurationKeys#FLOW_GROUP_KEY} and {@link ConfigurationKeys#FLOW_NAME_KEY} set.
+     * @param spec spec
+     * @return {@link FlowConfig}
+     */
+    public static FlowConfig toFlowConfig(Spec spec) {
+      FlowSpec flowSpec = (FlowSpec) spec;
+      FlowConfig flowConfig = new FlowConfig();
+      Properties flowProps = flowSpec.getConfigAsProperties();
+      Schedule schedule = null;
+
+      if (flowProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+        schedule = new Schedule();
+        schedule.setCronSchedule(flowProps.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+      }
+      if (flowProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) {
+        flowConfig.setTemplateUris(flowProps.getProperty(ConfigurationKeys.JOB_TEMPLATE_PATH));
+      } else if (flowSpec.getTemplateURIs().isPresent()) {
+        flowConfig.setTemplateUris(StringUtils.join(flowSpec.getTemplateURIs().get(), ","));
+      } else {
+        flowConfig.setTemplateUris("NA");
+      }
+      if (schedule != null) {
+        if (flowProps.containsKey(ConfigurationKeys.FLOW_RUN_IMMEDIATELY)) {
+          schedule.setRunImmediately(Boolean.valueOf(flowProps.getProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY)));
+        }
+
+        flowConfig.setSchedule(schedule);
+      }
+
+      // remove keys that were injected as part of flowSpec creation
+      flowProps.remove(ConfigurationKeys.JOB_SCHEDULE_KEY);
+      flowProps.remove(ConfigurationKeys.JOB_TEMPLATE_PATH);
+
+      StringMap flowPropsAsStringMap = new StringMap();
+      flowPropsAsStringMap.putAll(Maps.fromProperties(flowProps));
+
+      return flowConfig.setId(new FlowId()
+          .setFlowGroup(flowProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY))
+          .setFlowName(flowProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)))
+          .setProperties(flowPropsAsStringMap);
+    }
   }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
new file mode 100644
index 0000000..ef6737c
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.net.URI;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
+
+import org.apache.gobblin.service.FlowId;
+
+
+/**
+ * This is a class to package all the parameters that should be used to search {@link FlowSpec} in a {@link SpecStore}
+ */
+@Getter
+@Builder
+@ToString
+@AllArgsConstructor
+public class FlowSpecSearchObject implements SpecSearchObject {
+  private final URI flowSpecUri;
+  private final String flowGroup;
+  private final String flowName;
+  private final String templateURI;
+  private final String userToProxy;
+  private final String sourceIdentifier;
+  private final String destinationIdentifier;
+  private final String schedule;
+  private final String modifiedTimestamp;
+  private final Boolean isRunImmediately;
+  private final String owningGroup;
+  private final String propertyFilter;
+
+  public static FlowSpecSearchObject fromFlowId(FlowId flowId) {
+    return FlowSpecSearchObject.builder().flowGroup(flowId.getFlowGroup()).flowName(flowId.getFlowName()).build();
+  }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
index 442fdc9..e2c206e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
@@ -117,6 +117,18 @@ public abstract class InstrumentedSpecStore implements SpecStore {
   }
 
   @Override
+  public Collection<Spec> getSpecs(SpecSearchObject specSearchObject) throws IOException {
+    if (!instrumentationEnabled) {
+      return getSpecsImpl(specSearchObject);
+    } else {
+      long startTimeMillis = System.currentTimeMillis();
+      Collection<Spec> specs = getSpecsImpl(specSearchObject);
+      Instrumented.updateTimer(this.getTimer, System.currentTimeMillis() - startTimeMillis, TimeUnit.MILLISECONDS);
+      return specs;
+    }
+  }
+
+  @Override
   public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
     if (!instrumentationEnabled) {
       return updateSpecImpl(spec);
@@ -159,4 +171,9 @@ public abstract class InstrumentedSpecStore implements SpecStore {
   public abstract boolean deleteSpecImpl(URI specUri) throws IOException;
   public abstract Collection<Spec> getSpecsImpl() throws IOException;
   public abstract Iterator<URI> getSpecURIsImpl() throws IOException;
+
+  /** child classes can implement this if they want to get specs using {@link SpecSearchObject} */
+  public Collection<Spec> getSpecsImpl(SpecSearchObject specUri) throws IOException {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index cc55cc4..ebb88e7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -64,7 +64,14 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab
    * Get a {@link Spec} by uri.
    * @throws SpecNotFoundException if no such Spec exists
    **/
-  Spec getSpec(URI uri) throws SpecNotFoundException;
+  Spec getSpecs(URI uri) throws SpecNotFoundException;
+
+  /**
+   * Get a {@link Spec} by {@link SpecSearchObject}.
+   **/
+  default Collection<Spec> getSpecs(SpecSearchObject specSearchObject) {
+    throw new UnsupportedOperationException();
+  }
 
   @Slf4j
   class StandardMetrics extends StandardMetricsBridge.StandardMetrics implements SpecCatalogListener {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSearchObject.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSearchObject.java
new file mode 100644
index 0000000..febf4c9
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSearchObject.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+/**
+ * This is an interface to package all the parameters that should be used to search {@link Spec} in a {@link SpecStore}
+ */
+public interface SpecSearchObject {
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
index fa8a713..25f8f68 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
@@ -17,12 +17,13 @@
 
 package org.apache.gobblin.runtime.api;
 
-import com.google.common.base.Optional;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.Iterator;
 
+import com.google.common.base.Optional;
+
 
 public interface SpecStore {
 
@@ -86,6 +87,13 @@ public interface SpecStore {
   Spec getSpec(URI specUri) throws IOException, SpecNotFoundException;
 
   /***
+   * Retrieve {@link Spec}s by {@link SpecSearchObject} from the {@link SpecStore}.
+   * @param specSearchObject {@link SpecSearchObject} for the {@link Spec} to be retrieved.
+   * @throws IOException Exception in retrieving the {@link Spec}.
+   */
+  Collection<Spec> getSpecs(SpecSearchObject specSearchObject) throws IOException;
+
+  /***
    * Retrieve specified version of the {@link Spec} by URI from the {@link SpecStore}.
    * @param specUri URI for the {@link Spec} to be retrieved.
    * @param version Version for the {@link Spec} to be retrieved.
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index b64ebd7..44d54ea 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -53,6 +53,7 @@ import org.apache.gobblin.runtime.api.SpecCatalogListener;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
 import org.apache.gobblin.runtime.api.SpecSerDe;
 import org.apache.gobblin.runtime.api.SpecStore;
+import org.apache.gobblin.runtime.api.SpecSearchObject;
 import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
 import org.apache.gobblin.runtime.spec_store.FSSpecStore;
 import org.apache.gobblin.service.ServiceConfigKeys;
@@ -269,7 +270,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
   }
 
   @Override
-  public Spec getSpec(URI uri) throws SpecNotFoundException {
+  public Spec getSpecs(URI uri) throws SpecNotFoundException {
     try {
       return specStore.getSpec(uri);
     } catch (IOException e) {
@@ -277,15 +278,32 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
     }
   }
 
+  @Override
+  public Collection<Spec> getSpecs(SpecSearchObject specSearchObject) {
+    try {
+      return specStore.getSpecs(specSearchObject);
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot retrieve Spec from Spec store for URI: " + specSearchObject, e);
+    }
+  }
+
+  public Collection<Spec> getAllSpecs() {
+    try {
+      return specStore.getSpecs();
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot retrieve all specs from Spec stores", e);
+    }
+  }
+
   /**
-   * A wrapper of getSpec that handles {@link SpecNotFoundException} properly.
+   * A wrapper of getSpecs that handles {@link SpecNotFoundException} properly.
    * This is the most common way to fetch {@link Spec}. For customized way to deal with exception, one will
    * need to implement specific catch-block logic.
    */
   public Spec getSpecWrapper(URI uri) {
     Spec spec = null;
     try {
-      spec = getSpec(uri);
+      spec = getSpecs(uri);
     } catch (SpecNotFoundException snfe) {
       log.error(String.format("The URI %s discovered in SpecStore is missing in FlowCatalog"
           + ", suspecting current modification on SpecStore", uri), snfe);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index 4814a71..5d86768 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -229,7 +229,7 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
   }
 
   @Override
-  public Spec getSpec(URI uri) throws SpecNotFoundException {
+  public Spec getSpecs(URI uri) throws SpecNotFoundException {
     try {
       return specStore.getSpec(uri);
     } catch (IOException e) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecDeserializer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecDeserializer.java
index 65f867e..00cf513 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecDeserializer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecDeserializer.java
@@ -17,8 +17,6 @@
 
 package org.apache.gobblin.runtime.spec_serde;
 
-import java.io.IOException;
-import java.io.StringReader;
 import java.lang.reflect.Type;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -50,12 +48,7 @@ public class FlowSpecDeserializer implements JsonDeserializer<FlowSpec> {
     String description = jsonObject.get(FlowSpecSerializer.FLOW_SPEC_DESCRIPTION_KEY).getAsString();
     Config config = ConfigFactory.parseString(jsonObject.get(FlowSpecSerializer.FLOW_SPEC_CONFIG_KEY).getAsString());
 
-    Properties properties = new Properties();
-    try {
-      properties.load(new StringReader(jsonObject.get(FlowSpecSerializer.FLOW_SPEC_CONFIG_AS_PROPERTIES_KEY).getAsString()));
-    } catch (IOException e) {
-      throw new JsonParseException(e);
-    }
+    Properties properties = context.deserialize(jsonObject.get(FlowSpecSerializer.FLOW_SPEC_CONFIG_AS_PROPERTIES_KEY), Properties.class);
 
     Set<URI> templateURIs = new HashSet<>();
     try {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializer.java
index 85a5c34..904634c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializer.java
@@ -17,15 +17,12 @@
 
 package org.apache.gobblin.runtime.spec_serde;
 
-import java.io.IOException;
-import java.io.StringWriter;
 import java.lang.reflect.Type;
 import java.net.URI;
 
 import com.google.gson.JsonArray;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
 import com.google.gson.JsonSerializationContext;
 import com.google.gson.JsonSerializer;
 import com.typesafe.config.ConfigRenderOptions;
@@ -52,13 +49,7 @@ public class FlowSpecSerializer implements JsonSerializer<FlowSpec> {
     flowSpecJson.add(FLOW_SPEC_DESCRIPTION_KEY, context.serialize(src.getDescription()));
     flowSpecJson.add(FLOW_SPEC_CONFIG_KEY, context.serialize(src.getConfig().root().render(ConfigRenderOptions.concise())));
 
-    StringWriter writer = new StringWriter();
-    try {
-      src.getConfigAsProperties().store(writer, "");
-    } catch (IOException e) {
-      throw new JsonParseException(e);
-    }
-    flowSpecJson.add(FLOW_SPEC_CONFIG_AS_PROPERTIES_KEY, context.serialize(writer.getBuffer().toString()));
+    flowSpecJson.add(FLOW_SPEC_CONFIG_AS_PROPERTIES_KEY, context.serialize(src.getConfigAsProperties()));
 
     JsonArray templateURIs = new JsonArray();
     if (src.getTemplateURIs().isPresent()) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index a5a0d2b..ff6bc85 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -31,6 +31,7 @@ import java.util.List;
 
 import com.google.common.base.Charsets;
 import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.google.common.io.ByteStreams;
 import com.google.gson.Gson;
@@ -43,9 +44,11 @@ import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.MysqlDataSourceFactory;
 import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
 import org.apache.gobblin.runtime.api.InstrumentedSpecStore;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecSearchObject;
 import org.apache.gobblin.runtime.api.SpecSerDe;
 import org.apache.gobblin.runtime.api.SpecSerDeException;
 import org.apache.gobblin.runtime.api.SpecStore;
@@ -65,6 +68,7 @@ import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_SOURCE_IDENTIFIE
  * but not removing it from {@link SpecStore}.
  */
 @Slf4j
+// todo : This should be renamed to MysqlFlowSpecStore, because this implementation only stores FlowSpec, not a TopologySpec
 public class MysqlSpecStore extends InstrumentedSpecStore {
   public static final String CONFIG_PREFIX = "mysqlSpecStore";
   public static final String DEFAULT_TAG_VALUE = "";
@@ -82,7 +86,7 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
       + "user_to_proxy, source_identifier, destination_identifier, schedule, tag, isRunImmediately, spec, " + NEW_COLUMN + ") "
       + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec), " + NEW_COLUMN + " = VALUES(" + NEW_COLUMN + ")";
   private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE spec_uri = ?";
-  private static final String GET_STATEMENT = "SELECT spec, " + NEW_COLUMN + " FROM %s WHERE spec_uri = ?";
+  private static final String GET_STATEMENT = "SELECT spec_uri, spec, " + NEW_COLUMN + " FROM %s WHERE ";
   private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec, " + NEW_COLUMN + " FROM %s";
   private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
   private static final String GET_ALL_STATEMENT_WITH_TAG = "SELECT spec_uri FROM %s WHERE tag = ?";
@@ -136,7 +140,7 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
   public void addSpec(Spec spec, String tagValue) throws IOException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement statement = connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
-      setPreparedStatement(statement, spec, tagValue);
+      setAddPreparedStatement(statement, spec, tagValue);
       statement.executeUpdate();
       connection.commit();
     } catch (SQLException | SpecSerDeException e) {
@@ -176,19 +180,22 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
 
   @Override
   public Spec getSpecImpl(URI specUri) throws IOException, SpecNotFoundException {
-    try (Connection connection = this.dataSource.getConnection();
-        PreparedStatement statement = connection.prepareStatement(String.format(GET_STATEMENT, this.tableName))) {
-      statement.setString(1, specUri.toString());
+    Iterator<Spec> specsIterator = getSpecsImpl(FlowSpecSearchObject.builder().flowSpecUri(specUri).build()).iterator();
+    if (specsIterator.hasNext()) {
+      return specsIterator.next();
+    } else {
+      throw new SpecNotFoundException(specUri);
+    }
+  }
 
-      try (ResultSet rs = statement.executeQuery()) {
-        if (!rs.next()) {
-          throw new SpecNotFoundException(specUri);
-        }
-        return rs.getString(2) == null
-            ? this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(1).getBinaryStream()))
-            : this.specSerDe.deserialize(rs.getString(2).getBytes(Charsets.UTF_8));
-      }
-    } catch (SQLException | SpecSerDeException e) {
+  public Collection<Spec> getSpecsImpl(SpecSearchObject specSearchObject) throws IOException {
+    FlowSpecSearchObject flowSpecSearchObject = (FlowSpecSearchObject) specSearchObject;
+
+    try (Connection connection = this.dataSource.getConnection();
+        PreparedStatement statement = connection.prepareStatement(createGetPreparedStatement(flowSpecSearchObject, this.tableName))) {
+      setGetPreparedStatement(statement, flowSpecSearchObject);
+      return getSpecsInternal(statement);
+    } catch (SQLException e) {
       throw new IOException(e);
     }
   }
@@ -207,25 +214,27 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
   public Collection<Spec> getSpecsImpl() throws IOException {
     try (Connection connection = this.dataSource.getConnection();
         PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_STATEMENT, this.tableName))) {
-      List<Spec> specs = new ArrayList<>();
+      return getSpecsInternal(statement);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    }
+  }
 
-      try (ResultSet rs = statement.executeQuery()) {
-        while (rs.next()) {
-          try {
-            specs.add(
-                rs.getString(3) == null
-                    ? this.specSerDe.deserialize(rs.getString(2).getBytes(Charsets.UTF_8))
-                    : this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(3).getBinaryStream()))
-            );
-          } catch (SQLException | SpecSerDeException e) {
-            log.error("Failed to deserialize spec", e);
-          }
-        }
+  private Collection<Spec> getSpecsInternal(PreparedStatement statement) throws IOException {
+    List<Spec> specs = new ArrayList<>();
+    try (ResultSet rs = statement.executeQuery()) {
+      while (rs.next()) {
+        specs.add(
+            rs.getString(3) == null
+                ? this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(2).getBinaryStream()))
+                : this.specSerDe.deserialize(rs.getString(2).getBytes(Charsets.UTF_8))
+        );
       }
-      return specs;
-    } catch (SQLException e) {
+    } catch (SQLException | SpecSerDeException e) {
+      log.error("Failed to deserialize spec", e);
       throw new IOException(e);
     }
+    return specs;
   }
 
   @Override
@@ -267,7 +276,129 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
     return Optional.of(this.specStoreURI);
   }
 
-  protected void setPreparedStatement(PreparedStatement statement, Spec spec, String tagValue) throws SQLException {
+  /** This expects at least one parameter in {@link FlowSpecSearchObject} to be not null */
+  static String createGetPreparedStatement(FlowSpecSearchObject flowSpecSearchObject, String tableName)
+      throws IOException {
+    String baseStatement = String.format(GET_STATEMENT, tableName);
+    List<String> conditions = new ArrayList<>();
+
+    if (flowSpecSearchObject.getFlowSpecUri() != null) {
+      conditions.add("spec_uri = ?");
+    }
+
+    if (flowSpecSearchObject.getFlowGroup() != null) {
+      conditions.add("flow_group = ?");
+    }
+
+    if (flowSpecSearchObject.getFlowName() != null) {
+      conditions.add("flow_name = ?");
+    }
+
+    if (flowSpecSearchObject.getTemplateURI() != null) {
+      conditions.add("template_uri = ?");
+    }
+
+    if (flowSpecSearchObject.getUserToProxy() != null) {
+      conditions.add("user_to_proxy = ?");
+    }
+
+    if (flowSpecSearchObject.getSourceIdentifier() != null) {
+      conditions.add("source_identifier = ?");
+    }
+
+    if (flowSpecSearchObject.getDestinationIdentifier() != null) {
+      conditions.add("destination_identifier = ?");
+    }
+
+    if (flowSpecSearchObject.getSchedule() != null) {
+      conditions.add("schedule = ?");
+    }
+
+    if (flowSpecSearchObject.getModifiedTimestamp() != null) {
+      conditions.add("modified_time = ?");
+    }
+
+    if (flowSpecSearchObject.getIsRunImmediately() != null) {
+      conditions.add("isRunImmediately = ?");
+    }
+
+    if (flowSpecSearchObject.getOwningGroup() != null) {
+      conditions.add("owning_group = ?");
+    }
+
+    // If the propertyFilter is myKey=myValue, it looks for a config where key is `myKey` and value contains string `myValue`.
+    // If the propertyFilter string does not have `=`, it considers the string as a key and just looks for its existence.
+    // Multiple occurrences of `=` in  propertyFilter are not supported and ignored completely.
+    if (flowSpecSearchObject.getPropertyFilter() != null) {
+      String propertyFilter = flowSpecSearchObject.getPropertyFilter();
+      Splitter commaSplitter = Splitter.on(",").trimResults().omitEmptyStrings();
+      for (String property : commaSplitter.splitToList(propertyFilter)) {
+        if (property.contains("=")) {
+          String[] keyValue = property.split("=");
+          if (keyValue.length != 2) {
+            log.error("Incorrect flow config search query");
+            continue;
+          }
+          conditions.add("spec_json->'$.configAsProperties." + keyValue[0] + "' like " + "'%" + keyValue[1] + "%'");
+        } else {
+          conditions.add("spec_json->'$.configAsProperties." + property + "' is not null");
+        }
+      }
+    }
+
+    if (conditions.size() == 0) {
+      throw new IOException("At least one condition is required to query flow configs.");
+    }
+
+    return baseStatement + String.join(" AND ", conditions);
+  }
+
+  private static void setGetPreparedStatement(PreparedStatement statement, FlowSpecSearchObject flowSpecSearchObject)
+      throws SQLException {
+    int i = 0;
+
+    if (flowSpecSearchObject.getFlowSpecUri() != null) {
+      statement.setString(++i, flowSpecSearchObject.getFlowSpecUri().toString());
+    }
+
+    if (flowSpecSearchObject.getFlowGroup() != null) {
+      statement.setString(++i, flowSpecSearchObject.getFlowGroup());
+    }
+
+    if (flowSpecSearchObject.getFlowName() != null) {
+      statement.setString(++i, flowSpecSearchObject.getFlowName());
+    }
+
+    if (flowSpecSearchObject.getTemplateURI() != null) {
+      statement.setString(++i, flowSpecSearchObject.getTemplateURI());
+    }
+
+    if (flowSpecSearchObject.getUserToProxy() != null) {
+      statement.setString(++i, flowSpecSearchObject.getUserToProxy());
+    }
+
+    if (flowSpecSearchObject.getSourceIdentifier() != null) {
+      statement.setString(++i, flowSpecSearchObject.getSourceIdentifier());
+    }
+
+    if (flowSpecSearchObject.getDestinationIdentifier() != null) {
+      statement.setString(++i, flowSpecSearchObject.getDestinationIdentifier());
+    }
+
+    if (flowSpecSearchObject.getSchedule() != null) {
+      statement.setString(++i, flowSpecSearchObject.getModifiedTimestamp());
+    }
+
+    if (flowSpecSearchObject.getIsRunImmediately() != null) {
+      statement.setBoolean(++i, flowSpecSearchObject.getIsRunImmediately());
+    }
+
+    if (flowSpecSearchObject.getOwningGroup() != null) {
+      statement.setString(++i, flowSpecSearchObject.getOwningGroup());
+    }
+  }
+
+  protected void setAddPreparedStatement(PreparedStatement statement, Spec spec, String tagValue) throws SQLException {
     FlowSpec flowSpec = (FlowSpec) spec;
     URI specUri = flowSpec.getUri();
     Config flowConfig = flowSpec.getConfig();
@@ -280,17 +411,18 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
     String schedule = ConfigUtils.getString(flowConfig, ConfigurationKeys.JOB_SCHEDULE_KEY, null);
     boolean isRunImmediately = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, false);
 
-    statement.setString(1, specUri.toString());
-    statement.setString(2, flowGroup);
-    statement.setString(3, flowName);
-    statement.setString(4, templateURI);
-    statement.setString(5, userToProxy);
-    statement.setString(6, sourceIdentifier);
-    statement.setString(7, destinationIdentifier);
-    statement.setString(8, schedule);
-    statement.setString(9, tagValue);
-    statement.setBoolean(10, isRunImmediately);
-    statement.setBlob(11, new ByteArrayInputStream(this.specSerDe.serialize(flowSpec)));
-    statement.setString(12, new String(this.specSerDe.serialize(flowSpec), Charsets.UTF_8));
+    int i = 0;
+    statement.setString(++i, specUri.toString());
+    statement.setString(++i, flowGroup);
+    statement.setString(++i, flowName);
+    statement.setString(++i, templateURI);
+    statement.setString(++i, userToProxy);
+    statement.setString(++i, sourceIdentifier);
+    statement.setString(++i, destinationIdentifier);
+    statement.setString(++i, schedule);
+    statement.setString(++i, tagValue);
+    statement.setBoolean(++i, isRunImmediately);
+    statement.setBlob(++i, new ByteArrayInputStream(this.specSerDe.serialize(flowSpec)));
+    statement.setString(++i, new String(this.specSerDe.serialize(flowSpec), Charsets.UTF_8));
   }
 }
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 6762f3c..ec6a14e 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.gobblin.runtime.spec_store;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
@@ -27,16 +26,13 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
-
 import java.util.List;
-import java.util.Properties;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Charsets;
 import com.google.common.collect.Iterators;
 import com.typesafe.config.Config;
 
@@ -45,6 +41,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
 import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
 import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
 import org.apache.gobblin.runtime.api.Spec;
 import org.apache.gobblin.runtime.api.SpecSerDe;
 import org.apache.gobblin.runtime.api.SpecSerDeException;
@@ -62,9 +59,9 @@ public class MysqlSpecStoreTest {
 
   private MysqlSpecStore specStore;
   private MysqlSpecStore oldSpecStore;
-  private URI uri1 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg1").setFlowGroup("fn1"));
-  private URI uri2 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg2").setFlowGroup("fn2"));
-  private URI uri3 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg3").setFlowGroup("fn3"));
+  private final URI uri1 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg1").setFlowGroup("fn1"));
+  private final URI uri2 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg2").setFlowGroup("fn2"));
+  private final URI uri3 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg3").setFlowGroup("fn3"));
   private FlowSpec flowSpec1, flowSpec2, flowSpec3;
 
   public MysqlSpecStoreTest()
@@ -85,13 +82,10 @@ public class MysqlSpecStoreTest {
     this.specStore = new MysqlSpecStore(config, new TestSpecSerDe());
     this.oldSpecStore = new OldSpecStore(config, new TestSpecSerDe());
 
-    Properties properties = new Properties();
-    properties.setProperty(FLOW_SOURCE_IDENTIFIER_KEY, "source");
-    properties.setProperty(FLOW_DESTINATION_IDENTIFIER_KEY, "destination");
-
     flowSpec1 = FlowSpec.builder(this.uri1)
         .withConfig(ConfigBuilder.create()
             .addPrimitive("key", "value")
+            .addPrimitive("key3", "value3")
             .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
             .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
             .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg1")
@@ -100,7 +94,8 @@ public class MysqlSpecStoreTest {
         .withVersion("Test version")
         .build();
     flowSpec2 = FlowSpec.builder(this.uri2)
-        .withConfig(ConfigBuilder.create().addPrimitive("key2", "value2")
+        .withConfig(ConfigBuilder.create().addPrimitive("converter", "value1,value2,value3")
+            .addPrimitive("key3", "value3")
             .addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
             .addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
             .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg2")
@@ -119,6 +114,13 @@ public class MysqlSpecStoreTest {
         .build();
   }
 
+  @Test(expectedExceptions = IOException.class)
+  public void testSpecSearch() throws Exception {
+    // empty FlowSpecSearchObject should throw an error
+    FlowSpecSearchObject flowSpecSearchObject = FlowSpecSearchObject.builder().build();
+    MysqlSpecStore.createGetPreparedStatement(flowSpecSearchObject, "table");
+  }
+
   @Test
   public void testAddSpec() throws Exception {
     this.specStore.addSpec(this.flowSpec1);
@@ -135,15 +137,46 @@ public class MysqlSpecStoreTest {
     Assert.assertEquals(result, this.flowSpec1);
 
     Collection<Spec> specs = this.specStore.getSpecs();
+    Assert.assertEquals(specs.size(), 2);
     Assert.assertTrue(specs.contains(this.flowSpec1));
     Assert.assertTrue(specs.contains(this.flowSpec2));
 
     Iterator<URI> uris = this.specStore.getSpecURIs();
     Assert.assertTrue(Iterators.contains(uris, this.uri1));
     Assert.assertTrue(Iterators.contains(uris, this.uri2));
+
+    FlowSpecSearchObject flowSpecSearchObject = FlowSpecSearchObject.builder().flowGroup("fg1").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 1);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+
+    flowSpecSearchObject = FlowSpecSearchObject.builder().flowName("fn2").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 1);
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+
+    flowSpecSearchObject = FlowSpecSearchObject.builder().flowName("fg1").flowGroup("fn2").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 0);
+
+    flowSpecSearchObject = FlowSpecSearchObject.builder().propertyFilter("key=value").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 1);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+
+    flowSpecSearchObject = FlowSpecSearchObject.builder().propertyFilter("converter=value2").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 1);
+    Assert.assertTrue(specs.contains(this.flowSpec2));
+
+    flowSpecSearchObject = FlowSpecSearchObject.builder().propertyFilter("key3").build();
+    specs = this.specStore.getSpecs(flowSpecSearchObject);
+    Assert.assertEquals(specs.size(), 2);
+    Assert.assertTrue(specs.contains(this.flowSpec1));
+    Assert.assertTrue(specs.contains(this.flowSpec2));
   }
 
-  @Test
+  @Test  (dependsOnMethods = "testGetSpec")
   public void testGetSpecWithTag() throws Exception {
 
     //Creating and inserting flowspecs with tags
@@ -186,7 +219,7 @@ public class MysqlSpecStoreTest {
     this.specStore.addSpec(this.flowSpec3);
   }
 
-  @Test (dependsOnMethods = "testGetSpec")
+  @Test (dependsOnMethods = "testGetSpecWithTag")
   public void testDeleteSpec() throws Exception {
     this.specStore.deleteSpec(this.uri1);
     Assert.assertFalse(this.specStore.exists(this.uri1));
@@ -214,7 +247,7 @@ public class MysqlSpecStoreTest {
     public void addSpec(Spec spec, String tagValue) throws IOException {
       try (Connection connection = this.dataSource.getConnection();
           PreparedStatement statement = connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
-        setPreparedStatement(statement, spec, tagValue);
+        setAddPreparedStatement(statement, spec, tagValue);
         statement.setString(4, null);
         statement.executeUpdate();
         connection.commit();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index 2be2572..567ab70 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.service.modules.restli;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Properties;
 import java.util.UUID;
 
@@ -38,6 +39,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
 import org.apache.gobblin.service.FlowConfig;
 import org.apache.gobblin.service.FlowConfigLoggedException;
 import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
@@ -79,6 +81,16 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou
     return this.localHandler.getFlowConfig(flowId);
   }
 
+  @Override
+  public Collection<FlowConfig> getFlowConfig(FlowSpecSearchObject flowSpecSearchObject) throws FlowConfigLoggedException {
+    return this.localHandler.getFlowConfig(flowSpecSearchObject);
+  }
+
+  @Override
+  public Collection<FlowConfig> getAllFlowConfigs() {
+    return this.localHandler.getAllFlowConfigs();
+  }
+
   /**
    * Adding {@link FlowConfig} should check if current node is active (master).
    * If current node is active, call {@link FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)} directly.
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index d1e793f..f1eafbb 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -83,7 +83,9 @@ public class GobblinServiceManagerTest {
 
   private static final String TEST_GROUP_NAME = "testGroup";
   private static final String TEST_FLOW_NAME = "testFlow";
+  private static final String TEST_FLOW_NAME2 = "testFlow2";
   private static final FlowId TEST_FLOW_ID = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
+  private static final FlowId TEST_FLOW_ID2 = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME2);
   private static final FlowId UNCOMPILABLE_FLOW_ID = new FlowId().setFlowGroup(TEST_GROUP_NAME)
       .setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW);
 
@@ -337,6 +339,36 @@ public class GobblinServiceManagerTest {
     Assert.assertEquals(flowConfig.getProperties().get("param1"), "value1");
   }
 
+  @Test (dependsOnMethods = "testCreateAgain")
+  public void testGetAll() throws Exception {
+    FlowConfig flowConfig2 = new FlowConfig().setId(TEST_FLOW_ID2)
+        .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE))
+        .setProperties(new StringMap(flowProperties));
+    this.flowConfigClient.createFlowConfig(flowConfig2);
+    Collection<FlowConfig> flowConfigs = this.flowConfigClient.getAllFlowConfigs();
+
+    Assert.assertEquals(flowConfigs.size(), 2);
+
+    this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID2);
+  }
+
+  @Test (dependsOnMethods = "testCreateAgain", enabled = false)
+  public void testGetFilteredFlows() throws Exception {
+    // Not implemented for FsSpecStore
+
+    Collection<FlowConfig> flowConfigs = this.flowConfigClient.getFlowConfigs(TEST_GROUP_NAME, null, null, null, null, null,
+null, null, null, null);
+    Assert.assertEquals(flowConfigs.size(), 2);
+
+    flowConfigs = this.flowConfigClient.getFlowConfigs(TEST_GROUP_NAME, TEST_FLOW_NAME2, null, null, null, null,
+        null, null, null, null);
+    Assert.assertEquals(flowConfigs.size(), 1);
+
+    flowConfigs = this.flowConfigClient.getFlowConfigs(null, null, null, null, null, null,
+        TEST_SCHEDULE, null, null, null);
+    Assert.assertEquals(flowConfigs.size(), 2);
+  }
+
   @Test (dependsOnMethods = "testGet")
   public void testUpdate() throws Exception {
     FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
@@ -347,7 +379,7 @@ public class GobblinServiceManagerTest {
     flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME);
     flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME);
 
-    FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+    FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID)
         .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE))
         .setProperties(new StringMap(flowProperties));