You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by eb...@apache.org on 2021/06/11 18:43:37 UTC

[incubator-streampipes] 23/29: [STREAMPIPES-349] Add implementation of endpoint for getting data from data lake

This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit f24f63d82939ee526f88021f52d0b5d1d5f3eb41
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 20:01:22 2021 +0200

    [STREAMPIPES-349] Add implementation of endpoint for getting data from data lake
---
 .../dataexplorer/DataLakeManagementV4.java         | 51 ++++++++++++++++++++--
 .../apache/streampipes/ps/DataLakeResourceV4.java  | 25 ++++++-----
 2 files changed, 61 insertions(+), 15 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 76a7577..8e175b8 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -23,9 +23,7 @@ import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
 import org.apache.streampipes.dataexplorer.query.EditRetentionPolicyQuery;
 import org.apache.streampipes.dataexplorer.query.ShowRetentionPolicyQuery;
 import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
-import org.apache.streampipes.dataexplorer.v4.params.DeleteFromStatementParams;
-import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
-import org.apache.streampipes.dataexplorer.v4.params.TimeBoundaryParams;
+import org.apache.streampipes.dataexplorer.v4.params.*;
 import org.apache.streampipes.dataexplorer.v4.query.DataExplorerQueryV4;
 import org.apache.streampipes.model.datalake.DataLakeConfiguration;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
@@ -44,6 +42,11 @@ public class DataLakeManagementV4 {
         return allMeasurements;
     }
 
+    public DataResult getData(String measurementID, Long startDate, Long endDate, Integer page, Integer limit, Integer offset, String groupBy, String order, String aggregationFunction, String timeInterval) {
+        Map<String, QueryParamsV4> queryParts = getQueryParams(measurementID, startDate, endDate, page, limit, offset, groupBy, order, aggregationFunction, timeInterval);
+        return new DataExplorerQueryV4(queryParts).executeQuery();
+    }
+
     public boolean removeAllMeasurements() {
         List<DataLakeMeasure> allMeasurements = getAllMeasurements();
 
@@ -104,6 +107,48 @@ public class DataLakeManagementV4 {
         return new ShowRetentionPolicyQuery(RetentionPolicyQueryParams.from("", "0s")).executeQuery();
     }
 
+    private Map<String, QueryParamsV4> getQueryParams(String measurementID, Long startDate, Long endDate, Integer page, Integer limit, Integer offset, String groupBy, String order, String aggregationFunction, String timeInterval) {
+        Map<String, QueryParamsV4> queryParts = new HashMap<>();
+
+        queryParts.put("SELECT", SelectFromStatementParams.from(measurementID, aggregationFunction));
+
+        if (startDate != null || endDate != null) {
+            queryParts.put("WHERE", TimeBoundaryParams.from(measurementID, startDate, endDate));
+        }
+
+
+        if (timeInterval != null && aggregationFunction != null) {
+            if (groupBy == null) {
+                queryParts.put("GROUPBYTIME", GroupingByTimeParams.from(measurementID, timeInterval));
+            } else {
+                groupBy = groupBy + ",time(" + timeInterval + ")";
+            }
+        }
+
+        if (groupBy != null) {
+            queryParts.put("GROUPBY", GroupingByTagsParams.from(measurementID, groupBy));
+        }
+
+        if (order != null) {
+            if (order.equals("DESC")) {
+                queryParts.put("DESCENDING", OrderingByTimeParams.from(measurementID, order));
+            }
+        }
+
+        if (limit != null) {
+            queryParts.put("LIMIT", ItemLimitationParams.from(measurementID, limit));
+        }
+
+        if (offset != null) {
+            queryParts.put("OFFSET", OffsetParams.from(measurementID, offset));
+        } else if (limit != null && page != null) {
+            queryParts.put("OFFSET", OffsetParams.from(measurementID, page * limit));
+        }
+
+        return queryParts;
+
+    }
+
     public Map<String, QueryParamsV4> getDeleteQueryParams(String measurementID, Long startDate, Long endDate) {
         Map<String, QueryParamsV4> queryParts = new HashMap<>();
         queryParts.put("DELETE", DeleteFromStatementParams.from(measurementID));
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index cbef9de..fd6ab92 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -95,28 +95,29 @@ public class DataLakeResourceV4 extends AbstractRestResource {
         return ok(allMeasurements);
     }
 
+
     @GET
     @Path("/measurements/{measurementID}")
     @Produces(MediaType.APPLICATION_JSON)
     @Operation(summary = "Get data from a single measurement series by a given id", tags = {"Data Lake"},
             responses = {
                     @ApiResponse(responseCode = "400", description = "Measurement series with given id and requested query specification not found"),
-                    @ApiResponse(responseCode = "200", description = "requested data", content = @Content(schema = @Schema(implementation = Placeholder.class)))})
+                    @ApiResponse(responseCode = "200", description = "requested data", content = @Content(schema = @Schema(implementation = DataResult.class)))})
     public Response getData(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username
             , @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathParam("measurementID") String measurementID
-            , @Parameter(in = ParameterIn.QUERY, description = "start date for slicing operation") @QueryParam("startDate") String startDate
-            , @Parameter(in = ParameterIn.QUERY, description = "end date for slicing operation") @QueryParam("endDate") String endDate
-            , @Parameter(in = ParameterIn.QUERY, description = "page number for paging operation") @QueryParam("page") String page
-            , @Parameter(in = ParameterIn.QUERY, description = "items per page limitation for paging operation") @QueryParam("limit") Integer limit
-            , @Parameter(in = ParameterIn.QUERY, description = "offset for paging operation") @QueryParam("offset") Integer offset
+            , @Parameter(in = ParameterIn.QUERY, description = "start date for slicing operation") @QueryParam("startDate") Long startDate
+            , @Parameter(in = ParameterIn.QUERY, description = "end date for slicing operation") @QueryParam("endDate") Long endDate
+            , @Parameter(in = ParameterIn.QUERY, description = "page number for paging operation") @QueryParam("page") Integer page
+            , @Parameter(in = ParameterIn.QUERY, description = "maximum number of retrieved query results") @QueryParam("limit") Integer limit
+            , @Parameter(in = ParameterIn.QUERY, description = "offset") @QueryParam("offset") Integer offset
             , @Parameter(in = ParameterIn.QUERY, description = "grouping tags (comma-separated) for grouping operation") @QueryParam("groupBy") String groupBy
+            , @Parameter(in = ParameterIn.QUERY, description = "ordering of retrieved query results (ASC or DESC - default is ASC)") @QueryParam("order") String order
             , @Parameter(in = ParameterIn.QUERY, description = "name of aggregation function used for grouping operation") @QueryParam("aggregationFunction") String aggregationFunction
-            , @Parameter(in = ParameterIn.QUERY, description = "time interval for aggregation (e.g. 1m - one minute) for grouping operation") @QueryParam("timeInterval") String timeInterval
-            , @Parameter(in = ParameterIn.QUERY, description = "format specification (csv, json) for data download") @QueryParam("format") String format) {
-        /**
-         * TODO: implementation of method stump
-         */
-        return null;
+            , @Parameter(in = ParameterIn.QUERY, description = "time interval for aggregation (e.g. 1m - one minute) for grouping operation") @QueryParam("timeInterval") String timeInterval) {
+
+        DataResult result = this.dataLakeManagement.getData(measurementID, startDate, endDate, page, limit, offset, groupBy, order, aggregationFunction, timeInterval);
+
+        return Response.ok(result).build();
     }
 
     @GET