Posted to commits@streampipes.apache.org by eb...@apache.org on 2021/06/11 18:43:39 UTC
[incubator-streampipes] 25/29: [STREAMPIPES-349] Add implementation of endpoint for downloading data from data lake
This is an automated email from the ASF dual-hosted git repository.
ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit c9ec353dc3942f38499fd66b7c4bc10431959f56
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 20:07:26 2021 +0200
[STREAMPIPES-349] Add implementation of endpoint for downloading data from data lake
---
.../dataexplorer/DataLakeManagementV4.java | 123 +++++++++++++++++++++
.../apache/streampipes/ps/DataLakeResourceV4.java | 15 ++-
2 files changed, 135 insertions(+), 3 deletions(-)
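[Editor's note] As a quick illustration of how a client might consume the new download endpoint, here is a minimal, hypothetical sketch using the Java 11+ HttpClient. The host, port, and route are assumptions for illustration only; the attachment name "datalake.csv" comes from the Content-Disposition header set in this commit.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

public class DownloadExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical URL: host, port and route are assumptions, not taken from this commit.
    URI uri = URI.create("http://localhost:8030/streampipes-backend/api/v4/datalake/measurements/flow-rate/download?format=csv");
    HttpClient client = HttpClient.newHttpClient();
    HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
    // Stream the response body straight to disk; the server marks it as an
    // attachment via Content-Disposition, so no buffering in memory is needed.
    HttpResponse<Path> response = client.send(request, HttpResponse.BodyHandlers.ofFile(Path.of("datalake.csv")));
    System.out.println("Saved " + response.body());
  }
}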
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 8e175b8..b2b7192 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.dataexplorer;
+import com.google.gson.Gson;
import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
import org.apache.streampipes.dataexplorer.query.EditRetentionPolicyQuery;
@@ -31,6 +32,10 @@ import org.apache.streampipes.model.datalake.DataLakeRetentionPolicy;
import org.apache.streampipes.model.datalake.DataResult;
import org.influxdb.dto.QueryResult;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -47,6 +52,120 @@ public class DataLakeManagementV4 {
return new DataExplorerQueryV4(queryParts).executeQuery();
}
+  public void getDataAsStream(String measurementID, Long startDate, Long endDate, Integer page, Integer limit, Integer offset, String groupBy, String order, String aggregationFunction, String timeInterval, String format, OutputStream outputStream) throws IOException {
+    if (limit == null) {
+      limit = 500000;
+    }
+    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+
+    DataResult dataResult;
+    // JSON output: stream all rows as one JSON array of objects
+    if (format.equals("json")) {
+
+      Gson gson = new Gson();
+      int i = 0;
+      if (page != null) {
+        i = page;
+      }
+
+      boolean isFirstDataObject = true;
+
+      outputStream.write(toBytes("["));
+      do {
+        dataResult = getData(measurementID, startDate, endDate, i, limit, null, groupBy, order, aggregationFunction, timeInterval);
+
+        if (dataResult.getTotal() > 0) {
+          for (List<Object> row : dataResult.getRows()) {
+            if (!isFirstDataObject) {
+              outputStream.write(toBytes(","));
+            }
+
+            // produce one JSON object per row
+            boolean isFirstElementInRow = true;
+            outputStream.write(toBytes("{"));
+            for (int i1 = 0; i1 < row.size(); i1++) {
+              Object element = row.get(i1);
+              if (!isFirstElementInRow) {
+                outputStream.write(toBytes(","));
+              }
+              isFirstElementInRow = false;
+              if (i1 == 0) {
+                try {
+                  element = formatter.parse(element.toString()).getTime();
+                } catch (ParseException e) {
+                  element = element.toString();
+                }
+              }
+              // produce one JSON member, e.g. "name": "Pipes" or "load": 42
+              outputStream.write(toBytes("\"" + dataResult.getHeaders().get(i1) + "\": "
+                      + gson.toJson(element)));
+            }
+            outputStream.write(toBytes("}"));
+            isFirstDataObject = false;
+          }
+
+          i++;
+        }
+      } while (dataResult.getTotal() > 0);
+      outputStream.write(toBytes("]"));
+
+    // CSV output: header line first, then one semicolon-separated line per row
+    } else if (format.equals("csv")) {
+      int i = 0;
+      if (page != null) {
+        i = page;
+      }
+
+      boolean isFirstDataObject = true;
+
+      do {
+        dataResult = getData(measurementID, startDate, endDate, i, limit, null, groupBy, order, aggregationFunction, timeInterval);
+        // write the header line once, before the first page of rows
+        if (dataResult.getTotal() > 0) {
+          if (isFirstDataObject) {
+            boolean isFirst = true;
+            for (int i1 = 0; i1 < dataResult.getHeaders().size(); i1++) {
+              if (!isFirst) {
+                outputStream.write(toBytes(";"));
+              }
+              isFirst = false;
+              outputStream.write(toBytes(dataResult.getHeaders().get(i1)));
+            }
+            outputStream.write(toBytes("\n"));
+          }
+          isFirstDataObject = false;
+        }
+
+        if (dataResult.getTotal() > 0) {
+          for (List<Object> row : dataResult.getRows()) {
+            boolean isFirstInRow = true;
+            for (int i1 = 0; i1 < row.size(); i1++) {
+              Object element = row.get(i1);
+              if (!isFirstInRow) {
+                outputStream.write(toBytes(";"));
+              }
+              isFirstInRow = false;
+              if (i1 == 0) {
+                try {
+                  element = formatter.parse(element.toString()).getTime();
+                } catch (ParseException e) {
+                  element = element.toString();
+                }
+              }
+              if (element == null) {
+                outputStream.write(toBytes(""));
+              } else {
+                outputStream.write(toBytes(element.toString()));
+              }
+            }
+            outputStream.write(toBytes("\n"));
+          }
+        }
+        i++;
+      } while (dataResult.getTotal() > 0);
+    }
+  }
+
public boolean removeAllMeasurements() {
List<DataLakeMeasure> allMeasurements = getAllMeasurements();
@@ -157,4 +276,8 @@ public class DataLakeManagementV4 {
}
return queryParts;
}
+
+  private byte[] toBytes(String value) {
+    return value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+  }
}
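[Editor's note] Stepping back from the diff: getDataAsStream never materializes the full result set. It fetches one page at a time (default limit 500000 rows), writes it to the output stream, and repeats until a page comes back empty, so memory use stays bounded regardless of result size. Below is a self-contained sketch of that loop; the fetchPage stub is a made-up stand-in for getData and not part of this commit.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

public class PagedStreamingSketch {

  // Stand-in for DataLakeManagementV4.getData: returns rows for a page, empty when exhausted.
  static List<List<Object>> fetchPage(int page, int limit) {
    if (page >= 3) {
      return Collections.emptyList();
    }
    return List.of(List.of(page, "row-a"), List.of(page, "row-b"));
  }

  static void streamAsJsonArray(OutputStream out) throws IOException {
    int page = 0;
    int limit = 500000; // same default page size as in the commit
    boolean first = true;
    out.write("[".getBytes(StandardCharsets.UTF_8));
    List<List<Object>> rows;
    do {
      rows = fetchPage(page, limit);
      for (List<Object> row : rows) {
        if (!first) {
          out.write(",".getBytes(StandardCharsets.UTF_8));
        }
        first = false;
        // placeholder serialization; the commit builds JSON objects field by field
        out.write(row.toString().getBytes(StandardCharsets.UTF_8));
      }
      page++;
    } while (!rows.isEmpty()); // keep paging until a fetch comes back empty
    out.write("]".getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    streamAsJsonArray(out);
    System.out.println(out.toString(StandardCharsets.UTF_8));
  }
}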
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index caf76e8..f2bd49b 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
import java.util.List;
@@ -139,9 +140,17 @@ public class DataLakeResourceV4 extends AbstractRestResource {
, @Parameter(in = ParameterIn.QUERY, description = "name of aggregation function used for grouping operation") @QueryParam("aggregationFunction") String aggregationFunction
, @Parameter(in = ParameterIn.QUERY, description = "time interval for aggregation (e.g. 1m - one minute) for grouping operation") @QueryParam("timeInterval") String timeInterval
, @Parameter(in = ParameterIn.QUERY, description = "format specification (csv, json - default is csv) for data download") @QueryParam("format") String format) {
- /**
- * TODO: implementation of method stump
- */
+
+    if (format == null) {
+      format = "csv";
+    }
+    String outputFormat = format;
+
+    StreamingOutput streamingOutput = output -> dataLakeManagement.getDataAsStream(measurementID, startDate, endDate, page, limit, offset, groupBy, order, aggregationFunction, timeInterval, outputFormat, output);
+
+    return Response.ok(streamingOutput, MediaType.APPLICATION_OCTET_STREAM)
+        .header("Content-Disposition", "attachment; filename=\"datalake." + outputFormat + "\"")
+        .build();
}
@GET
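[Editor's note] The resource side uses the standard JAX-RS streaming pattern that this commit adopts: wrap the producer in a StreamingOutput so bytes are written lazily while the client downloads, and set a Content-Disposition header so the response is saved as a file. A minimal sketch of the same pattern outside StreamPipes; the path and payload are invented for illustration.

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import java.nio.charset.StandardCharsets;

@Path("/download-example") // hypothetical path, for illustration only
public class StreamingDownloadResource {

  @GET
  public Response download() {
    // The lambda runs when the container serializes the response,
    // writing directly to the HTTP output stream.
    StreamingOutput body = output -> {
      for (int i = 0; i < 3; i++) {
        output.write(("line " + i + "\n").getBytes(StandardCharsets.UTF_8));
        output.flush(); // push each chunk to the client as it is produced
      }
    };
    return Response.ok(body, MediaType.APPLICATION_OCTET_STREAM)
        .header("Content-Disposition", "attachment; filename=\"example.csv\"")
        .build();
  }
}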