You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/11/23 16:41:56 UTC
[streampipes] 01/06: [STREAMPIPES-566] Fix query download, improve code structure
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit e3c1c660c945c6498c6c763e2fdc4a88ab0bf08f
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Nov 20 16:18:49 2022 +0100
[STREAMPIPES-566] Fix query download, improve code structure
---
.../dataexplorer/DataLakeManagementV4.java | 206 +--------------------
.../dataexplorer/v4/query/QueryResultProvider.java | 64 +++++++
.../v4/query/StreamedQueryResultProvider.java | 113 +++++++++++
.../v4/query/writer/ConfiguredCsvOutputWriter.java | 76 ++++++++
.../query/writer/ConfiguredJsonOutputWriter.java | 70 +++++++
.../v4/query/writer/ConfiguredOutputWriter.java | 53 ++++++
.../dataexplorer/v4/query/writer/OutputFormat.java | 36 ++++
.../v4/query/writer/item/CsvItemWriter.java | 37 ++++
.../v4/query/writer/item/ItemGenerator.java | 57 ++++++
.../v4/query/writer/item/JsonItemWriter.java | 50 +++++
.../dataexplorer/v4/utils/TimeParser.java | 30 +++
.../apache/streampipes/ps/DataLakeResourceV4.java | 3 +-
.../services/data-export.service.ts | 2 +-
13 files changed, 598 insertions(+), 199 deletions(-)
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 9ee04ff03..c6bbc5278 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -18,7 +18,6 @@
package org.apache.streampipes.dataexplorer;
-import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
@@ -26,11 +25,12 @@ import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
import org.apache.streampipes.dataexplorer.query.EditRetentionPolicyQuery;
import org.apache.streampipes.dataexplorer.query.ShowRetentionPolicyQuery;
import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
-import org.apache.streampipes.dataexplorer.v4.AutoAggregationHandler;
import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters;
import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
import org.apache.streampipes.dataexplorer.v4.query.DataExplorerQueryV4;
+import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
+import org.apache.streampipes.dataexplorer.v4.query.QueryResultProvider;
+import org.apache.streampipes.dataexplorer.v4.query.StreamedQueryResultProvider;
import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
import org.apache.streampipes.model.datalake.DataLakeConfiguration;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
@@ -50,39 +50,11 @@ import org.lightcouch.CouchDbClient;
import java.io.IOException;
import java.io.OutputStream;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
-import java.time.temporal.ChronoField;
-import java.time.temporal.ChronoUnit;
-import java.time.temporal.TemporalAccessor;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import java.util.stream.Collectors;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AUTO_AGGREGATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_CSV_DELIMITER;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_LIMIT;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_MAXIMUM_AMOUNT_OF_EVENTS;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_PAGE;
-
public class DataLakeManagementV4 {
- public static final String FOR_ID_KEY = "forId";
-
- private static final DateTimeFormatter formatter = new DateTimeFormatterBuilder()
- .appendPattern("uuuu[-MM[-dd]]['T'HH[:mm[:ss[.SSSSSSSSS][.SSSSSSSS][.SSSSSSS][.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S]]]][XXX]")
- .parseDefaulting(ChronoField.NANO_OF_SECOND, 0)
- .parseDefaulting(ChronoField.OFFSET_SECONDS, 0)
- .toFormatter();
-
public List<DataLakeMeasure> getAllMeasurements() {
return DataExplorerUtils.getInfos();
}
@@ -91,163 +63,17 @@ public class DataLakeManagementV4 {
return getDataLakeStorage().findOne(measureId);
}
- private Optional<DataLakeMeasure> findByMeasurementName(String measurementName) {
- return getAllMeasurements()
- .stream()
- .filter(measurement -> measurement.getMeasureName().equals(measurementName))
- .findFirst();
- }
-
- public SpQueryResult getData(ProvidedQueryParams queryParams, boolean ignoreMissingData) throws IllegalArgumentException {
- if (queryParams.has(QP_AUTO_AGGREGATE)) {
- queryParams = new AutoAggregationHandler(queryParams).makeAutoAggregationQueryParams();
- }
- Map<String, QueryParamsV4> queryParts = DataLakeManagementUtils.getSelectQueryParams(queryParams);
-
- if (queryParams.getProvidedParams().containsKey(QP_MAXIMUM_AMOUNT_OF_EVENTS)) {
- int maximumAmountOfEvents = Integer.parseInt(queryParams.getProvidedParams().get(QP_MAXIMUM_AMOUNT_OF_EVENTS));
- return new DataExplorerQueryV4(queryParts, maximumAmountOfEvents).executeQuery(ignoreMissingData);
- }
-
- if (queryParams.getProvidedParams().containsKey(FOR_ID_KEY)) {
- String forWidgetId = queryParams.getProvidedParams().get(FOR_ID_KEY);
- return new DataExplorerQueryV4(queryParts, forWidgetId).executeQuery(ignoreMissingData);
- } else {
- return new DataExplorerQueryV4(queryParts).executeQuery(ignoreMissingData);
- }
+ public SpQueryResult getData(ProvidedQueryParams queryParams,
+ boolean ignoreMissingData) throws IllegalArgumentException {
+ return new QueryResultProvider(queryParams, ignoreMissingData).getData(); // delegates the whole query to a one-shot QueryResultProvider
}
public void getDataAsStream(ProvidedQueryParams params,
- String format,
+ OutputFormat format,
boolean ignoreMissingValues,
OutputStream outputStream) throws IOException {
- if (!params.has(QP_LIMIT)) {
- params.update(QP_LIMIT, 500000);
- }
-
- var measurement = findByMeasurementName(params.getMeasurementId()).get();
-
- SpQueryResult dataResult;
- //JSON
- if (format.equals("json")) {
-
- Gson gson = new Gson();
- int i = 0;
- if (params.has(QP_PAGE)) {
- i = params.getAsInt(QP_PAGE);
- }
-
- boolean isFirstDataObject = true;
-
- outputStream.write(toBytes("["));
- do {
- params.update(SupportedDataLakeQueryParameters.QP_PAGE, String.valueOf(i));
- dataResult = getData(params, ignoreMissingValues);
-
-
- if (dataResult.getTotal() > 0) {
- changeTimestampHeader(measurement, dataResult);
-
- for (List<Object> row : dataResult.getAllDataSeries().get(0).getRows()) {
- if (!isFirstDataObject) {
- outputStream.write(toBytes(","));
- }
-
- //produce one json object
- boolean isFirstElementInRow = true;
- outputStream.write(toBytes("{"));
- for (int i1 = 0; i1 < row.size(); i1++) {
- Object element = row.get(i1);
- if (!isFirstElementInRow) {
- outputStream.write(toBytes(","));
- }
- isFirstElementInRow = false;
- if (i1 == 0) {
- element = parseTime(element.toString());
- }
- //produce json e.g. "name": "Pipes" or "load": 42
- outputStream.write(toBytes("\"" + dataResult.getHeaders().get(i1) + "\": "
- + gson.toJson(element)));
- }
- outputStream.write(toBytes("}"));
- isFirstDataObject = false;
- }
- i++;
- }
- } while (dataResult.getTotal() > 0);
- outputStream.write(toBytes("]"));
-
- //CSV
- } else if (format.equals("csv")) {
- int i = 0;
- if (params.has(QP_PAGE)) {
- i = params.getAsInt(QP_PAGE);
- }
-
- boolean isFirstDataObject = true;
- String delimiter = ",";
-
- if (params.has(QP_CSV_DELIMITER)) {
- delimiter = params.getAsString(QP_CSV_DELIMITER).equals("comma") ? "," : ";";
- }
-
- do {
- params.update(SupportedDataLakeQueryParameters.QP_PAGE, String.valueOf(i));
- dataResult = getData(params, ignoreMissingValues);
-
-
- //Send first header
- if (dataResult.getTotal() > 0) {
- changeTimestampHeader(measurement, dataResult);
-
- if (isFirstDataObject) {
- boolean isFirst = true;
- for (int i1 = 0; i1 < dataResult.getHeaders().size(); i1++) {
- if (!isFirst) {
- outputStream.write(toBytes(delimiter));
- }
- isFirst = false;
- outputStream.write(toBytes(dataResult.getHeaders().get(i1)));
- }
- }
- outputStream.write(toBytes("\n"));
- isFirstDataObject = false;
- }
-
- if (dataResult.getTotal() > 0) {
- for (List<Object> row : dataResult.getAllDataSeries().get(0).getRows()) {
- boolean isFirstInRow = true;
- for (int i1 = 0; i1 < row.size(); i1++) {
- Object element = row.get(i1);
- if (!isFirstInRow) {
- outputStream.write(toBytes(delimiter));
- }
- isFirstInRow = false;
- if (i1 == 0) {
- element = parseTime(element.toString());
- }
- if (element == null) {
- outputStream.write(toBytes(""));
- } else {
- outputStream.write(toBytes(element.toString()));
- }
- }
- outputStream.write(toBytes("\n"));
- }
- }
- i++;
- } while (dataResult.getTotal() > 0);
- }
- }
-
- /**
- * Replaces the field 'time' of the data result with the actual timestamp field name of the measurement
- * @param measurement contains the actual timestamp name value
- * @param dataResult the query result of the database with 'time' as timestamp field name
- */
- private void changeTimestampHeader(DataLakeMeasure measurement, SpQueryResult dataResult) {
- dataResult.getHeaders().set(0, measurement.getTimestampFieldName());
+ new StreamedQueryResultProvider(params, format, ignoreMissingValues).getDataAsStream(outputStream);
}
public boolean removeAllMeasurements() {
@@ -345,20 +171,6 @@ public class DataLakeManagementV4 {
return isSuccess;
}
- private byte[] toBytes(String value) {
- return value.getBytes();
- }
-
- private static Long parseTime(String v) {
- TemporalAccessor temporalAccessor = formatter.parseBest(v,
- ZonedDateTime::from,
- LocalDateTime::from,
- LocalDate::from);
-
- Instant instant = Instant.from(temporalAccessor);
- return Instant.EPOCH.until(instant, ChronoUnit.MILLIS);
- }
-
public Map<String, Object> getTagValues(String measurementId,
String fields) {
InfluxDB influxDB = DataExplorerUtils.getInfluxDBClient();
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java
new file mode 100644
index 000000000..34556770f
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query;
+
+import org.apache.streampipes.dataexplorer.v4.AutoAggregationHandler;
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
+import org.apache.streampipes.dataexplorer.v4.query.DataExplorerQueryV4;
+import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import java.util.Map;
+
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AUTO_AGGREGATE;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_MAXIMUM_AMOUNT_OF_EVENTS;
+
+public class QueryResultProvider { // runs a single data lake query described by ProvidedQueryParams
+
+ public static final String FOR_ID_KEY = "forId"; // name of the query parameter carrying the id passed to DataExplorerQueryV4
+
+ protected ProvidedQueryParams queryParams; // not final: getData() replaces it when auto-aggregation is requested
+ protected final boolean ignoreMissingData; // forwarded unchanged to DataExplorerQueryV4.executeQuery
+
+ public QueryResultProvider(ProvidedQueryParams queryParams,
+ boolean ignoreMissingData) {
+ this.queryParams = queryParams;
+ this.ignoreMissingData = ignoreMissingData;
+ }
+ // Builds the query parts from the current parameters and executes the matching DataExplorerQueryV4 variant.
+ public SpQueryResult getData() {
+ if (queryParams.has(QP_AUTO_AGGREGATE)) {
+ queryParams = new AutoAggregationHandler(queryParams).makeAutoAggregationQueryParams(); // side effect: the field is overwritten with the rewritten parameters
+ }
+ Map<String, QueryParamsV4> queryParts = DataLakeManagementUtils.getSelectQueryParams(queryParams);
+ // A maximum-amount-of-events parameter takes precedence over the forId parameter (early return below).
+ if (queryParams.getProvidedParams().containsKey(QP_MAXIMUM_AMOUNT_OF_EVENTS)) {
+ int maximumAmountOfEvents = Integer.parseInt(queryParams.getProvidedParams().get(QP_MAXIMUM_AMOUNT_OF_EVENTS)); // throws NumberFormatException on a non-numeric value
+ return new DataExplorerQueryV4(queryParts, maximumAmountOfEvents).executeQuery(ignoreMissingData);
+ }
+
+ if (queryParams.getProvidedParams().containsKey(FOR_ID_KEY)) {
+ String forWidgetId = queryParams.getProvidedParams().get(FOR_ID_KEY);
+ return new DataExplorerQueryV4(queryParts, forWidgetId).executeQuery(ignoreMissingData);
+ } else {
+ return new DataExplorerQueryV4(queryParts).executeQuery(ignoreMissingData); // plain query without event cap or id
+ }
+ }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
new file mode 100644
index 000000000..ada5ee6f5
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query;
+
+import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters;
+import org.apache.streampipes.dataexplorer.v4.query.writer.ConfiguredOutputWriter;
+import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_LIMIT;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_PAGE;
+
+public class StreamedQueryResultProvider extends QueryResultProvider { // pages through a query and streams every row to an OutputStream
+
+ private static final int MAX_RESULTS_PER_QUERY = 500000; // limit applied per query when the caller did not provide QP_LIMIT
+
+ private final OutputFormat format;
+
+ public StreamedQueryResultProvider(ProvidedQueryParams params,
+ OutputFormat format,
+ boolean ignoreMissingValues) {
+ super(params, ignoreMissingValues);
+ this.format = format;
+ }
+ // Queries page after page and writes each row through the writer configured for the requested format.
+ public void getDataAsStream(OutputStream outputStream) throws IOException {
+ var usesLimit = queryParams.has(QP_LIMIT); // captured before the default limit is injected below
+ var configuredWriter = ConfiguredOutputWriter
+ .getConfiguredWriter(format, queryParams, ignoreMissingData);
+
+ if (!queryParams.has(QP_LIMIT)) {
+ queryParams.update(QP_LIMIT, MAX_RESULTS_PER_QUERY);
+ }
+
+ var limit = queryParams.getAsInt(QP_LIMIT);
+ var measurement = findByMeasurementName(queryParams.getMeasurementId()).orElseThrow(() -> new java.util.NoSuchElementException("No measurement found with name " + queryParams.getMeasurementId())); // same exception type as Optional.get(), but with a usable message
+
+ SpQueryResult dataResult;
+ int page = 0;
+ if (queryParams.has(QP_PAGE)) {
+ page = queryParams.getAsInt(QP_PAGE); // start at the caller-provided page
+ }
+
+ boolean isFirstDataItem = true;
+ configuredWriter.beforeFirstItem(outputStream);
+ do {
+ queryParams.update(SupportedDataLakeQueryParameters.QP_PAGE, String.valueOf(page));
+ dataResult = getData();
+
+ if (dataResult.getTotal() > 0) {
+ changeTimestampHeader(measurement, dataResult);
+ var columns = dataResult.getHeaders();
+ for (List<Object> row : dataResult.getAllDataSeries().get(0).getRows()) { // only the first data series is streamed
+ configuredWriter.writeItem(outputStream, row, columns, isFirstDataItem);
+ isFirstDataItem = false;
+ }
+ }
+ page++;
+ } while (queryNextPage(dataResult.getTotal(), usesLimit, limit, page));
+ configuredWriter.afterLastItem(outputStream);
+ }
+ // Decides whether another page is fetched; lastPage is the already incremented page counter.
+ private boolean queryNextPage(int lastResultsCount,
+ boolean usesLimit,
+ int limit,
+ int lastPage) {
+ if (usesLimit) {
+ return !(limit <= (lastPage) * MAX_RESULTS_PER_QUERY); // NOTE(review): continues while limit > lastPage * 500000 - confirm this is the intended handling of a user-provided limit
+ } else {
+ return lastResultsCount > 0; // without a user limit: stop after the first empty page
+ }
+ }
+ // Looks up the measurement whose measure name equals the given name; empty if none matches.
+ private Optional<DataLakeMeasure> findByMeasurementName(String measurementName) {
+ return DataExplorerUtils.getInfos()
+ .stream()
+ .filter(measurement -> measurement.getMeasureName().equals(measurementName))
+ .findFirst();
+ }
+
+ /**
+ * Replaces the field 'time' of the data result with the actual timestamp field name of the measurement
+ * @param measurement contains the actual timestamp name value
+ * @param dataResult the query result of the database with 'time' as timestamp field name
+ */
+ private void changeTimestampHeader(DataLakeMeasure measurement, SpQueryResult dataResult) {
+ dataResult.getHeaders().set(0, measurement.getTimestampFieldName());
+ }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredCsvOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredCsvOutputWriter.java
new file mode 100644
index 000000000..8d92feb61
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredCsvOutputWriter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.writer;
+
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.query.writer.item.CsvItemWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.StringJoiner;
+
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_CSV_DELIMITER;
+
+public class ConfiguredCsvOutputWriter extends ConfiguredOutputWriter { // writes query results as delimiter-separated lines
+
+ private static final String LINE_SEPARATOR = "\n";
+ private static final String COMMA = ",";
+ private static final String SEMICOLON = ";";
+
+ private CsvItemWriter itemWriter; // assigned in configure(); writeItem() requires configure() to have run first
+ private String delimiter = COMMA; // used when QP_CSV_DELIMITER is not provided
+
+ @Override
+ public void configure(ProvidedQueryParams params,
+ boolean ignoreMissingValues) { // ignoreMissingValues is not used by this writer
+ if (params.has(QP_CSV_DELIMITER)) {
+ delimiter = params.getAsString(QP_CSV_DELIMITER).equals("comma") ? COMMA : SEMICOLON; // any value other than "comma" selects the semicolon
+ }
+ this.itemWriter = new CsvItemWriter(delimiter);
+ }
+
+ @Override
+ public void beforeFirstItem(OutputStream outputStream) {
+ // intentionally empty: nothing is written before the first row
+ }
+
+ @Override
+ public void afterLastItem(OutputStream outputStream) {
+ // intentionally empty: nothing is written after the last row
+ }
+ // Writes one data line; the header line is emitted immediately before the first data line.
+ @Override
+ public void writeItem(OutputStream outputStream,
+ List<Object> row,
+ List<String> columnNames,
+ boolean firstObject) throws IOException {
+ if (firstObject) {
+ outputStream.write(toBytes(makeHeaderLine(columnNames))); // consequently no header is produced unless at least one row is written
+ }
+
+ outputStream.write(toBytes(itemWriter.createItem(row, columnNames) + LINE_SEPARATOR));
+ }
+ // Joins the column names with the configured delimiter and terminates the line.
+ private String makeHeaderLine(List<String> columns) {
+ StringJoiner joiner = new StringJoiner(this.delimiter);
+ columns.forEach(joiner::add);
+ return joiner + LINE_SEPARATOR;
+ }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java
new file mode 100644
index 000000000..7df602960
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.writer;
+
+import com.google.gson.Gson;
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.query.writer.item.ItemGenerator;
+import org.apache.streampipes.dataexplorer.v4.query.writer.item.JsonItemWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+public class ConfiguredJsonOutputWriter extends ConfiguredOutputWriter { // writes query results as one JSON array of objects
+
+ private static final String BEGIN_ARRAY = "[";
+ private static final String END_ARRAY = "]";
+
+ private final ItemGenerator jsonObjectWriter; // turns one row into the text of one JSON object
+
+ public ConfiguredJsonOutputWriter() {
+ Gson gson = new Gson();
+ this.jsonObjectWriter = new JsonItemWriter(gson);
+ }
+
+ @Override
+ public void configure(ProvidedQueryParams params,
+ boolean ignoreMissingValues) {
+ // intentionally empty: this writer reads no query parameters
+ }
+
+ @Override
+ public void beforeFirstItem(OutputStream outputStream) throws IOException {
+ outputStream.write(toBytes(BEGIN_ARRAY)); // opens the array even if no item follows
+ }
+
+ @Override
+ public void afterLastItem(OutputStream outputStream) throws IOException {
+ outputStream.write(toBytes(END_ARRAY));
+ }
+ // Writes one JSON object, preceded by a comma for every item except the first.
+ @Override
+ public void writeItem(OutputStream outputStream,
+ List<Object> row,
+ List<String> columnNames,
+ boolean firstObject) throws IOException {
+ if (!firstObject) {
+ outputStream.write(toBytes(",")); // element separator between array entries
+ }
+
+ var item = jsonObjectWriter.createItem(row, columnNames);
+ outputStream.write(toBytes(item));
+ }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java
new file mode 100644
index 000000000..0dfaa163a
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.writer;
+
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+public abstract class ConfiguredOutputWriter { // base class of the format-specific streaming writers
+ // Factory: obtains a writer from the given format and configures it with the query parameters.
+ public static ConfiguredOutputWriter getConfiguredWriter(OutputFormat format,
+ ProvidedQueryParams params,
+ boolean ignoreMissingValues) {
+ var writer = format.getWriter();
+ writer.configure(params, ignoreMissingValues);
+
+ return writer;
+ }
+ // Lets the writer read the settings it needs; called by getConfiguredWriter before any output is written.
+ public abstract void configure(ProvidedQueryParams params,
+ boolean ignoreMissingValues);
+ // Writes whatever the format needs before the first item.
+ public abstract void beforeFirstItem(OutputStream outputStream) throws IOException;
+ // Writes whatever the format needs after the last item.
+ public abstract void afterLastItem(OutputStream outputStream) throws IOException;
+ // Writes a single row; firstObject tells the writer whether this is the first row written.
+ public abstract void writeItem(OutputStream outputStream,
+ List<Object> row,
+ List<String> columnNames,
+ boolean firstObject) throws IOException;
+ // Encodes text for the output stream.
+ protected byte[] toBytes(String value) {
+ return value.getBytes(java.nio.charset.StandardCharsets.UTF_8); // fixed charset so the bytes do not depend on the JVM default
+ }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/OutputFormat.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/OutputFormat.java
new file mode 100644
index 000000000..5a522b403
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/OutputFormat.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.writer;
+
+import java.util.function.Supplier;
+
+public enum OutputFormat { // supported output formats, each bound to the constructor of its writer
+ JSON(ConfiguredJsonOutputWriter::new),
+ CSV(ConfiguredCsvOutputWriter::new);
+
+ private final Supplier<ConfiguredOutputWriter> writerSupplier; // creates the writer for this format
+
+ OutputFormat(Supplier<ConfiguredOutputWriter> writerSupplier) {
+ this.writerSupplier = writerSupplier;
+ }
+ // Returns a writer for this format; the suppliers are constructor references, so each call creates a new instance.
+ public ConfiguredOutputWriter getWriter() {
+ return writerSupplier.get();
+ }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/CsvItemWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/CsvItemWriter.java
new file mode 100644
index 000000000..5fe06fb52
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/CsvItemWriter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.streampipes.dataexplorer.v4.query.writer.item;
+
+public class CsvItemWriter extends ItemGenerator { // renders one row as delimiter-separated values
+
+ public CsvItemWriter(String delimiter) {
+ super(delimiter); // the delimiter is the separator placed between the values of a row
+ }
+ // Renders a single value; the key is ignored and null becomes an empty string.
+ @Override
+ protected String makeItemString(String key, Object value) {
+ return value != null ? value.toString() : ""; // NOTE(review): values are neither quoted nor escaped - a value containing the delimiter or a line break would corrupt the row
+ }
+ // The joined values already form the complete line content, so the item is returned unchanged.
+ @Override
+ protected String finalizeItem(String item) {
+ return item;
+ }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java
new file mode 100644
index 000000000..2e5442f53
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.streampipes.dataexplorer.v4.query.writer.item;
+
+import org.apache.streampipes.dataexplorer.v4.utils.TimeParser;
+
+import java.util.List;
+import java.util.StringJoiner;
+
+public abstract class ItemGenerator { // template for turning one result row into one output string
+
+ protected static final String COMMA_SEPARATOR = ",";
+
+ private final String separator; // placed between the per-column strings of a row
+
+ public ItemGenerator(String separator) {
+ this.separator = separator;
+ }
+ // Builds the item: one makeItemString() result per column, joined by the separator, then passed to finalizeItem().
+ public String createItem(List<Object> row,
+ List<String> columns) {
+ StringJoiner joiner = new StringJoiner(separator);
+
+ for (int i = 0; i < row.size(); i++) { // columns is indexed in parallel and must have at least row.size() entries
+ var value = row.get(i);
+ if (i == 0) {
+ value = TimeParser.parseTime(value.toString()); // first column is treated as the timestamp; throws NullPointerException if it is null. Presumably converted to epoch milliseconds - TODO confirm in TimeParser
+ }
+ joiner.add(makeItemString(columns.get(i), value));
+ }
+
+ return finalizeItem(joiner.toString());
+ }
+ // Renders one column of the row; key is the column name, value may be null.
+ protected abstract String makeItemString(String key,
+ Object value);
+ // Receives the joined column strings and returns the final item text.
+ protected abstract String finalizeItem(String item);
+
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/JsonItemWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/JsonItemWriter.java
new file mode 100644
index 000000000..c4b8d4525
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/JsonItemWriter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.streampipes.dataexplorer.v4.query.writer.item;
+
+import com.google.gson.Gson;
+
+/**
+ * {@link ItemGenerator} that renders a row as one JSON object, e.g.
+ * {@code {"time": 1668957529000,"value": 1.5}}.
+ */
+public class JsonItemWriter extends ItemGenerator {
+
+  private static final String BEGIN_OBJECT = "{";
+  private static final String END_OBJECT = "}";
+  private static final String KEY_VALUE_SEPARATOR = ": ";
+  private static final String JSON_NULL = "null";
+
+  private final Gson gson;
+
+  /**
+   * @param gson serializer used for both keys and values
+   */
+  public JsonItemWriter(Gson gson) {
+    super(COMMA_SEPARATOR);
+    this.gson = gson;
+  }
+
+  /**
+   * Renders one JSON member ({@code "key": value}).
+   *
+   * @param key   member name; serialized as a JSON string so that quotes,
+   *              backslashes and control characters inside a column name
+   *              cannot break the surrounding object
+   * @param value member value; {@code null} is written as the JSON literal {@code null}
+   * @return the rendered member
+   */
+  @Override
+  protected String makeItemString(String key,
+                                  Object value) {
+    // Let Gson quote and escape the key instead of wrapping it in raw quotes.
+    // String.valueOf keeps the former rendering of a null key ("null").
+    // NOTE(review): depending on how the Gson instance is configured, characters
+    // such as '<' or '=' may be emitted as \\uXXXX escapes - still equivalent JSON.
+    var jsonKey = gson.toJson(String.valueOf(key));
+    var jsonValue = value != null ? gson.toJson(value) : JSON_NULL;
+    return jsonKey + KEY_VALUE_SEPARATOR + jsonValue;
+  }
+
+  /**
+   * Encloses the comma-joined members in curly braces.
+   *
+   * @param item the joined members of one row
+   * @return the complete JSON object
+   */
+  @Override
+  protected String finalizeItem(String item) {
+    return BEGIN_OBJECT + item + END_OBJECT;
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
new file mode 100644
index 000000000..c7c47267c
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
@@ -0,0 +1,30 @@
+package org.apache.streampipes.dataexplorer.v4.utils;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalAccessor;
+
+/**
+ * Converts ISO-8601-like timestamp strings into epoch milliseconds.
+ */
+public class TimeParser {
+
+  /**
+   * Lenient ISO-8601-style formatter: month, day, time, fraction (1-9 digits)
+   * and offset are all optional. A missing offset is defaulted to 0, i.e. the
+   * text is interpreted as UTC.
+   *
+   * <p>NANO_OF_SECOND is deliberately not defaulted: a defaulted nano value
+   * combined with a missing minute or second prevents the parsed fields from
+   * being resolved into a time, so inputs such as {@code 2022-11-20T16:18}
+   * would lose their time part. Missing minute/second/nano values are filled
+   * with zero during resolution anyway.
+   */
+  private static final DateTimeFormatter FORMATTER = new DateTimeFormatterBuilder()
+      .appendPattern("uuuu[-MM[-dd]]['T'HH[:mm[:ss[.SSSSSSSSS][.SSSSSSSS][.SSSSSSS][.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S]]]][XXX]")
+      .parseDefaulting(ChronoField.OFFSET_SECONDS, 0)
+      .toFormatter();
+
+  /**
+   * Parses a timestamp and returns it as milliseconds since 1970-01-01T00:00:00Z.
+   *
+   * <p>Timestamps with a time part are converted using their offset (UTC when
+   * none is given); a date without a time part is mapped to the start of that
+   * day in UTC.
+   *
+   * @param v the text to parse, e.g. {@code 2022-11-20T16:18:49.123Z} or {@code 2022-11-20}
+   * @return epoch milliseconds of the parsed point in time
+   * @throws java.time.format.DateTimeParseException if the text does not match the
+   *                                                  supported format or lacks a full date
+   * @throws NullPointerException                    if {@code v} is {@code null}
+   */
+  public static Long parseTime(String v) {
+    TemporalAccessor parsed = FORMATTER.parseBest(v,
+        ZonedDateTime::from,
+        LocalDateTime::from,
+        LocalDate::from);
+
+    return toInstant(parsed).toEpochMilli();
+  }
+
+  /**
+   * Maps whichever type {@code parseBest} produced to an instant. Local values
+   * carry no offset, so {@link Instant#from(TemporalAccessor)} would reject
+   * them; they are interpreted as UTC instead.
+   */
+  private static Instant toInstant(TemporalAccessor parsed) {
+    if (parsed instanceof LocalDateTime) {
+      return ((LocalDateTime) parsed).toInstant(ZoneOffset.UTC);
+    }
+    if (parsed instanceof LocalDate) {
+      return ((LocalDate) parsed).atStartOfDay(ZoneOffset.UTC).toInstant();
+    }
+    return Instant.from(parsed);
+  }
+}
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index 838cd1aa2..bce7ebed7 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -27,6 +27,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
import org.apache.streampipes.model.StreamPipesErrorMessage;
import org.apache.streampipes.model.datalake.DataLakeConfiguration;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
@@ -217,7 +218,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
format = "csv";
}
- String outputFormat = format;
+ OutputFormat outputFormat = format.equals("csv") ? OutputFormat.CSV : OutputFormat.JSON;
StreamingOutput streamingOutput = output -> dataLakeManagement.getDataAsStream(
sanitizedParams,
outputFormat,
diff --git a/ui/src/app/core-ui/data-download-dialog/services/data-export.service.ts b/ui/src/app/core-ui/data-download-dialog/services/data-export.service.ts
index beda107be..185606f58 100644
--- a/ui/src/app/core-ui/data-download-dialog/services/data-export.service.ts
+++ b/ui/src/app/core-ui/data-download-dialog/services/data-export.service.ts
@@ -95,7 +95,7 @@ export class DataExportService {
return this.dataViewQueryGeneratorService
.generateQuery(
exportConfig.dataExportConfig.dateRange.startDate.getTime(),
- exportConfig.dataExportConfig.dateRange.startDate.getTime(),
+ exportConfig.dataExportConfig.dateRange.endDate.getTime(),
dataDownloadDialogModel.dataExplorerDataConfig.sourceConfigs[selectedQueryIndex],
false
);