You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by eb...@apache.org on 2021/06/11 18:43:39 UTC

[incubator-streampipes] 25/29: [STREAMPIPES-349] Add implementation of endpoint for downloading data from data lake

This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit c9ec353dc3942f38499fd66b7c4bc10431959f56
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 20:07:26 2021 +0200

    [STREAMPIPES-349] Add implementation of endpoint for downloading data from data lake
---
 .../dataexplorer/DataLakeManagementV4.java         | 123 +++++++++++++++++++++
 .../apache/streampipes/ps/DataLakeResourceV4.java  |  15 ++-
 2 files changed, 135 insertions(+), 3 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 8e175b8..b2b7192 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -18,6 +18,7 @@
 
 package org.apache.streampipes.dataexplorer;
 
+import com.google.gson.Gson;
 import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
 import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
 import org.apache.streampipes.dataexplorer.query.EditRetentionPolicyQuery;
@@ -31,6 +32,10 @@ import org.apache.streampipes.model.datalake.DataLakeRetentionPolicy;
 import org.apache.streampipes.model.datalake.DataResult;
 import org.influxdb.dto.QueryResult;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -47,6 +52,120 @@ public class DataLakeManagementV4 {
         return new DataExplorerQueryV4(queryParts).executeQuery();
     }
 
+    /**
+     * Streams the rows of a measurement to {@code outputStream}, fetching them page by page via
+     * {@code getData} until a page reports a total of 0. The first column of each row is converted
+     * to epoch milliseconds when it matches the timestamp pattern, otherwise it is kept as text.
+     *
+     * @param measurementID       forwarded unchanged to {@code getData}
+     * @param startDate           forwarded unchanged to {@code getData}
+     * @param endDate             forwarded unchanged to {@code getData}
+     * @param page                first page to fetch, 0 if {@code null}
+     * @param limit               limit sent with every page request, 500000 if {@code null}
+     * @param offset              accepted but not used; paging is driven by page and limit only
+     * @param groupBy             forwarded unchanged to {@code getData}
+     * @param order               forwarded unchanged to {@code getData}
+     * @param aggregationFunction forwarded unchanged to {@code getData}
+     * @param timeInterval        forwarded unchanged to {@code getData}
+     * @param format              "json" writes one array of objects, "csv" writes ';'-separated lines
+     *                            after a header line; any other value, including null, writes nothing
+     * @param outputStream        target of the download; it is neither flushed nor closed here
+     * @throws IOException if writing to {@code outputStream} fails
+     */
+    public void getDataAsStream(String measurementID, Long startDate, Long endDate, Integer page, Integer limit, Integer offset, String groupBy, String order, String aggregationFunction, String timeInterval, String format, OutputStream outputStream) throws IOException {
+        if (limit == null) {
+            limit = 500000;
+        }
+
+        // 'Z' is a quoted literal in the pattern, so the UTC zone has to be set explicitly
+        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
+        formatter.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));
+        int i = page != null ? page : 0;
+        boolean isFirstDataObject = true;
+        DataResult dataResult;
+
+        if ("json".equals(format)) {
+            Gson gson = new Gson();
+            outputStream.write(toBytes("["));
+            do {
+                dataResult = getData(measurementID, startDate, endDate, i, limit, null, groupBy, order, aggregationFunction, timeInterval);
+                if (dataResult.getTotal() > 0) {
+                    for (List<Object> row : dataResult.getRows()) {
+                        if (!isFirstDataObject) {
+                            outputStream.write(toBytes(","));
+                        }
+                        //produce one json object
+                        outputStream.write(toBytes("{"));
+                        for (int i1 = 0; i1 < row.size(); i1++) {
+                            Object element = row.get(i1);
+                            if (i1 > 0) {
+                                outputStream.write(toBytes(","));
+                            }
+                            if (i1 == 0) {
+                                try {
+                                    element = formatter.parse(element.toString()).getTime();
+                                } catch (ParseException e) {
+                                    // not a timestamp in the expected pattern: keep the raw text
+                                    element = element.toString();
+                                }
+                            }
+                            //produce json e.g. "name": "Pipes" or "load": 42; gson quotes and escapes the key
+                            outputStream.write(toBytes(gson.toJson(dataResult.getHeaders().get(i1)) + ": "
+                                    + gson.toJson(element)));
+                        }
+                        outputStream.write(toBytes("}"));
+                        isFirstDataObject = false;
+                    }
+                    i++;
+                }
+            } while (dataResult.getTotal() > 0);
+            outputStream.write(toBytes("]"));
+
+        } else if ("csv".equals(format)) {
+            do {
+                dataResult = getData(measurementID, startDate, endDate, i, limit, null, groupBy, order, aggregationFunction, timeInterval);
+                if (dataResult.getTotal() > 0) {
+                    //Send the header line once, before the rows of the first non-empty page
+                    if (isFirstDataObject) {
+                        for (int i1 = 0; i1 < dataResult.getHeaders().size(); i1++) {
+                            if (i1 > 0) {
+                                outputStream.write(toBytes(";"));
+                            }
+                            outputStream.write(toBytes(dataResult.getHeaders().get(i1)));
+                        }
+                        // terminate only the header line; every row ends its own line, so later
+                        // pages must not emit another line break here (it would be a blank line)
+                        outputStream.write(toBytes("\n"));
+                        isFirstDataObject = false;
+                    }
+
+                    for (List<Object> row : dataResult.getRows()) {
+                        for (int i1 = 0; i1 < row.size(); i1++) {
+                            Object element = row.get(i1);
+                            if (i1 > 0) {
+                                outputStream.write(toBytes(";"));
+                            }
+                            if (i1 == 0) {
+                                try {
+                                    element = formatter.parse(element.toString()).getTime();
+                                } catch (ParseException e) {
+                                    // not a timestamp in the expected pattern: keep the raw text
+                                    element = element.toString();
+                                }
+                            }
+                            // null cells become empty fields
+                            if (element != null) {
+                                outputStream.write(toBytes(element.toString()));
+                            }
+                        }
+                        outputStream.write(toBytes("\n"));
+                    }
+                }
+                i++;
+            } while (dataResult.getTotal() > 0);
+        }
+    }
+
     public boolean removeAllMeasurements() {
         List<DataLakeMeasure> allMeasurements = getAllMeasurements();
 
@@ -157,4 +276,8 @@ public class DataLakeManagementV4 {
         }
         return queryParts;
     }
