You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/06/22 18:41:47 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1196] search
flow configs using flow properties and/or other parameters
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 60f33c5 [GOBBLIN-1196] search flow configs using flow properties and/or other parameters
60f33c5 is described below
commit 60f33c5720e4e8647a1bdc2d99c516f44bebcc50
Author: Arjun <ab...@linkedin.com>
AuthorDate: Mon Jun 22 11:41:40 2020 -0700
[GOBBLIN-1196] search flow configs using flow properties and/or other parameters
Closes #3042 from arjun4084346/configQuery
---
...che.gobblin.service.flowconfigsV2.restspec.json | 50 ++++-
...che.gobblin.service.flowconfigsV2.snapshot.json | 50 ++++-
.../apache/gobblin/service/FlowConfigClient.java | 3 +-
.../apache/gobblin/service/FlowConfigV2Client.java | 42 +++-
.../org/apache/gobblin/service/FlowConfigTest.java | 2 +-
.../service/FlowConfigResourceLocalHandler.java | 59 +++---
.../service/FlowConfigsResourceHandler.java | 12 ++
.../gobblin/service/FlowConfigsV2Resource.java | 44 ++++-
.../FlowConfigResourceLocalHandlerTest.java | 22 +--
.../org/apache/gobblin/runtime/api/FlowSpec.java | 66 ++++++-
.../gobblin/runtime/api/FlowSpecSearchObject.java | 54 ++++++
.../gobblin/runtime/api/InstrumentedSpecStore.java | 17 ++
.../apache/gobblin/runtime/api/SpecCatalog.java | 9 +-
.../gobblin/runtime/api/SpecSearchObject.java | 24 +++
.../org/apache/gobblin/runtime/api/SpecStore.java | 10 +-
.../gobblin/runtime/spec_catalog/FlowCatalog.java | 24 ++-
.../runtime/spec_catalog/TopologyCatalog.java | 2 +-
.../runtime/spec_serde/FlowSpecDeserializer.java | 9 +-
.../runtime/spec_serde/FlowSpecSerializer.java | 11 +-
.../gobblin/runtime/spec_store/MysqlSpecStore.java | 216 +++++++++++++++++----
.../runtime/spec_store/MysqlSpecStoreTest.java | 63 ++++--
.../GobblinServiceFlowConfigResourceHandler.java | 12 ++
.../gobblin/service/GobblinServiceManagerTest.java | 34 +++-
23 files changed, 683 insertions(+), 152 deletions(-)
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
index ce0714c..5a7ccb8 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/idl/org.apache.gobblin.service.flowconfigsV2.restspec.json
@@ -10,7 +10,7 @@
"type" : "org.apache.gobblin.service.FlowId",
"params" : "org.apache.gobblin.service.FlowStatusId"
},
- "supports" : [ "create", "delete", "get", "partial_update", "update" ],
+ "supports" : [ "create", "delete", "get", "get_all", "partial_update", "update" ],
"methods" : [ {
"annotations" : {
"returnEntity" : { }
@@ -29,6 +29,54 @@
}, {
"method" : "delete",
"doc" : "Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows."
+ }, {
+ "method" : "get_all",
+ "doc" : "Retrieve all the flow configurations"
+ } ],
+ "finders" : [ {
+ "name" : "filterFlows",
+ "doc" : "Get all {@link FlowConfig}s that matches the provided parameters. All the parameters are optional.\n If a parameter is null, it is ignored. {@see FlowConfigV2Resource#getFilteredFlows}",
+ "parameters" : [ {
+ "name" : "flowGroup",
+ "type" : "string",
+ "optional" : true
+ }, {
+ "name" : "flowName",
+ "type" : "string",
+ "optional" : true
+ }, {
+ "name" : "templateUri",
+ "type" : "string",
+ "optional" : true
+ }, {
+ "name" : "userToProxy",
+ "type" : "string",
+ "optional" : true
+ }, {
+ "name" : "sourceIdentifier",
+ "type" : "string",
+ "optional" : true
+ }, {
+ "name" : "destinationIdentifier",
+ "type" : "string",
+ "optional" : true
+ }, {
+ "name" : "schedule",
+ "type" : "string",
+ "optional" : true
+ }, {
+ "name" : "isRunImmediately",
+ "type" : "boolean",
+ "optional" : true
+ }, {
+ "name" : "owningGroup",
+ "type" : "string",
+ "optional" : true
+ }, {
+ "name" : "propertyFilter",
+ "type" : "string",
+ "optional" : true
+ } ]
} ],
"entity" : {
"path" : "/flowconfigsV2/{id}"
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
index 50064ca..fa8e0cd 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-api/src/main/snapshot/org.apache.gobblin.service.flowconfigsV2.snapshot.json
@@ -108,7 +108,7 @@
"type" : "org.apache.gobblin.service.FlowId",
"params" : "org.apache.gobblin.service.FlowStatusId"
},
- "supports" : [ "create", "delete", "get", "partial_update", "update" ],
+ "supports" : [ "create", "delete", "get", "get_all", "partial_update", "update" ],
"methods" : [ {
"annotations" : {
"returnEntity" : { }
@@ -127,6 +127,54 @@
}, {
"method" : "delete",
"doc" : "Delete a configured flow. Running flows are not affected. The schedule will be removed for scheduled flows."
+ }, {
+ "method" : "get_all",
+ "doc" : "Retrieve all the flow configurations"
+ } ],
+ "finders" : [ {
+ "name" : "filterFlows",
+ "doc" : "Get all {@link FlowConfig}s that matches the provided parameters. All the parameters are optional.\n If a parameter is null, it is ignored. {@see FlowConfigV2Resource#getFilteredFlows}",
+ "parameters" : [ {
+ "name" : "flowGroup",
+ "type" : "string",
+ "optional" : true
+ }, {
+ "name" : "flowName",
+ "type" : "string",
+ "optional" : true
+ }, {
+ "name" : "templateUri",
+ "type" : "string",
+ "optional" : true
+ }, {
+ "name" : "userToProxy",
+ "type" : "string",
+ "optional" : true
+ }, {
+ "name" : "sourceIdentifier",
+ "type" : "string",
+ "optional" : true
+ }, {
+ "name" : "destinationIdentifier",
+ "type" : "string",
+ "optional" : true
+ }, {
+ "name" : "schedule",
+ "type" : "string",
+ "optional" : true
+ }, {
+ "name" : "isRunImmediately",
+ "type" : "boolean",
+ "optional" : true
+ }, {
+ "name" : "owningGroup",
+ "type" : "string",
+ "optional" : true
+ }, {
+ "name" : "propertyFilter",
+ "type" : "string",
+ "optional" : true
+ } ]
} ],
"entity" : {
"path" : "/flowconfigsV2/{id}"
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
index 7fed2d1..7e78e29 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigClient.java
@@ -135,8 +135,7 @@ public class FlowConfigClient implements Closeable {
*/
public FlowConfig getFlowConfig(FlowId flowId)
throws RemoteInvocationException {
- LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
- flowId.getFlowName());
+ LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName());
GetRequest<FlowConfig> getRequest = _flowconfigsRequestBuilders.get()
.id(new ComplexResourceKey<>(flowId, new EmptyRecord())).build();
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
index 9835245..56130f6 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/main/java/org/apache/gobblin/service/FlowConfigV2Client.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.service;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -37,12 +38,15 @@ import com.linkedin.r2.transport.common.bridge.client.TransportClientAdapter;
import com.linkedin.r2.transport.http.client.HttpClientFactory;
import com.linkedin.restli.client.CreateIdEntityRequest;
import com.linkedin.restli.client.DeleteRequest;
+import com.linkedin.restli.client.FindRequest;
+import com.linkedin.restli.client.GetAllRequest;
import com.linkedin.restli.client.GetRequest;
import com.linkedin.restli.client.PartialUpdateRequest;
import com.linkedin.restli.client.Response;
import com.linkedin.restli.client.ResponseFuture;
import com.linkedin.restli.client.RestClient;
import com.linkedin.restli.client.UpdateRequest;
+import com.linkedin.restli.common.CollectionResponse;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.IdEntityResponse;
@@ -180,18 +184,48 @@ public class FlowConfigV2Client implements Closeable {
*/
public FlowConfig getFlowConfig(FlowId flowId)
throws RemoteInvocationException {
- LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " +
- flowId.getFlowName());
+ LOG.debug("getFlowConfig with groupName " + flowId.getFlowGroup() + " flowName " + flowId.getFlowName());
GetRequest<FlowConfig> getRequest = _flowconfigsV2RequestBuilders.get()
.id(new ComplexResourceKey<>(flowId, new FlowStatusId())).build();
- Response<FlowConfig> response =
- _restClient.get().sendRequest(getRequest).getResponse();
+ Response<FlowConfig> response = _restClient.get().sendRequest(getRequest).getResponse();
return response.getEntity();
}
/**
+ * Get all {@link FlowConfig}s
+ * @return all {@link FlowConfig}s
+ * @throws RemoteInvocationException
+ */
+ public Collection<FlowConfig> getAllFlowConfigs() throws RemoteInvocationException {
+ LOG.debug("getAllFlowConfigs called");
+
+ GetAllRequest<FlowConfig> getRequest = _flowconfigsV2RequestBuilders.getAll().build();
+ Response<CollectionResponse<FlowConfig>> response = _restClient.get().sendRequest(getRequest).getResponse();
+ return response.getEntity().getElements();
+ }
+
+ /**
+ * Get all {@link FlowConfig}s that matches the provided parameters. All the parameters are optional.
+ * If a parameter is null, it is ignored. {@see FlowConfigV2Resource#getFilteredFlows}
+ */
+ public Collection<FlowConfig> getFlowConfigs(String flowGroup, String flowName, String templateUri, String userToProxy,
+ String sourceIdentifier, String destinationIdentifier, String schedule, Boolean isRunImmediately, String owningGroup,
+ String propertyFilter) throws RemoteInvocationException {
+ LOG.debug("getAllFlowConfigs called");
+
+ FindRequest<FlowConfig> getRequest = _flowconfigsV2RequestBuilders.findByFilterFlows()
+ .flowGroupParam(flowGroup).flowNameParam(flowName).templateUriParam(templateUri).userToProxyParam(userToProxy)
+ .sourceIdentifierParam(sourceIdentifier).destinationIdentifierParam(destinationIdentifier).scheduleParam(schedule)
+ .isRunImmediatelyParam(isRunImmediately).owningGroupParam(owningGroup).propertyFilterParam(propertyFilter).build();
+
+ Response<CollectionResponse<FlowConfig>> response = _restClient.get().sendRequest(getRequest).getResponse();
+
+ return response.getEntity().getElements();
+ }
+
+ /**
* Delete a flow configuration
* @param flowId identifier of flow configuration to delete
* @throws RemoteInvocationException
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
index 697ed8d..38bc2d5 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
import java.util.Map;
import org.apache.commons.io.FileUtils;
-import org.apache.gobblin.runtime.spec_store.FSSpecStore;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -48,6 +47,7 @@ import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.restli.EmbeddedRestliServer;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
+import org.apache.gobblin.runtime.spec_store.FSSpecStore;
@Test(groups = { "gobblin.service" })
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
index eaa62a5..2404eff 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigResourceLocalHandler.java
@@ -19,13 +19,13 @@ package org.apache.gobblin.service;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Collection;
import java.util.Properties;
+import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
import com.codahale.metrics.MetricRegistry;
-import com.google.common.collect.Maps;
-import com.linkedin.data.template.StringMap;
import com.linkedin.restli.common.ComplexResourceKey;
import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
@@ -46,6 +46,7 @@ import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.util.ConfigUtils;
@@ -75,46 +76,15 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
}
/**
- * Get flow config
+ * Get flow config given a {@link FlowId}
*/
public FlowConfig getFlowConfig(FlowId flowId) throws FlowConfigLoggedException {
log.info("[GAAS-REST] Get called with flowGroup {} flowName {}", flowId.getFlowGroup(), flowId.getFlowName());
try {
URI flowUri = FlowSpec.Utils.createFlowSpecUri(flowId);
- FlowSpec spec = (FlowSpec) flowCatalog.getSpec(flowUri);
- FlowConfig flowConfig = new FlowConfig();
- Properties flowProps = spec.getConfigAsProperties();
- Schedule schedule = null;
-
- if (flowProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
- schedule = new Schedule();
- schedule.setCronSchedule(flowProps.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
- }
- if (flowProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) {
- flowConfig.setTemplateUris(flowProps.getProperty(ConfigurationKeys.JOB_TEMPLATE_PATH));
- } else if (spec.getTemplateURIs().isPresent()) {
- flowConfig.setTemplateUris(StringUtils.join(spec.getTemplateURIs().get(), ","));
- } else {
- flowConfig.setTemplateUris("NA");
- }
- if (schedule != null) {
- if (flowProps.containsKey(ConfigurationKeys.FLOW_RUN_IMMEDIATELY)) {
- schedule.setRunImmediately(Boolean.valueOf(flowProps.getProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY)));
- }
-
- flowConfig.setSchedule(schedule);
- }
-
- // remove keys that were injected as part of flowSpec creation
- flowProps.remove(ConfigurationKeys.JOB_SCHEDULE_KEY);
- flowProps.remove(ConfigurationKeys.JOB_TEMPLATE_PATH);
-
- StringMap flowPropsAsStringMap = new StringMap();
- flowPropsAsStringMap.putAll(Maps.fromProperties(flowProps));
-
- return flowConfig.setId(new FlowId().setFlowGroup(flowId.getFlowGroup()).setFlowName(flowId.getFlowName()))
- .setProperties(flowPropsAsStringMap);
+ FlowSpec spec = (FlowSpec) flowCatalog.getSpecs(flowUri);
+ return FlowSpec.Utils.toFlowConfig(spec);
} catch (URISyntaxException e) {
throw new FlowConfigLoggedException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowId.getFlowName(), e);
} catch (SpecNotFoundException e) {
@@ -123,6 +93,23 @@ public class FlowConfigResourceLocalHandler implements FlowConfigsResourceHandle
}
/**
+ * Get flow config given a {@link FlowSpecSearchObject}
+ * @return all the {@link FlowConfig}s that satisfy the {@link FlowSpecSearchObject}
+ */
+ public Collection<FlowConfig> getFlowConfig(FlowSpecSearchObject flowSpecSearchObject) throws FlowConfigLoggedException {
+ log.info("[GAAS-REST] Get called with flowSpecSearchObject {}", flowSpecSearchObject);
+ return flowCatalog.getSpecs(flowSpecSearchObject).stream().map(FlowSpec.Utils::toFlowConfig).collect(Collectors.toList());
+ }
+
+ /**
+ * Get all flow configs
+ */
+ public Collection<FlowConfig> getAllFlowConfigs() {
+ log.info("[GAAS-REST] GetAll called");
+ return flowCatalog.getAllSpecs().stream().map(FlowSpec.Utils::toFlowConfig).collect(Collectors.toList());
+ }
+
+ /**
* Add flowConfig locally and trigger all listeners iff @param triggerListener is set to true
*/
public CreateResponse createFlowConfig(FlowConfig flowConfig, boolean triggerListener) throws FlowConfigLoggedException {
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
index ccab8ff..cc224eb 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResourceHandler.java
@@ -17,18 +17,30 @@
package org.apache.gobblin.service;
+import java.util.Collection;
import java.util.Properties;
import com.linkedin.restli.common.PatchRequest;
import com.linkedin.restli.server.CreateResponse;
import com.linkedin.restli.server.UpdateResponse;
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
+
public interface FlowConfigsResourceHandler {
/**
* Get {@link FlowConfig}
*/
FlowConfig getFlowConfig(FlowId flowId) throws FlowConfigLoggedException;
+ /**
+ * Get {@link FlowConfig}
+ * @return
+ */
+ Collection<FlowConfig> getFlowConfig(FlowSpecSearchObject flowSpecSearchObject) throws FlowConfigLoggedException;
+ /**
+ * Get all {@link FlowConfig}
+ */
+ Collection<FlowConfig> getAllFlowConfigs();
/**
* Add {@link FlowConfig}
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
index 77d64fc..3bbfca0 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsV2Resource.java
@@ -15,6 +15,7 @@
* limitations under the License.
*/
package org.apache.gobblin.service;
+
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -26,12 +27,16 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableSet;
import com.linkedin.restli.common.ComplexResourceKey;
-import com.linkedin.restli.common.EmptyRecord;
import com.linkedin.restli.common.HttpStatus;
import com.linkedin.restli.common.PatchRequest;
import com.linkedin.restli.server.CreateKVResponse;
import com.linkedin.restli.server.CreateResponse;
+import com.linkedin.restli.server.PagingContext;
import com.linkedin.restli.server.UpdateResponse;
+import com.linkedin.restli.server.annotations.Context;
+import com.linkedin.restli.server.annotations.Finder;
+import com.linkedin.restli.server.annotations.Optional;
+import com.linkedin.restli.server.annotations.QueryParam;
import com.linkedin.restli.server.annotations.RestLiCollection;
import com.linkedin.restli.server.annotations.ReturnEntity;
import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
@@ -39,6 +44,8 @@ import com.linkedin.restli.server.resources.ComplexKeyResourceTemplate;
import javax.inject.Inject;
import javax.inject.Named;
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
+
/**
* Resource for handling flow configuration requests
@@ -80,10 +87,37 @@ public class FlowConfigsV2Resource extends ComplexKeyResourceTemplate<FlowId, Fl
*/
@Override
public FlowConfig get(ComplexResourceKey<FlowId, FlowStatusId> key) {
- String flowGroup = key.getKey().getFlowGroup();
- String flowName = key.getKey().getFlowName();
- FlowId flowId = new FlowId().setFlowGroup(flowGroup).setFlowName(flowName);
- return this.getFlowConfigResourceHandler().getFlowConfig(flowId);
+ return this.getFlowConfigResourceHandler().getFlowConfig(key.getKey());
+ }
+
+ /**
+ * Retrieve all the flow configurations
+ */
+ @Override
+ public List<FlowConfig> getAll(@Context PagingContext pagingContext) {
+ return (List) this.getFlowConfigResourceHandler().getAllFlowConfigs();
+ }
+
+ /**
+ * Get all {@link FlowConfig}s that matches the provided parameters. All the parameters are optional.
+ * If a parameter is null, it is ignored. {@see FlowConfigV2Resource#getFilteredFlows}
+ */
+ @Finder("filterFlows")
+ public List<FlowConfig> getFilteredFlows(@Context PagingContext context,
+ @Optional @QueryParam("flowGroup") String flowGroup,
+ @Optional @QueryParam("flowName") String flowName,
+ @Optional @QueryParam("templateUri") String templateUri,
+ @Optional @QueryParam("userToProxy") String userToProxy,
+ @Optional @QueryParam("sourceIdentifier") String sourceIdentifier,
+ @Optional @QueryParam("destinationIdentifier") String destinationIdentifier,
+ @Optional @QueryParam("schedule") String schedule,
+ @Optional @QueryParam("isRunImmediately") Boolean isRunImmediately,
+ @Optional @QueryParam("owningGroup") String owningGroup,
+ @Optional @QueryParam("propertyFilter") String propertyFilter) {
+ FlowSpecSearchObject flowSpecSearchObject = new FlowSpecSearchObject(null, flowGroup, flowName,
+ templateUri, userToProxy, sourceIdentifier, destinationIdentifier, schedule, null,
+ isRunImmediately, owningGroup, propertyFilter);
+ return (List) this.getFlowConfigResourceHandler().getFlowConfig(flowSpecSearchObject);
}
/**
diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java
index 4f4784f..6a28686 100644
--- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java
+++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/test/java/org/apache/gobblin/service/FlowConfigResourceLocalHandlerTest.java
@@ -27,15 +27,11 @@ import org.testng.annotations.Test;
import com.google.common.collect.Maps;
import com.linkedin.data.template.StringMap;
+import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.runtime.api.FlowSpec;
public class FlowConfigResourceLocalHandlerTest {
- private static final String FLOW_GROUP_KEY = "flow.group";
- private static final String FLOW_NAME_KEY = "flow.name";
- private static final String SCHEDULE_KEY = "job.schedule";
- private static final String RUN_IMMEDIATELY_KEY = "flow.runImmediately";
-
private static final String TEST_GROUP_NAME = "testGroup1";
private static final String TEST_FLOW_NAME = "testFlow1";
private static final String TEST_SCHEDULE = "0 1/0 * ? * *";
@@ -52,10 +48,10 @@ public class FlowConfigResourceLocalHandlerTest {
.setProperties(new StringMap(flowProperties));
FlowSpec flowSpec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(flowConfig);
- Assert.assertEquals(flowSpec.getConfig().getString(FLOW_GROUP_KEY), TEST_GROUP_NAME);
- Assert.assertEquals(flowSpec.getConfig().getString(FLOW_NAME_KEY), TEST_FLOW_NAME);
- Assert.assertEquals(flowSpec.getConfig().getString(SCHEDULE_KEY), TEST_SCHEDULE);
- Assert.assertEquals(flowSpec.getConfig().getBoolean(RUN_IMMEDIATELY_KEY), true);
+ Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), TEST_GROUP_NAME);
+ Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), TEST_FLOW_NAME);
+ Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.JOB_SCHEDULE_KEY), TEST_SCHEDULE);
+ Assert.assertEquals(flowSpec.getConfig().getBoolean(ConfigurationKeys.FLOW_RUN_IMMEDIATELY), true);
Assert.assertEquals(flowSpec.getConfig().getString("param1"), "a:b:c*.d");
Assert.assertEquals(flowSpec.getTemplateURIs().get().size(), 1);
Assert.assertTrue(flowSpec.getTemplateURIs().get().contains(new URI(TEST_TEMPLATE_URI)));
@@ -70,10 +66,10 @@ public class FlowConfigResourceLocalHandlerTest {
.setProperties(new StringMap(flowProperties));
flowSpec = FlowConfigResourceLocalHandler.createFlowSpecForConfig(flowConfig);
- Assert.assertEquals(flowSpec.getConfig().getString(FLOW_GROUP_KEY), TEST_GROUP_NAME);
- Assert.assertEquals(flowSpec.getConfig().getString(FLOW_NAME_KEY), TEST_FLOW_NAME);
- Assert.assertEquals(flowSpec.getConfig().getString(SCHEDULE_KEY), TEST_SCHEDULE);
- Assert.assertEquals(flowSpec.getConfig().getBoolean(RUN_IMMEDIATELY_KEY), true);
+ Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY), TEST_GROUP_NAME);
+ Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.FLOW_NAME_KEY), TEST_FLOW_NAME);
+ Assert.assertEquals(flowSpec.getConfig().getString(ConfigurationKeys.JOB_SCHEDULE_KEY), TEST_SCHEDULE);
+ Assert.assertEquals(flowSpec.getConfig().getBoolean(ConfigurationKeys.FLOW_RUN_IMMEDIATELY), true);
Assert.assertEquals(flowSpec.getConfig().getString("param1"),"value1");
Assert.assertEquals(flowSpec.getConfig().getString("param2"),"value1-123");
Assert.assertEquals(flowSpec.getConfig().getString("param3"), "a:b:c*.d");
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
index bec0445..a7df2e0 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java
@@ -17,28 +17,36 @@
package org.apache.gobblin.runtime.api;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.linkedin.data.template.StringMap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowId;
+import org.apache.gobblin.service.Schedule;
import org.apache.gobblin.util.ConfigUtils;
@@ -411,5 +419,49 @@ public class FlowSpec implements Configurable, Spec {
}
return uriTokens[EXPECTED_NUM_URI_PATH_TOKENS - 2];
}
+
+ /**
+ * Create a {@link FlowConfig} from a {@link Spec}.
+ * The {@link Spec} must have {@link ConfigurationKeys#FLOW_GROUP_KEY} and {@link ConfigurationKeys#FLOW_NAME_KEY} set.
+ * @param spec spec
+ * @return {@link FlowConfig}
+ */
+ public static FlowConfig toFlowConfig(Spec spec) {
+ FlowSpec flowSpec = (FlowSpec) spec;
+ FlowConfig flowConfig = new FlowConfig();
+ Properties flowProps = flowSpec.getConfigAsProperties();
+ Schedule schedule = null;
+
+ if (flowProps.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+ schedule = new Schedule();
+ schedule.setCronSchedule(flowProps.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
+ }
+ if (flowProps.containsKey(ConfigurationKeys.JOB_TEMPLATE_PATH)) {
+ flowConfig.setTemplateUris(flowProps.getProperty(ConfigurationKeys.JOB_TEMPLATE_PATH));
+ } else if (flowSpec.getTemplateURIs().isPresent()) {
+ flowConfig.setTemplateUris(StringUtils.join(flowSpec.getTemplateURIs().get(), ","));
+ } else {
+ flowConfig.setTemplateUris("NA");
+ }
+ if (schedule != null) {
+ if (flowProps.containsKey(ConfigurationKeys.FLOW_RUN_IMMEDIATELY)) {
+ schedule.setRunImmediately(Boolean.valueOf(flowProps.getProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY)));
+ }
+
+ flowConfig.setSchedule(schedule);
+ }
+
+ // remove keys that were injected as part of flowSpec creation
+ flowProps.remove(ConfigurationKeys.JOB_SCHEDULE_KEY);
+ flowProps.remove(ConfigurationKeys.JOB_TEMPLATE_PATH);
+
+ StringMap flowPropsAsStringMap = new StringMap();
+ flowPropsAsStringMap.putAll(Maps.fromProperties(flowProps));
+
+ return flowConfig.setId(new FlowId()
+ .setFlowGroup(flowProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY))
+ .setFlowName(flowProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY)))
+ .setProperties(flowPropsAsStringMap);
+ }
}
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
new file mode 100644
index 0000000..ef6737c
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpecSearchObject.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import java.net.URI;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
+
+import org.apache.gobblin.service.FlowId;
+
+
+/**
+ * This is a class to package all the parameters that should be used to search {@link FlowSpec} in a {@link SpecStore}
+ */
+@Getter
+@Builder
+@ToString
+@AllArgsConstructor
+public class FlowSpecSearchObject implements SpecSearchObject {
+ private final URI flowSpecUri;
+ private final String flowGroup;
+ private final String flowName;
+ private final String templateURI;
+ private final String userToProxy;
+ private final String sourceIdentifier;
+ private final String destinationIdentifier;
+ private final String schedule;
+ private final String modifiedTimestamp;
+ private final Boolean isRunImmediately;
+ private final String owningGroup;
+ private final String propertyFilter;
+
+ public static FlowSpecSearchObject fromFlowId(FlowId flowId) {
+ return FlowSpecSearchObject.builder().flowGroup(flowId.getFlowGroup()).flowName(flowId.getFlowName()).build();
+ }
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
index 442fdc9..e2c206e 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/InstrumentedSpecStore.java
@@ -117,6 +117,18 @@ public abstract class InstrumentedSpecStore implements SpecStore {
}
@Override
+ public Collection<Spec> getSpecs(SpecSearchObject specSearchObject) throws IOException {
+ if (!instrumentationEnabled) {
+ return getSpecsImpl(specSearchObject);
+ } else {
+ long startTimeMillis = System.currentTimeMillis();
+ Collection<Spec> specs = getSpecsImpl(specSearchObject);
+ Instrumented.updateTimer(this.getTimer, System.currentTimeMillis() - startTimeMillis, TimeUnit.MILLISECONDS);
+ return specs;
+ }
+ }
+
+ @Override
public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException {
if (!instrumentationEnabled) {
return updateSpecImpl(spec);
@@ -159,4 +171,9 @@ public abstract class InstrumentedSpecStore implements SpecStore {
public abstract boolean deleteSpecImpl(URI specUri) throws IOException;
public abstract Collection<Spec> getSpecsImpl() throws IOException;
public abstract Iterator<URI> getSpecURIsImpl() throws IOException;
+
+ /** child classes can implement this if they want to get specs using {@link SpecSearchObject} */
+ public Collection<Spec> getSpecsImpl(SpecSearchObject specUri) throws IOException {
+ throw new UnsupportedOperationException();
+ }
}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
index cc55cc4..ebb88e7 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecCatalog.java
@@ -64,7 +64,14 @@ public interface SpecCatalog extends SpecCatalogListenersContainer, Instrumentab
* Get a {@link Spec} by uri.
* @throws SpecNotFoundException if no such Spec exists
**/
- Spec getSpec(URI uri) throws SpecNotFoundException;
+ Spec getSpecs(URI uri) throws SpecNotFoundException;
+
+ /**
+ * Get a {@link Spec} by {@link SpecSearchObject}.
+ **/
+ default Collection<Spec> getSpecs(SpecSearchObject specSearchObject) {
+ throw new UnsupportedOperationException();
+ }
@Slf4j
class StandardMetrics extends StandardMetricsBridge.StandardMetrics implements SpecCatalogListener {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSearchObject.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSearchObject.java
new file mode 100644
index 0000000..febf4c9
--- /dev/null
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecSearchObject.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+/**
+ * This is an interface to package all the parameters that should be used to search {@link Spec} in a {@link SpecStore}
+ */
+public interface SpecSearchObject {
+}
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
index fa8a713..25f8f68 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/SpecStore.java
@@ -17,12 +17,13 @@
package org.apache.gobblin.runtime.api;
-import com.google.common.base.Optional;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
+import com.google.common.base.Optional;
+
public interface SpecStore {
@@ -86,6 +87,13 @@ public interface SpecStore {
Spec getSpec(URI specUri) throws IOException, SpecNotFoundException;
/***
+ * Retrieve {@link Spec}s by {@link SpecSearchObject} from the {@link SpecStore}.
+ * @param specSearchObject {@link SpecSearchObject} for the {@link Spec} to be retrieved.
+ * @throws IOException Exception in retrieving the {@link Spec}.
+ */
+ Collection<Spec> getSpecs(SpecSearchObject specSearchObject) throws IOException;
+
+ /***
* Retrieve specified version of the {@link Spec} by URI from the {@link SpecStore}.
* @param specUri URI for the {@link Spec} to be retrieved.
* @param version Version for the {@link Spec} to be retrieved.
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
index b64ebd7..44d54ea 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java
@@ -53,6 +53,7 @@ import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.api.SpecStore;
+import org.apache.gobblin.runtime.api.SpecSearchObject;
import org.apache.gobblin.runtime.spec_serde.JavaSpecSerDe;
import org.apache.gobblin.runtime.spec_store.FSSpecStore;
import org.apache.gobblin.service.ServiceConfigKeys;
@@ -269,7 +270,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
}
@Override
- public Spec getSpec(URI uri) throws SpecNotFoundException {
+ public Spec getSpecs(URI uri) throws SpecNotFoundException {
try {
return specStore.getSpec(uri);
} catch (IOException e) {
@@ -277,15 +278,32 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut
}
}
+ @Override
+ public Collection<Spec> getSpecs(SpecSearchObject specSearchObject) {
+ try {
+ return specStore.getSpecs(specSearchObject);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot retrieve Spec from Spec store for URI: " + specSearchObject, e);
+ }
+ }
+
+ public Collection<Spec> getAllSpecs() {
+ try {
+ return specStore.getSpecs();
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot retrieve all specs from Spec stores", e);
+ }
+ }
+
/**
- * A wrapper of getSpec that handles {@link SpecNotFoundException} properly.
+ * A wrapper of getSpecs that handles {@link SpecNotFoundException} properly.
* This is the most common way to fetch {@link Spec}. For customized way to deal with exception, one will
* need to implement specific catch-block logic.
*/
public Spec getSpecWrapper(URI uri) {
Spec spec = null;
try {
- spec = getSpec(uri);
+ spec = getSpecs(uri);
} catch (SpecNotFoundException snfe) {
log.error(String.format("The URI %s discovered in SpecStore is missing in FlowCatalog"
+ ", suspecting current modification on SpecStore", uri), snfe);
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
index 4814a71..5d86768 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java
@@ -229,7 +229,7 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog,
}
@Override
- public Spec getSpec(URI uri) throws SpecNotFoundException {
+ public Spec getSpecs(URI uri) throws SpecNotFoundException {
try {
return specStore.getSpec(uri);
} catch (IOException e) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecDeserializer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecDeserializer.java
index 65f867e..00cf513 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecDeserializer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecDeserializer.java
@@ -17,8 +17,6 @@
package org.apache.gobblin.runtime.spec_serde;
-import java.io.IOException;
-import java.io.StringReader;
import java.lang.reflect.Type;
import java.net.URI;
import java.net.URISyntaxException;
@@ -50,12 +48,7 @@ public class FlowSpecDeserializer implements JsonDeserializer<FlowSpec> {
String description = jsonObject.get(FlowSpecSerializer.FLOW_SPEC_DESCRIPTION_KEY).getAsString();
Config config = ConfigFactory.parseString(jsonObject.get(FlowSpecSerializer.FLOW_SPEC_CONFIG_KEY).getAsString());
- Properties properties = new Properties();
- try {
- properties.load(new StringReader(jsonObject.get(FlowSpecSerializer.FLOW_SPEC_CONFIG_AS_PROPERTIES_KEY).getAsString()));
- } catch (IOException e) {
- throw new JsonParseException(e);
- }
+ Properties properties = context.deserialize(jsonObject.get(FlowSpecSerializer.FLOW_SPEC_CONFIG_AS_PROPERTIES_KEY), Properties.class);
Set<URI> templateURIs = new HashSet<>();
try {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializer.java
index 85a5c34..904634c 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_serde/FlowSpecSerializer.java
@@ -17,15 +17,12 @@
package org.apache.gobblin.runtime.spec_serde;
-import java.io.IOException;
-import java.io.StringWriter;
import java.lang.reflect.Type;
import java.net.URI;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-import com.google.gson.JsonParseException;
import com.google.gson.JsonSerializationContext;
import com.google.gson.JsonSerializer;
import com.typesafe.config.ConfigRenderOptions;
@@ -52,13 +49,7 @@ public class FlowSpecSerializer implements JsonSerializer<FlowSpec> {
flowSpecJson.add(FLOW_SPEC_DESCRIPTION_KEY, context.serialize(src.getDescription()));
flowSpecJson.add(FLOW_SPEC_CONFIG_KEY, context.serialize(src.getConfig().root().render(ConfigRenderOptions.concise())));
- StringWriter writer = new StringWriter();
- try {
- src.getConfigAsProperties().store(writer, "");
- } catch (IOException e) {
- throw new JsonParseException(e);
- }
- flowSpecJson.add(FLOW_SPEC_CONFIG_AS_PROPERTIES_KEY, context.serialize(writer.getBuffer().toString()));
+ flowSpecJson.add(FLOW_SPEC_CONFIG_AS_PROPERTIES_KEY, context.serialize(src.getConfigAsProperties()));
JsonArray templateURIs = new JsonArray();
if (src.getTemplateURIs().isPresent()) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
index a5a0d2b..ff6bc85 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStore.java
@@ -31,6 +31,7 @@ import java.util.List;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
+import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.gson.Gson;
@@ -43,9 +44,11 @@ import org.apache.gobblin.broker.SharedResourcesBrokerFactory;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.MysqlDataSourceFactory;
import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
import org.apache.gobblin.runtime.api.InstrumentedSpecStore;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
+import org.apache.gobblin.runtime.api.SpecSearchObject;
import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.api.SpecSerDeException;
import org.apache.gobblin.runtime.api.SpecStore;
@@ -65,6 +68,7 @@ import static org.apache.gobblin.service.ServiceConfigKeys.FLOW_SOURCE_IDENTIFIE
* but not removing it from {@link SpecStore}.
*/
@Slf4j
+// todo : This should be renamed to MysqlFlowSpecStore, because this implementation only stores FlowSpec, not a TopologySpec
public class MysqlSpecStore extends InstrumentedSpecStore {
public static final String CONFIG_PREFIX = "mysqlSpecStore";
public static final String DEFAULT_TAG_VALUE = "";
@@ -82,7 +86,7 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
+ "user_to_proxy, source_identifier, destination_identifier, schedule, tag, isRunImmediately, spec, " + NEW_COLUMN + ") "
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE spec = VALUES(spec), " + NEW_COLUMN + " = VALUES(" + NEW_COLUMN + ")";
private static final String DELETE_STATEMENT = "DELETE FROM %s WHERE spec_uri = ?";
- private static final String GET_STATEMENT = "SELECT spec, " + NEW_COLUMN + " FROM %s WHERE spec_uri = ?";
+ private static final String GET_STATEMENT = "SELECT spec_uri, spec, " + NEW_COLUMN + " FROM %s WHERE ";
private static final String GET_ALL_STATEMENT = "SELECT spec_uri, spec, " + NEW_COLUMN + " FROM %s";
private static final String GET_ALL_URIS_STATEMENT = "SELECT spec_uri FROM %s";
private static final String GET_ALL_STATEMENT_WITH_TAG = "SELECT spec_uri FROM %s WHERE tag = ?";
@@ -136,7 +140,7 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
public void addSpec(Spec spec, String tagValue) throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
- setPreparedStatement(statement, spec, tagValue);
+ setAddPreparedStatement(statement, spec, tagValue);
statement.executeUpdate();
connection.commit();
} catch (SQLException | SpecSerDeException e) {
@@ -176,19 +180,22 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
@Override
public Spec getSpecImpl(URI specUri) throws IOException, SpecNotFoundException {
- try (Connection connection = this.dataSource.getConnection();
- PreparedStatement statement = connection.prepareStatement(String.format(GET_STATEMENT, this.tableName))) {
- statement.setString(1, specUri.toString());
+ Iterator<Spec> specsIterator = getSpecsImpl(FlowSpecSearchObject.builder().flowSpecUri(specUri).build()).iterator();
+ if (specsIterator.hasNext()) {
+ return specsIterator.next();
+ } else {
+ throw new SpecNotFoundException(specUri);
+ }
+ }
- try (ResultSet rs = statement.executeQuery()) {
- if (!rs.next()) {
- throw new SpecNotFoundException(specUri);
- }
- return rs.getString(2) == null
- ? this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(1).getBinaryStream()))
- : this.specSerDe.deserialize(rs.getString(2).getBytes(Charsets.UTF_8));
- }
- } catch (SQLException | SpecSerDeException e) {
+ public Collection<Spec> getSpecsImpl(SpecSearchObject specSearchObject) throws IOException {
+ FlowSpecSearchObject flowSpecSearchObject = (FlowSpecSearchObject) specSearchObject;
+
+ try (Connection connection = this.dataSource.getConnection();
+ PreparedStatement statement = connection.prepareStatement(createGetPreparedStatement(flowSpecSearchObject, this.tableName))) {
+ setGetPreparedStatement(statement, flowSpecSearchObject);
+ return getSpecsInternal(statement);
+ } catch (SQLException e) {
throw new IOException(e);
}
}
@@ -207,25 +214,27 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
public Collection<Spec> getSpecsImpl() throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(String.format(GET_ALL_STATEMENT, this.tableName))) {
- List<Spec> specs = new ArrayList<>();
+ return getSpecsInternal(statement);
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+ }
- try (ResultSet rs = statement.executeQuery()) {
- while (rs.next()) {
- try {
- specs.add(
- rs.getString(3) == null
- ? this.specSerDe.deserialize(rs.getString(2).getBytes(Charsets.UTF_8))
- : this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(3).getBinaryStream()))
- );
- } catch (SQLException | SpecSerDeException e) {
- log.error("Failed to deserialize spec", e);
- }
- }
+ private Collection<Spec> getSpecsInternal(PreparedStatement statement) throws IOException {
+ List<Spec> specs = new ArrayList<>();
+ try (ResultSet rs = statement.executeQuery()) {
+ while (rs.next()) {
+ specs.add(
+ rs.getString(3) == null
+ ? this.specSerDe.deserialize(ByteStreams.toByteArray(rs.getBlob(2).getBinaryStream()))
+ : this.specSerDe.deserialize(rs.getString(2).getBytes(Charsets.UTF_8))
+ );
}
- return specs;
- } catch (SQLException e) {
+ } catch (SQLException | SpecSerDeException e) {
+ log.error("Failed to deserialize spec", e);
throw new IOException(e);
}
+ return specs;
}
@Override
@@ -267,7 +276,129 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
return Optional.of(this.specStoreURI);
}
- protected void setPreparedStatement(PreparedStatement statement, Spec spec, String tagValue) throws SQLException {
+ /** This expects at least one parameter in {@link FlowSpecSearchObject} to be not null */
+ static String createGetPreparedStatement(FlowSpecSearchObject flowSpecSearchObject, String tableName)
+ throws IOException {
+ String baseStatement = String.format(GET_STATEMENT, tableName);
+ List<String> conditions = new ArrayList<>();
+
+ if (flowSpecSearchObject.getFlowSpecUri() != null) {
+ conditions.add("spec_uri = ?");
+ }
+
+ if (flowSpecSearchObject.getFlowGroup() != null) {
+ conditions.add("flow_group = ?");
+ }
+
+ if (flowSpecSearchObject.getFlowName() != null) {
+ conditions.add("flow_name = ?");
+ }
+
+ if (flowSpecSearchObject.getTemplateURI() != null) {
+ conditions.add("template_uri = ?");
+ }
+
+ if (flowSpecSearchObject.getUserToProxy() != null) {
+ conditions.add("user_to_proxy = ?");
+ }
+
+ if (flowSpecSearchObject.getSourceIdentifier() != null) {
+ conditions.add("source_identifier = ?");
+ }
+
+ if (flowSpecSearchObject.getDestinationIdentifier() != null) {
+ conditions.add("destination_identifier = ?");
+ }
+
+ if (flowSpecSearchObject.getSchedule() != null) {
+ conditions.add("schedule = ?");
+ }
+
+ if (flowSpecSearchObject.getModifiedTimestamp() != null) {
+ conditions.add("modified_time = ?");
+ }
+
+ if (flowSpecSearchObject.getIsRunImmediately() != null) {
+ conditions.add("isRunImmediately = ?");
+ }
+
+ if (flowSpecSearchObject.getOwningGroup() != null) {
+ conditions.add("owning_group = ?");
+ }
+
+ // If the propertyFilter is myKey=myValue, it looks for a config where key is `myKey` and value contains string `myValue`.
+ // If the propertyFilter string does not have `=`, it considers the string as a key and just looks for its existence.
+ // Multiple occurrences of `=` in propertyFilter are not supported and ignored completely.
+ if (flowSpecSearchObject.getPropertyFilter() != null) {
+ String propertyFilter = flowSpecSearchObject.getPropertyFilter();
+ Splitter commaSplitter = Splitter.on(",").trimResults().omitEmptyStrings();
+ for (String property : commaSplitter.splitToList(propertyFilter)) {
+ if (property.contains("=")) {
+ String[] keyValue = property.split("=");
+ if (keyValue.length != 2) {
+ log.error("Incorrect flow config search query");
+ continue;
+ }
+ conditions.add("spec_json->'$.configAsProperties." + keyValue[0] + "' like " + "'%" + keyValue[1] + "%'");
+ } else {
+ conditions.add("spec_json->'$.configAsProperties." + property + "' is not null");
+ }
+ }
+ }
+
+ if (conditions.size() == 0) {
+ throw new IOException("At least one condition is required to query flow configs.");
+ }
+
+ return baseStatement + String.join(" AND ", conditions);
+ }
+
+ private static void setGetPreparedStatement(PreparedStatement statement, FlowSpecSearchObject flowSpecSearchObject)
+ throws SQLException {
+ int i = 0;
+
+ if (flowSpecSearchObject.getFlowSpecUri() != null) {
+ statement.setString(++i, flowSpecSearchObject.getFlowSpecUri().toString());
+ }
+
+ if (flowSpecSearchObject.getFlowGroup() != null) {
+ statement.setString(++i, flowSpecSearchObject.getFlowGroup());
+ }
+
+ if (flowSpecSearchObject.getFlowName() != null) {
+ statement.setString(++i, flowSpecSearchObject.getFlowName());
+ }
+
+ if (flowSpecSearchObject.getTemplateURI() != null) {
+ statement.setString(++i, flowSpecSearchObject.getTemplateURI());
+ }
+
+ if (flowSpecSearchObject.getUserToProxy() != null) {
+ statement.setString(++i, flowSpecSearchObject.getUserToProxy());
+ }
+
+ if (flowSpecSearchObject.getSourceIdentifier() != null) {
+ statement.setString(++i, flowSpecSearchObject.getSourceIdentifier());
+ }
+
+ if (flowSpecSearchObject.getDestinationIdentifier() != null) {
+ statement.setString(++i, flowSpecSearchObject.getDestinationIdentifier());
+ }
+
+ if (flowSpecSearchObject.getSchedule() != null) {
+ statement.setString(++i, flowSpecSearchObject.getModifiedTimestamp());
+ }
+
+ if (flowSpecSearchObject.getIsRunImmediately() != null) {
+ statement.setBoolean(++i, flowSpecSearchObject.getIsRunImmediately());
+ }
+
+ if (flowSpecSearchObject.getOwningGroup() != null) {
+ statement.setString(++i, flowSpecSearchObject.getOwningGroup());
+ }
+ }
+
+ protected void setAddPreparedStatement(PreparedStatement statement, Spec spec, String tagValue) throws SQLException {
FlowSpec flowSpec = (FlowSpec) spec;
URI specUri = flowSpec.getUri();
Config flowConfig = flowSpec.getConfig();
@@ -280,17 +411,18 @@ public class MysqlSpecStore extends InstrumentedSpecStore {
String schedule = ConfigUtils.getString(flowConfig, ConfigurationKeys.JOB_SCHEDULE_KEY, null);
boolean isRunImmediately = ConfigUtils.getBoolean(flowConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, false);
- statement.setString(1, specUri.toString());
- statement.setString(2, flowGroup);
- statement.setString(3, flowName);
- statement.setString(4, templateURI);
- statement.setString(5, userToProxy);
- statement.setString(6, sourceIdentifier);
- statement.setString(7, destinationIdentifier);
- statement.setString(8, schedule);
- statement.setString(9, tagValue);
- statement.setBoolean(10, isRunImmediately);
- statement.setBlob(11, new ByteArrayInputStream(this.specSerDe.serialize(flowSpec)));
- statement.setString(12, new String(this.specSerDe.serialize(flowSpec), Charsets.UTF_8));
+ int i = 0;
+ statement.setString(++i, specUri.toString());
+ statement.setString(++i, flowGroup);
+ statement.setString(++i, flowName);
+ statement.setString(++i, templateURI);
+ statement.setString(++i, userToProxy);
+ statement.setString(++i, sourceIdentifier);
+ statement.setString(++i, destinationIdentifier);
+ statement.setString(++i, schedule);
+ statement.setString(++i, tagValue);
+ statement.setBoolean(++i, isRunImmediately);
+ statement.setBlob(++i, new ByteArrayInputStream(this.specSerDe.serialize(flowSpec)));
+ statement.setString(++i, new String(this.specSerDe.serialize(flowSpec), Charsets.UTF_8));
}
}
diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
index 6762f3c..ec6a14e 100644
--- a/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
+++ b/gobblin-runtime/src/test/java/org/apache/gobblin/runtime/spec_store/MysqlSpecStoreTest.java
@@ -17,7 +17,6 @@
package org.apache.gobblin.runtime.spec_store;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@@ -27,16 +26,13 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
-
import java.util.List;
-import java.util.Properties;
import org.apache.commons.lang3.ArrayUtils;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-import com.google.common.base.Charsets;
import com.google.common.collect.Iterators;
import com.typesafe.config.Config;
@@ -45,6 +41,7 @@ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metastore.testing.ITestMetastoreDatabase;
import org.apache.gobblin.metastore.testing.TestMetastoreDatabaseFactory;
import org.apache.gobblin.runtime.api.FlowSpec;
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecSerDe;
import org.apache.gobblin.runtime.api.SpecSerDeException;
@@ -62,9 +59,9 @@ public class MysqlSpecStoreTest {
private MysqlSpecStore specStore;
private MysqlSpecStore oldSpecStore;
- private URI uri1 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg1").setFlowGroup("fn1"));
- private URI uri2 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg2").setFlowGroup("fn2"));
- private URI uri3 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg3").setFlowGroup("fn3"));
+ private final URI uri1 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg1").setFlowGroup("fn1"));
+ private final URI uri2 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg2").setFlowGroup("fn2"));
+ private final URI uri3 = FlowSpec.Utils.createFlowSpecUri(new FlowId().setFlowName("fg3").setFlowGroup("fn3"));
private FlowSpec flowSpec1, flowSpec2, flowSpec3;
public MysqlSpecStoreTest()
@@ -85,13 +82,10 @@ public class MysqlSpecStoreTest {
this.specStore = new MysqlSpecStore(config, new TestSpecSerDe());
this.oldSpecStore = new OldSpecStore(config, new TestSpecSerDe());
- Properties properties = new Properties();
- properties.setProperty(FLOW_SOURCE_IDENTIFIER_KEY, "source");
- properties.setProperty(FLOW_DESTINATION_IDENTIFIER_KEY, "destination");
-
flowSpec1 = FlowSpec.builder(this.uri1)
.withConfig(ConfigBuilder.create()
.addPrimitive("key", "value")
+ .addPrimitive("key3", "value3")
.addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
.addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg1")
@@ -100,7 +94,8 @@ public class MysqlSpecStoreTest {
.withVersion("Test version")
.build();
flowSpec2 = FlowSpec.builder(this.uri2)
- .withConfig(ConfigBuilder.create().addPrimitive("key2", "value2")
+ .withConfig(ConfigBuilder.create().addPrimitive("converter", "value1,value2,value3")
+ .addPrimitive("key3", "value3")
.addPrimitive(FLOW_SOURCE_IDENTIFIER_KEY, "source")
.addPrimitive(FLOW_DESTINATION_IDENTIFIER_KEY, "destination")
.addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "fg2")
@@ -119,6 +114,13 @@ public class MysqlSpecStoreTest {
.build();
}
+ @Test(expectedExceptions = IOException.class)
+ public void testSpecSearch() throws Exception {
+ // empty FlowSpecSearchObject should throw an error
+ FlowSpecSearchObject flowSpecSearchObject = FlowSpecSearchObject.builder().build();
+ MysqlSpecStore.createGetPreparedStatement(flowSpecSearchObject, "table");
+ }
+
@Test
public void testAddSpec() throws Exception {
this.specStore.addSpec(this.flowSpec1);
@@ -135,15 +137,46 @@ public class MysqlSpecStoreTest {
Assert.assertEquals(result, this.flowSpec1);
Collection<Spec> specs = this.specStore.getSpecs();
+ Assert.assertEquals(specs.size(), 2);
Assert.assertTrue(specs.contains(this.flowSpec1));
Assert.assertTrue(specs.contains(this.flowSpec2));
Iterator<URI> uris = this.specStore.getSpecURIs();
Assert.assertTrue(Iterators.contains(uris, this.uri1));
Assert.assertTrue(Iterators.contains(uris, this.uri2));
+
+ FlowSpecSearchObject flowSpecSearchObject = FlowSpecSearchObject.builder().flowGroup("fg1").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+
+ flowSpecSearchObject = FlowSpecSearchObject.builder().flowName("fn2").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+
+ flowSpecSearchObject = FlowSpecSearchObject.builder().flowName("fg1").flowGroup("fn2").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 0);
+
+ flowSpecSearchObject = FlowSpecSearchObject.builder().propertyFilter("key=value").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+
+ flowSpecSearchObject = FlowSpecSearchObject.builder().propertyFilter("converter=value2").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 1);
+ Assert.assertTrue(specs.contains(this.flowSpec2));
+
+ flowSpecSearchObject = FlowSpecSearchObject.builder().propertyFilter("key3").build();
+ specs = this.specStore.getSpecs(flowSpecSearchObject);
+ Assert.assertEquals(specs.size(), 2);
+ Assert.assertTrue(specs.contains(this.flowSpec1));
+ Assert.assertTrue(specs.contains(this.flowSpec2));
}
- @Test
+ @Test (dependsOnMethods = "testGetSpec")
public void testGetSpecWithTag() throws Exception {
//Creating and inserting flowspecs with tags
@@ -186,7 +219,7 @@ public class MysqlSpecStoreTest {
this.specStore.addSpec(this.flowSpec3);
}
- @Test (dependsOnMethods = "testGetSpec")
+ @Test (dependsOnMethods = "testGetSpecWithTag")
public void testDeleteSpec() throws Exception {
this.specStore.deleteSpec(this.uri1);
Assert.assertFalse(this.specStore.exists(this.uri1));
@@ -214,7 +247,7 @@ public class MysqlSpecStoreTest {
public void addSpec(Spec spec, String tagValue) throws IOException {
try (Connection connection = this.dataSource.getConnection();
PreparedStatement statement = connection.prepareStatement(String.format(INSERT_STATEMENT, this.tableName))) {
- setPreparedStatement(statement, spec, tagValue);
+ setAddPreparedStatement(statement, spec, tagValue);
statement.setString(4, null);
statement.executeUpdate();
connection.commit();
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
index 2be2572..567ab70 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/restli/GobblinServiceFlowConfigResourceHandler.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.service.modules.restli;
import java.io.IOException;
+import java.util.Collection;
import java.util.Properties;
import java.util.UUID;
@@ -38,6 +39,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.runtime.api.FlowSpecSearchObject;
import org.apache.gobblin.service.FlowConfig;
import org.apache.gobblin.service.FlowConfigLoggedException;
import org.apache.gobblin.service.FlowConfigResourceLocalHandler;
@@ -79,6 +81,16 @@ public class GobblinServiceFlowConfigResourceHandler implements FlowConfigsResou
return this.localHandler.getFlowConfig(flowId);
}
+ @Override
+ public Collection<FlowConfig> getFlowConfig(FlowSpecSearchObject flowSpecSearchObject) throws FlowConfigLoggedException {
+ return this.localHandler.getFlowConfig(flowSpecSearchObject);
+ }
+
+ @Override
+ public Collection<FlowConfig> getAllFlowConfigs() {
+ return this.localHandler.getAllFlowConfigs();
+ }
+
/**
* Adding {@link FlowConfig} should check if current node is active (master).
* If current node is active, call {@link FlowConfigResourceLocalHandler#createFlowConfig(FlowConfig)} directly.
diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
index d1e793f..f1eafbb 100644
--- a/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
+++ b/gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java
@@ -83,7 +83,9 @@ public class GobblinServiceManagerTest {
private static final String TEST_GROUP_NAME = "testGroup";
private static final String TEST_FLOW_NAME = "testFlow";
+ private static final String TEST_FLOW_NAME2 = "testFlow2";
private static final FlowId TEST_FLOW_ID = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
+ private static final FlowId TEST_FLOW_ID2 = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME2);
private static final FlowId UNCOMPILABLE_FLOW_ID = new FlowId().setFlowGroup(TEST_GROUP_NAME)
.setFlowName(MockedSpecCompiler.UNCOMPILABLE_FLOW);
@@ -337,6 +339,36 @@ public class GobblinServiceManagerTest {
Assert.assertEquals(flowConfig.getProperties().get("param1"), "value1");
}
+ @Test (dependsOnMethods = "testCreateAgain")
+ public void testGetAll() throws Exception {
+ FlowConfig flowConfig2 = new FlowConfig().setId(TEST_FLOW_ID2)
+ .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE))
+ .setProperties(new StringMap(flowProperties));
+ this.flowConfigClient.createFlowConfig(flowConfig2);
+ Collection<FlowConfig> flowConfigs = this.flowConfigClient.getAllFlowConfigs();
+
+ Assert.assertEquals(flowConfigs.size(), 2);
+
+ this.flowConfigClient.deleteFlowConfig(TEST_FLOW_ID2);
+ }
+
+ @Test (dependsOnMethods = "testCreateAgain", enabled = false)
+ public void testGetFilteredFlows() throws Exception {
+ // Not implemented for FsSpecStore
+
+ Collection<FlowConfig> flowConfigs = this.flowConfigClient.getFlowConfigs(TEST_GROUP_NAME, null, null, null, null, null,
+null, null, null, null);
+ Assert.assertEquals(flowConfigs.size(), 2);
+
+ flowConfigs = this.flowConfigClient.getFlowConfigs(TEST_GROUP_NAME, TEST_FLOW_NAME2, null, null, null, null,
+ null, null, null, null);
+ Assert.assertEquals(flowConfigs.size(), 1);
+
+ flowConfigs = this.flowConfigClient.getFlowConfigs(null, null, null, null, null, null,
+ TEST_SCHEDULE, null, null, null);
+ Assert.assertEquals(flowConfigs.size(), 2);
+ }
+
@Test (dependsOnMethods = "testGet")
public void testUpdate() throws Exception {
FlowId flowId = new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME);
@@ -347,7 +379,7 @@ public class GobblinServiceManagerTest {
flowProperties.put(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY, TEST_SOURCE_NAME);
flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME);
- FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME))
+ FlowConfig flowConfig = new FlowConfig().setId(TEST_FLOW_ID)
.setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE))
.setProperties(new StringMap(flowProperties));