You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by eb...@apache.org on 2021/06/11 18:43:37 UTC
[incubator-streampipes] 23/29: [STREAMPIPES-349] Add implementation of endpoint for getting data from data lake
This is an automated email from the ASF dual-hosted git repository.
ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit f24f63d82939ee526f88021f52d0b5d1d5f3eb41
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 20:01:22 2021 +0200
[STREAMPIPES-349] Add implementation of endpoint for getting data from data lake
---
.../dataexplorer/DataLakeManagementV4.java | 51 ++++++++++++++++++++--
.../apache/streampipes/ps/DataLakeResourceV4.java | 25 ++++++-----
2 files changed, 61 insertions(+), 15 deletions(-)
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 76a7577..8e175b8 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -23,9 +23,7 @@ import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
import org.apache.streampipes.dataexplorer.query.EditRetentionPolicyQuery;
import org.apache.streampipes.dataexplorer.query.ShowRetentionPolicyQuery;
import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
-import org.apache.streampipes.dataexplorer.v4.params.DeleteFromStatementParams;
-import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
-import org.apache.streampipes.dataexplorer.v4.params.TimeBoundaryParams;
+import org.apache.streampipes.dataexplorer.v4.params.*;
import org.apache.streampipes.dataexplorer.v4.query.DataExplorerQueryV4;
import org.apache.streampipes.model.datalake.DataLakeConfiguration;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
@@ -44,6 +42,11 @@ public class DataLakeManagementV4 {
return allMeasurements;
}
+ public DataResult getData(String measurementID, Long startDate, Long endDate, Integer page, Integer limit, Integer offset, String groupBy, String order, String aggregationFunction, String timeInterval) {
+ Map<String, QueryParamsV4> queryParts = getQueryParams(measurementID, startDate, endDate, page, limit, offset, groupBy, order, aggregationFunction, timeInterval);
+ return new DataExplorerQueryV4(queryParts).executeQuery();
+ }
+
public boolean removeAllMeasurements() {
List<DataLakeMeasure> allMeasurements = getAllMeasurements();
@@ -104,6 +107,48 @@ public class DataLakeManagementV4 {
return new ShowRetentionPolicyQuery(RetentionPolicyQueryParams.from("", "0s")).executeQuery();
}
+ private Map<String, QueryParamsV4> getQueryParams(String measurementID, Long startDate, Long endDate, Integer page, Integer limit, Integer offset, String groupBy, String order, String aggregationFunction, String timeInterval) {
+ Map<String, QueryParamsV4> queryParts = new HashMap<>();
+
+ queryParts.put("SELECT", SelectFromStatementParams.from(measurementID, aggregationFunction));
+
+ if (startDate != null || endDate != null) {
+ queryParts.put("WHERE", TimeBoundaryParams.from(measurementID, startDate, endDate));
+ }
+
+
+ if (timeInterval != null && aggregationFunction != null) {
+ if (groupBy == null) {
+ queryParts.put("GROUPBYTIME", GroupingByTimeParams.from(measurementID, timeInterval));
+ } else {
+ groupBy = groupBy + ",time(" + timeInterval + ")";
+ }
+ }
+
+ if (groupBy != null) {
+ queryParts.put("GROUPBY", GroupingByTagsParams.from(measurementID, groupBy));
+ }
+
+ if (order != null) {
+ if (order.equals("DESC")) {
+ queryParts.put("DESCENDING", OrderingByTimeParams.from(measurementID, order));
+ }
+ }
+
+ if (limit != null) {
+ queryParts.put("LIMIT", ItemLimitationParams.from(measurementID, limit));
+ }
+
+ if (offset != null) {
+ queryParts.put("OFFSET", OffsetParams.from(measurementID, offset));
+ } else if (limit != null && page != null) {
+ queryParts.put("OFFSET", OffsetParams.from(measurementID, page * limit));
+ }
+
+ return queryParts;
+
+ }
+
public Map<String, QueryParamsV4> getDeleteQueryParams(String measurementID, Long startDate, Long endDate) {
Map<String, QueryParamsV4> queryParts = new HashMap<>();
queryParts.put("DELETE", DeleteFromStatementParams.from(measurementID));
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index cbef9de..fd6ab92 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -95,28 +95,29 @@ public class DataLakeResourceV4 extends AbstractRestResource {
return ok(allMeasurements);
}
+
@GET
@Path("/measurements/{measurementID}")
@Produces(MediaType.APPLICATION_JSON)
@Operation(summary = "Get data from a single measurement series by a given id", tags = {"Data Lake"},
responses = {
@ApiResponse(responseCode = "400", description = "Measurement series with given id and requested query specification not found"),
- @ApiResponse(responseCode = "200", description = "requested data", content = @Content(schema = @Schema(implementation = Placeholder.class)))})
+ @ApiResponse(responseCode = "200", description = "requested data", content = @Content(schema = @Schema(implementation = DataResult.class)))})
public Response getData(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username
, @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathParam("measurementID") String measurementID
- , @Parameter(in = ParameterIn.QUERY, description = "start date for slicing operation") @QueryParam("startDate") String startDate
- , @Parameter(in = ParameterIn.QUERY, description = "end date for slicing operation") @QueryParam("endDate") String endDate
- , @Parameter(in = ParameterIn.QUERY, description = "page number for paging operation") @QueryParam("page") String page
- , @Parameter(in = ParameterIn.QUERY, description = "items per page limitation for paging operation") @QueryParam("limit") Integer limit
- , @Parameter(in = ParameterIn.QUERY, description = "offset for paging operation") @QueryParam("offset") Integer offset
+ , @Parameter(in = ParameterIn.QUERY, description = "start date for slicing operation") @QueryParam("startDate") Long startDate
+ , @Parameter(in = ParameterIn.QUERY, description = "end date for slicing operation") @QueryParam("endDate") Long endDate
+ , @Parameter(in = ParameterIn.QUERY, description = "page number for paging operation") @QueryParam("page") Integer page
+ , @Parameter(in = ParameterIn.QUERY, description = "maximum number of retrieved query results") @QueryParam("limit") Integer limit
+ , @Parameter(in = ParameterIn.QUERY, description = "offset") @QueryParam("offset") Integer offset
, @Parameter(in = ParameterIn.QUERY, description = "grouping tags (comma-separated) for grouping operation") @QueryParam("groupBy") String groupBy
+ , @Parameter(in = ParameterIn.QUERY, description = "ordering of retrieved query results (ASC or DESC - default is ASC)") @QueryParam("order") String order
, @Parameter(in = ParameterIn.QUERY, description = "name of aggregation function used for grouping operation") @QueryParam("aggregationFunction") String aggregationFunction
- , @Parameter(in = ParameterIn.QUERY, description = "time interval for aggregation (e.g. 1m - one minute) for grouping operation") @QueryParam("timeInterval") String timeInterval
- , @Parameter(in = ParameterIn.QUERY, description = "format specification (csv, json) for data download") @QueryParam("format") String format) {
- /**
- * TODO: implementation of method stump
- */
- return null;
+ , @Parameter(in = ParameterIn.QUERY, description = "time interval for aggregation (e.g. 1m - one minute) for grouping operation") @QueryParam("timeInterval") String timeInterval) {
+
+ DataResult result = this.dataLakeManagement.getData(measurementID, startDate, endDate, page, limit, offset, groupBy, order, aggregationFunction, timeInterval);
+
+ return Response.ok(result).build();
}
@GET