Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/11/23 16:41:56 UTC

[streampipes] 01/06: [STREAMPIPES-566] Fix query download, improve code structure

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit e3c1c660c945c6498c6c763e2fdc4a88ab0bf08f
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Nov 20 16:18:49 2022 +0100

    [STREAMPIPES-566] Fix query download, improve code structure
---
 .../dataexplorer/DataLakeManagementV4.java         | 206 +--------------------
 .../dataexplorer/v4/query/QueryResultProvider.java |  64 +++++++
 .../v4/query/StreamedQueryResultProvider.java      | 113 +++++++++++
 .../v4/query/writer/ConfiguredCsvOutputWriter.java |  76 ++++++++
 .../query/writer/ConfiguredJsonOutputWriter.java   |  70 +++++++
 .../v4/query/writer/ConfiguredOutputWriter.java    |  53 ++++++
 .../dataexplorer/v4/query/writer/OutputFormat.java |  36 ++++
 .../v4/query/writer/item/CsvItemWriter.java        |  37 ++++
 .../v4/query/writer/item/ItemGenerator.java        |  57 ++++++
 .../v4/query/writer/item/JsonItemWriter.java       |  50 +++++
 .../dataexplorer/v4/utils/TimeParser.java          |  30 +++
 .../apache/streampipes/ps/DataLakeResourceV4.java  |   3 +-
 .../services/data-export.service.ts                |   2 +-
 13 files changed, 598 insertions(+), 199 deletions(-)
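
For orientation, a minimal sketch (not part of the commit) of how the refactored entry points are invoked after this change; the method signatures follow the diff below, while the ProvidedQueryParams setup and the target stream are assumed:

  // Sketch only: call path after the refactoring; ProvidedQueryParams construction is assumed.
  DataLakeManagementV4 dataLakeManagement = new DataLakeManagementV4();

  // Non-streamed query, now delegated to QueryResultProvider:
  SpQueryResult result = dataLakeManagement.getData(queryParams, true);

  // Streamed download, now delegated to StreamedQueryResultProvider:
  try (OutputStream out = new FileOutputStream("export.csv")) {
    dataLakeManagement.getDataAsStream(queryParams, OutputFormat.CSV, true, out);
  }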

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 9ee04ff03..c6bbc5278 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.dataexplorer;
 
-import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import org.apache.streampipes.config.backend.BackendConfig;
 import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
@@ -26,11 +25,12 @@ import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
 import org.apache.streampipes.dataexplorer.query.EditRetentionPolicyQuery;
 import org.apache.streampipes.dataexplorer.query.ShowRetentionPolicyQuery;
 import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
-import org.apache.streampipes.dataexplorer.v4.AutoAggregationHandler;
 import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters;
 import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
 import org.apache.streampipes.dataexplorer.v4.query.DataExplorerQueryV4;
+import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
+import org.apache.streampipes.dataexplorer.v4.query.QueryResultProvider;
+import org.apache.streampipes.dataexplorer.v4.query.StreamedQueryResultProvider;
 import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
 import org.apache.streampipes.model.datalake.DataLakeConfiguration;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
@@ -50,39 +50,11 @@ import org.lightcouch.CouchDbClient;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
-import java.time.temporal.ChronoField;
-import java.time.temporal.ChronoUnit;
-import java.time.temporal.TemporalAccessor;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.stream.Collectors;
 
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AUTO_AGGREGATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_CSV_DELIMITER;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_LIMIT;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_MAXIMUM_AMOUNT_OF_EVENTS;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_PAGE;
-
 public class DataLakeManagementV4 {
 
-    public static final String FOR_ID_KEY = "forId";
-
-    private static final DateTimeFormatter formatter = new DateTimeFormatterBuilder()
-            .appendPattern("uuuu[-MM[-dd]]['T'HH[:mm[:ss[.SSSSSSSSS][.SSSSSSSS][.SSSSSSS][.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S]]]][XXX]")
-            .parseDefaulting(ChronoField.NANO_OF_SECOND, 0)
-            .parseDefaulting(ChronoField.OFFSET_SECONDS, 0)
-            .toFormatter();
-
     public List<DataLakeMeasure> getAllMeasurements() {
         return DataExplorerUtils.getInfos();
     }
@@ -91,163 +63,17 @@ public class DataLakeManagementV4 {
         return getDataLakeStorage().findOne(measureId);
     }
 
-    private Optional<DataLakeMeasure> findByMeasurementName(String measurementName) {
-        return getAllMeasurements()
-            .stream()
-            .filter(measurement -> measurement.getMeasureName().equals(measurementName))
-            .findFirst();
-    }
-
-    public SpQueryResult getData(ProvidedQueryParams queryParams, boolean ignoreMissingData) throws IllegalArgumentException {
-        if (queryParams.has(QP_AUTO_AGGREGATE)) {
-            queryParams = new AutoAggregationHandler(queryParams).makeAutoAggregationQueryParams();
-        }
-        Map<String, QueryParamsV4> queryParts = DataLakeManagementUtils.getSelectQueryParams(queryParams);
-
-        if (queryParams.getProvidedParams().containsKey(QP_MAXIMUM_AMOUNT_OF_EVENTS)) {
-            int maximumAmountOfEvents = Integer.parseInt(queryParams.getProvidedParams().get(QP_MAXIMUM_AMOUNT_OF_EVENTS));
-            return new DataExplorerQueryV4(queryParts, maximumAmountOfEvents).executeQuery(ignoreMissingData);
-        }
-
-        if (queryParams.getProvidedParams().containsKey(FOR_ID_KEY)) {
-            String forWidgetId = queryParams.getProvidedParams().get(FOR_ID_KEY);
-            return new DataExplorerQueryV4(queryParts, forWidgetId).executeQuery(ignoreMissingData);
-        } else {
-            return new DataExplorerQueryV4(queryParts).executeQuery(ignoreMissingData);
-        }
+    public SpQueryResult getData(ProvidedQueryParams queryParams,
+                                 boolean ignoreMissingData) throws IllegalArgumentException {
+        return new QueryResultProvider(queryParams, ignoreMissingData).getData();
     }
 
     public void getDataAsStream(ProvidedQueryParams params,
-                                String format,
+                                OutputFormat format,
                                 boolean ignoreMissingValues,
                                 OutputStream outputStream) throws IOException {
-        if (!params.has(QP_LIMIT)) {
-            params.update(QP_LIMIT, 500000);
-        }
-
-        var measurement = findByMeasurementName(params.getMeasurementId()).get();
-
-        SpQueryResult dataResult;
-        //JSON
-        if (format.equals("json")) {
-
-            Gson gson = new Gson();
-            int i = 0;
-            if (params.has(QP_PAGE)) {
-                i = params.getAsInt(QP_PAGE);
-            }
-
-            boolean isFirstDataObject = true;
-
-            outputStream.write(toBytes("["));
-            do {
-                params.update(SupportedDataLakeQueryParameters.QP_PAGE, String.valueOf(i));
-                dataResult = getData(params, ignoreMissingValues);
-
-
-                if (dataResult.getTotal() > 0) {
-                    changeTimestampHeader(measurement, dataResult);
-
-                    for (List<Object> row : dataResult.getAllDataSeries().get(0).getRows()) {
-                        if (!isFirstDataObject) {
-                            outputStream.write(toBytes(","));
-                        }
-
-                        //produce one json object
-                        boolean isFirstElementInRow = true;
-                        outputStream.write(toBytes("{"));
-                        for (int i1 = 0; i1 < row.size(); i1++) {
-                            Object element = row.get(i1);
-                            if (!isFirstElementInRow) {
-                                outputStream.write(toBytes(","));
-                            }
-                            isFirstElementInRow = false;
-                            if (i1 == 0) {
-                                element = parseTime(element.toString());
-                            }
-                            //produce json e.g. "name": "Pipes" or "load": 42
-                            outputStream.write(toBytes("\"" + dataResult.getHeaders().get(i1) + "\": "
-                                    + gson.toJson(element)));
-                        }
-                        outputStream.write(toBytes("}"));
-                        isFirstDataObject = false;
-                    }
 
-                    i++;
-                }
-            } while (dataResult.getTotal() > 0);
-            outputStream.write(toBytes("]"));
-
-            //CSV
-        } else if (format.equals("csv")) {
-            int i = 0;
-            if (params.has(QP_PAGE)) {
-                i = params.getAsInt(QP_PAGE);
-            }
-
-            boolean isFirstDataObject = true;
-            String delimiter = ",";
-
-            if (params.has(QP_CSV_DELIMITER)) {
-                delimiter = params.getAsString(QP_CSV_DELIMITER).equals("comma") ? "," : ";";
-            }
-
-            do {
-                params.update(SupportedDataLakeQueryParameters.QP_PAGE, String.valueOf(i));
-                dataResult = getData(params, ignoreMissingValues);
-
-
-                //Send first header
-                if (dataResult.getTotal() > 0) {
-                    changeTimestampHeader(measurement, dataResult);
-
-                    if (isFirstDataObject) {
-                        boolean isFirst = true;
-                        for (int i1 = 0; i1 < dataResult.getHeaders().size(); i1++) {
-                            if (!isFirst) {
-                                outputStream.write(toBytes(delimiter));
-                            }
-                            isFirst = false;
-                            outputStream.write(toBytes(dataResult.getHeaders().get(i1)));
-                        }
-                    }
-                    outputStream.write(toBytes("\n"));
-                    isFirstDataObject = false;
-                }
-
-                if (dataResult.getTotal() > 0) {
-                    for (List<Object> row : dataResult.getAllDataSeries().get(0).getRows()) {
-                        boolean isFirstInRow = true;
-                        for (int i1 = 0; i1 < row.size(); i1++) {
-                            Object element = row.get(i1);
-                            if (!isFirstInRow) {
-                                outputStream.write(toBytes(delimiter));
-                            }
-                            isFirstInRow = false;
-                            if (i1 == 0) {
-                                element = parseTime(element.toString());
-                            }
-                            if (element == null) {
-                                outputStream.write(toBytes(""));
-                            } else {
-                                outputStream.write(toBytes(element.toString()));
-                            }
-                        }
-                        outputStream.write(toBytes("\n"));
-                    }
-                }
-                i++;
-            } while (dataResult.getTotal() > 0);
-        }
-    }
-
-    /**
-     * Replaces the field 'time' of the data result with the actual timestamp field name of the measurement
-     * @param measurement contains the actual timestamp name value
-     * @param dataResult the query result of the database with 'time' as timestamp field name
-     */
-    private void changeTimestampHeader(DataLakeMeasure measurement, SpQueryResult dataResult) {
-        dataResult.getHeaders().set(0, measurement.getTimestampFieldName());
+        new StreamedQueryResultProvider(params, format, ignoreMissingValues).getDataAsStream(outputStream);
     }
 
     public boolean removeAllMeasurements() {
@@ -345,20 +171,6 @@ public class DataLakeManagementV4 {
         return isSuccess;
     }
 
-    private byte[] toBytes(String value) {
-        return value.getBytes();
-    }
-
-    private static Long parseTime(String v) {
-        TemporalAccessor temporalAccessor = formatter.parseBest(v,
-                ZonedDateTime::from,
-                LocalDateTime::from,
-                LocalDate::from);
-
-        Instant instant = Instant.from(temporalAccessor);
-        return Instant.EPOCH.until(instant, ChronoUnit.MILLIS);
-    }
-
     public Map<String, Object> getTagValues(String measurementId,
                                             String fields) {
         InfluxDB influxDB = DataExplorerUtils.getInfluxDBClient();
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java
new file mode 100644
index 000000000..34556770f
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query;
+
+import org.apache.streampipes.dataexplorer.v4.AutoAggregationHandler;
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
+import org.apache.streampipes.dataexplorer.v4.query.DataExplorerQueryV4;
+import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import java.util.Map;
+
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AUTO_AGGREGATE;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_MAXIMUM_AMOUNT_OF_EVENTS;
+
+public class QueryResultProvider {
+
+  public static final String FOR_ID_KEY = "forId";
+
+  protected ProvidedQueryParams queryParams;
+  protected final boolean ignoreMissingData;
+
+  public QueryResultProvider(ProvidedQueryParams queryParams,
+                             boolean ignoreMissingData) {
+    this.queryParams = queryParams;
+    this.ignoreMissingData = ignoreMissingData;
+  }
+
+  public SpQueryResult getData() {
+    if (queryParams.has(QP_AUTO_AGGREGATE)) {
+      queryParams = new AutoAggregationHandler(queryParams).makeAutoAggregationQueryParams();
+    }
+    Map<String, QueryParamsV4> queryParts = DataLakeManagementUtils.getSelectQueryParams(queryParams);
+
+    if (queryParams.getProvidedParams().containsKey(QP_MAXIMUM_AMOUNT_OF_EVENTS)) {
+      int maximumAmountOfEvents = Integer.parseInt(queryParams.getProvidedParams().get(QP_MAXIMUM_AMOUNT_OF_EVENTS));
+      return new DataExplorerQueryV4(queryParts, maximumAmountOfEvents).executeQuery(ignoreMissingData);
+    }
+
+    if (queryParams.getProvidedParams().containsKey(FOR_ID_KEY)) {
+      String forWidgetId = queryParams.getProvidedParams().get(FOR_ID_KEY);
+      return new DataExplorerQueryV4(queryParts, forWidgetId).executeQuery(ignoreMissingData);
+    } else {
+      return new DataExplorerQueryV4(queryParts).executeQuery(ignoreMissingData);
+    }
+  }
+}
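
A QueryResultProvider can also be used directly; a minimal sketch, assuming queryParams has already been built from the HTTP request:

  // Sketch: ignoreMissingData set to false so missing measurements raise an error.
  SpQueryResult result = new QueryResultProvider(queryParams, false).getData();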
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
new file mode 100644
index 000000000..ada5ee6f5
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query;
+
+import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters;
+import org.apache.streampipes.dataexplorer.v4.query.writer.ConfiguredOutputWriter;
+import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_LIMIT;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_PAGE;
+
+public class StreamedQueryResultProvider extends QueryResultProvider {
+
+  private static final int MAX_RESULTS_PER_QUERY = 500000;
+
+  private final OutputFormat format;
+
+  public StreamedQueryResultProvider(ProvidedQueryParams params,
+                                     OutputFormat format,
+                                     boolean ignoreMissingValues) {
+    super(params, ignoreMissingValues);
+    this.format = format;
+  }
+
+  public void getDataAsStream(OutputStream outputStream) throws IOException {
+    var usesLimit = queryParams.has(QP_LIMIT);
+    var configuredWriter = ConfiguredOutputWriter
+        .getConfiguredWriter(format, queryParams, ignoreMissingData);
+
+    if (!queryParams.has(QP_LIMIT)) {
+      queryParams.update(QP_LIMIT, MAX_RESULTS_PER_QUERY);
+    }
+
+    var limit = queryParams.getAsInt(QP_LIMIT);
+    var measurement = findByMeasurementName(queryParams.getMeasurementId()).get();
+
+    SpQueryResult dataResult;
+    int page = 0;
+    if (queryParams.has(QP_PAGE)) {
+      page = queryParams.getAsInt(QP_PAGE);
+    }
+
+    boolean isFirstDataItem = true;
+    configuredWriter.beforeFirstItem(outputStream);
+    do {
+      queryParams.update(SupportedDataLakeQueryParameters.QP_PAGE, String.valueOf(page));
+      dataResult = getData();
+
+      if (dataResult.getTotal() > 0) {
+        changeTimestampHeader(measurement, dataResult);
+        var columns = dataResult.getHeaders();
+        for (List<Object> row : dataResult.getAllDataSeries().get(0).getRows()) {
+          configuredWriter.writeItem(outputStream, row, columns, isFirstDataItem);
+          isFirstDataItem = false;
+        }
+      }
+      page++;
+    } while (queryNextPage(dataResult.getTotal(), usesLimit, limit, page));
+    configuredWriter.afterLastItem(outputStream);
+  }
+
+  private boolean queryNextPage(int lastResultsCount,
+                                boolean usesLimit,
+                                int limit,
+                                int lastPage) {
+    if (usesLimit) {
+      return !(limit <= (lastPage) * MAX_RESULTS_PER_QUERY);
+    } else {
+      return lastResultsCount > 0;
+    }
+  }
+
+  private Optional<DataLakeMeasure> findByMeasurementName(String measurementName) {
+    return DataExplorerUtils.getInfos()
+        .stream()
+        .filter(measurement -> measurement.getMeasureName().equals(measurementName))
+        .findFirst();
+  }
+
+  /**
+   * Replaces the field 'time' of the data result with the actual timestamp field name of the measurement
+   * @param measurement contains the actual timestamp name value
+   * @param dataResult the query result of the database with 'time' as timestamp field name
+   */
+  private void changeTimestampHeader(DataLakeMeasure measurement, SpQueryResult dataResult) {
+    dataResult.getHeaders().set(0, measurement.getTimestampFieldName());
+  }
+}
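
The paging rule in queryNextPage can be read as follows; a sketch with illustrative values:

  // With an explicit limit, pages are fetched until page * MAX_RESULTS_PER_QUERY
  // reaches the limit; without a limit, paging stops on the first empty result.
  queryNextPage(500000, true, 1200000, 2);  // true:  2 * 500000 < 1200000
  queryNextPage(500000, true, 1200000, 3);  // false: 3 * 500000 >= 1200000
  queryNextPage(0, false, 500000, 4);       // false: last page returned no rows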
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredCsvOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredCsvOutputWriter.java
new file mode 100644
index 000000000..8d92feb61
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredCsvOutputWriter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.writer;
+
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.query.writer.item.CsvItemWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.StringJoiner;
+
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_CSV_DELIMITER;
+
+public class ConfiguredCsvOutputWriter extends ConfiguredOutputWriter {
+
+  private static final String LINE_SEPARATOR = "\n";
+  private static final String COMMA = ",";
+  private static final String SEMICOLON = ";";
+
+  private CsvItemWriter itemWriter;
+  private String delimiter = COMMA;
+
+  @Override
+  public void configure(ProvidedQueryParams params,
+                        boolean ignoreMissingValues) {
+    if (params.has(QP_CSV_DELIMITER)) {
+      delimiter = params.getAsString(QP_CSV_DELIMITER).equals("comma") ? COMMA : SEMICOLON;
+    }
+    this.itemWriter = new CsvItemWriter(delimiter);
+  }
+
+  @Override
+  public void beforeFirstItem(OutputStream outputStream) {
+    // do nothing
+  }
+
+  @Override
+  public void afterLastItem(OutputStream outputStream) {
+    // do nothing
+  }
+
+  @Override
+  public void writeItem(OutputStream outputStream,
+                        List<Object> row,
+                        List<String> columnNames,
+                        boolean firstObject) throws IOException {
+    if (firstObject) {
+      outputStream.write(toBytes(makeHeaderLine(columnNames)));
+    }
+
+    outputStream.write(toBytes(itemWriter.createItem(row, columnNames) + LINE_SEPARATOR));
+  }
+
+  private String makeHeaderLine(List<String> columns) {
+    StringJoiner joiner = new StringJoiner(this.delimiter);
+    columns.forEach(joiner::add);
+    return joiner + LINE_SEPARATOR;
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java
new file mode 100644
index 000000000..7df602960
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.writer;
+
+import com.google.gson.Gson;
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.query.writer.item.ItemGenerator;
+import org.apache.streampipes.dataexplorer.v4.query.writer.item.JsonItemWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+public class ConfiguredJsonOutputWriter extends ConfiguredOutputWriter {
+
+  private static final String BEGIN_ARRAY = "[";
+  private static final String END_ARRAY = "]";
+
+  private final ItemGenerator jsonObjectWriter;
+
+  public ConfiguredJsonOutputWriter() {
+    Gson gson = new Gson();
+    this.jsonObjectWriter = new JsonItemWriter(gson);
+  }
+
+  @Override
+  public void configure(ProvidedQueryParams params,
+                        boolean ignoreMissingValues) {
+    // do nothing
+  }
+
+  @Override
+  public void beforeFirstItem(OutputStream outputStream) throws IOException {
+    outputStream.write(toBytes(BEGIN_ARRAY));
+  }
+
+  @Override
+  public void afterLastItem(OutputStream outputStream) throws IOException {
+    outputStream.write(toBytes(END_ARRAY));
+  }
+
+  @Override
+  public void writeItem(OutputStream outputStream,
+                        List<Object> row,
+                        List<String> columnNames,
+                        boolean firstObject) throws IOException {
+    if (!firstObject) {
+      outputStream.write(toBytes(","));
+    }
+
+    var item = jsonObjectWriter.createItem(row, columnNames);
+    outputStream.write(toBytes(item));
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java
new file mode 100644
index 000000000..0dfaa163a
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.writer;
+
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+public abstract class ConfiguredOutputWriter {
+
+  public static ConfiguredOutputWriter getConfiguredWriter(OutputFormat format,
+                                                 ProvidedQueryParams params,
+                                                 boolean ignoreMissingValues) {
+    var writer = format.getWriter();
+    writer.configure(params, ignoreMissingValues);
+
+    return writer;
+  }
+
+  public abstract void configure(ProvidedQueryParams params,
+                                 boolean ignoreMissingValues);
+
+  public abstract void beforeFirstItem(OutputStream outputStream) throws IOException;
+
+  public abstract void afterLastItem(OutputStream outputStream) throws IOException;
+
+  public abstract void writeItem(OutputStream outputStream,
+                                 List<Object> row,
+                                 List<String> columnNames,
+                                 boolean firstObject) throws IOException;
+
+  protected byte[] toBytes(String value) {
+    return value.getBytes();
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/OutputFormat.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/OutputFormat.java
new file mode 100644
index 000000000..5a522b403
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/OutputFormat.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.writer;
+
+import java.util.function.Supplier;
+
+public enum OutputFormat {
+  JSON(ConfiguredJsonOutputWriter::new),
+  CSV(ConfiguredCsvOutputWriter::new);
+
+  private final Supplier<ConfiguredOutputWriter> writerSupplier;
+
+  OutputFormat(Supplier<ConfiguredOutputWriter> writerSupplier) {
+    this.writerSupplier = writerSupplier;
+  }
+
+  public ConfiguredOutputWriter getWriter() {
+    return writerSupplier.get();
+  }
+}
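
Taken together, OutputFormat and ConfiguredOutputWriter form a small factory: each enum constant supplies its writer, which is configured once and then driven per item. A sketch of the intended call sequence (outputStream, row, and columnNames are assumed):

  ConfiguredOutputWriter writer =
      ConfiguredOutputWriter.getConfiguredWriter(OutputFormat.JSON, queryParams, false);
  writer.beforeFirstItem(outputStream);                    // "[" for JSON, no-op for CSV
  writer.writeItem(outputStream, row, columnNames, true);  // first item, no leading separator
  writer.afterLastItem(outputStream);                      // "]" for JSON, no-op for CSV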
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/CsvItemWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/CsvItemWriter.java
new file mode 100644
index 000000000..5fe06fb52
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/CsvItemWriter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.streampipes.dataexplorer.v4.query.writer.item;
+
+public class CsvItemWriter extends ItemGenerator {
+
+  public CsvItemWriter(String delimiter) {
+    super(delimiter);
+  }
+
+  @Override
+  protected String makeItemString(String key, Object value) {
+    return value != null ? value.toString() : "";
+  }
+
+  @Override
+  protected String finalizeItem(String item) {
+    return item;
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java
new file mode 100644
index 000000000..2e5442f53
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.streampipes.dataexplorer.v4.query.writer.item;
+
+import org.apache.streampipes.dataexplorer.v4.utils.TimeParser;
+
+import java.util.List;
+import java.util.StringJoiner;
+
+public abstract class ItemGenerator {
+
+  protected static final String COMMA_SEPARATOR = ",";
+
+  private final String separator;
+
+  public ItemGenerator(String separator) {
+    this.separator = separator;
+  }
+
+  public String createItem(List<Object> row,
+                           List<String> columns) {
+    StringJoiner joiner = new StringJoiner(separator);
+
+    for (int i = 0; i < row.size(); i++) {
+      var value = row.get(i);
+      if (i == 0) {
+        value = TimeParser.parseTime(value.toString());
+      }
+      joiner.add(makeItemString(columns.get(i), value));
+    }
+
+    return finalizeItem(joiner.toString());
+  }
+
+  protected abstract String makeItemString(String key,
+                                  Object value);
+
+  protected abstract String finalizeItem(String item);
+
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/JsonItemWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/JsonItemWriter.java
new file mode 100644
index 000000000..c4b8d4525
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/JsonItemWriter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.streampipes.dataexplorer.v4.query.writer.item;
+
+import com.google.gson.Gson;
+
+public class JsonItemWriter extends ItemGenerator {
+
+  private static final String BEGIN_OBJECT = "{";
+  private static final String END_OBJECT = "}";
+
+  private final Gson gson;
+
+  public JsonItemWriter(Gson gson) {
+    super(COMMA_SEPARATOR);
+    this.gson = gson;
+  }
+
+  @Override
+  protected String makeItemString(String key,
+                                  Object value) {
+    var stringValue = value != null ? gson.toJson(value) : null;
+    return "\""
+        + key
+        + "\": "
+        + stringValue;
+  }
+
+  @Override
+  protected String finalizeItem(String item) {
+    return BEGIN_OBJECT + item + END_OBJECT;
+  }
+}
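
For illustration, assuming a row whose first column is the timestamp (ItemGenerator always routes that value through TimeParser), the two concrete writers would produce roughly:

  var columns = List.of("time", "load");
  var row = List.<Object>of("2022-11-20T16:18:49.123Z", 42.0);

  new CsvItemWriter(",").createItem(row, columns);
  // -> 1668961129123,42.0
  new JsonItemWriter(new Gson()).createItem(row, columns);
  // -> {"time": 1668961129123,"load": 42.0}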
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
new file mode 100644
index 000000000..c7c47267c
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
@@ -0,0 +1,30 @@
+package org.apache.streampipes.dataexplorer.v4.utils;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalAccessor;
+
+public class TimeParser {
+
+  private static final DateTimeFormatter formatter = new DateTimeFormatterBuilder()
+      .appendPattern("uuuu[-MM[-dd]]['T'HH[:mm[:ss[.SSSSSSSSS][.SSSSSSSS][.SSSSSSS][.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S]]]][XXX]")
+      .parseDefaulting(ChronoField.NANO_OF_SECOND, 0)
+      .parseDefaulting(ChronoField.OFFSET_SECONDS, 0)
+      .toFormatter();
+
+  public static Long parseTime(String v) {
+    TemporalAccessor temporalAccessor = formatter.parseBest(v,
+        ZonedDateTime::from,
+        LocalDateTime::from,
+        LocalDate::from);
+
+    Instant instant = Instant.from(temporalAccessor);
+    return Instant.EPOCH.until(instant, ChronoUnit.MILLIS);
+  }
+}
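
TimeParser centralizes the timestamp parsing that previously lived in DataLakeManagementV4; a sketch of the expected behavior (values illustrative):

  TimeParser.parseTime("2022-11-20T16:18:49.123Z");  // -> 1668961129123 (epoch millis)
  TimeParser.parseTime("2022-11-20T16:18:49");       // -> 1668961129000 (missing offset defaults to UTC)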
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index 838cd1aa2..bce7ebed7 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -27,6 +27,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
 import io.swagger.v3.oas.annotations.responses.ApiResponse;
 import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
 import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
 import org.apache.streampipes.model.StreamPipesErrorMessage;
 import org.apache.streampipes.model.datalake.DataLakeConfiguration;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
@@ -217,7 +218,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
                 format = "csv";
             }
 
-            String outputFormat = format;
+            OutputFormat outputFormat = format.equals("csv") ? OutputFormat.CSV : OutputFormat.JSON;
             StreamingOutput streamingOutput = output -> dataLakeManagement.getDataAsStream(
                 sanitizedParams,
                 outputFormat,
diff --git a/ui/src/app/core-ui/data-download-dialog/services/data-export.service.ts b/ui/src/app/core-ui/data-download-dialog/services/data-export.service.ts
index beda107be..185606f58 100644
--- a/ui/src/app/core-ui/data-download-dialog/services/data-export.service.ts
+++ b/ui/src/app/core-ui/data-download-dialog/services/data-export.service.ts
@@ -95,7 +95,7 @@ export class DataExportService {
     return this.dataViewQueryGeneratorService
       .generateQuery(
         exportConfig.dataExportConfig.dateRange.startDate.getTime(),
-        exportConfig.dataExportConfig.dateRange.startDate.getTime(),
+        exportConfig.dataExportConfig.dateRange.endDate.getTime(),
         dataDownloadDialogModel.dataExplorerDataConfig.sourceConfigs[selectedQueryIndex],
         false
       );