+
+    private byte[] toBytes(String value) { // encodes value for the download stream
+        return value.getBytes(java.nio.charset.StandardCharsets.UTF_8); // fixed charset: the no-arg form uses the JVM default
+    }
 }
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index caf76e8..f2bd49b 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
 import java.util.List;
 
 class Placeholder {
@@ -139,9 +140,17 @@ public class DataLakeResourceV4 extends AbstractRestResource {
             , @Parameter(in = ParameterIn.QUERY, description = "name of aggregation function used for grouping operation") @QueryParam("aggregationFunction") String aggregationFunction
             , @Parameter(in = ParameterIn.QUERY, description = "time interval for aggregation (e.g. 1m - one minute) for grouping operation") @QueryParam("timeInterval") String timeInterval
             , @Parameter(in = ParameterIn.QUERY, description = "format specification (csv, json - default is csv) for data download") @QueryParam("format") String format) {
-        /**
-         * TODO: implementation of method stump
-         */
+
+        if (format == null) {
+            format = "csv";
+        }
+        String outputFormat = format;
+
+        StreamingOutput streamingOutput = output -> dataLakeManagement.getDataAsStream(measurementID, startDate, endDate, page, limit, offset, groupBy, order, aggregationFunction, timeInterval, outputFormat, output);
+
+        return Response.ok(streamingOutput, MediaType.APPLICATION_OCTET_STREAM).
+                header("Content-Disposition", "attachment; filename=\"datalake." + outputFormat + "\"")
+                .build();
     }
 
     @GET