You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/11/23 16:41:55 UTC

[streampipes] branch dev updated (d0300e2e5 -> c6ad8b85a)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


    from d0300e2e5 [STREAMPIPES-590] add licence header to resource files
     new e3c1c660c [STREAMPIPES-566] Fix query download, improve code structure
     new 3ae6e5375 [STREAMPIPES-566] Delete unused classes
     new e7123dbdc [STREAMPIPES-566] Add license header
     new 4aeef8b3a [STREAMPIPES-566] Do not use wildcard imports
     new 50af75b11 [STREAMPIPES-566] Add basic tests
     new c6ad8b85a [STREAMPIPES-566] Use time field index over fixed assignment

The 6 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 streampipes-data-explorer/pom.xml                  |   6 +
 .../dataexplorer/DataLakeManagementV4.java         | 199 +--------------------
 .../autoagg/AbstractAutoAggregationQuery.java      |  70 --------
 .../autoagg/FromNowAutoAggregationQuery.java       |  49 -----
 .../autoagg/GroupedAutoAggregationQuery.java       |  48 -----
 .../autoagg/TimeBoundAutoAggregationQuery.java     |  49 -----
 .../param/AggregatedTimeBoundQueryParams.java      |  50 ------
 .../param/AggregatedTimeUnitQueryParams.java       |  51 ------
 .../dataexplorer/param/CountQueryParams.java       |  21 ---
 .../GroupedAggregatedTimeBoundQueryParams.java     |  46 -----
 .../dataexplorer/param/GroupedQueryParams.java     |  42 -----
 .../dataexplorer/param/PagingQueryParams.java      |  95 ----------
 .../dataexplorer/param/TimeBoundQueryParams.java   |  44 -----
 .../dataexplorer/param/TimeUnitQueryParams.java    |  42 -----
 .../query/GetAggregatedEventsFromNowQuery.java     |  47 -----
 .../query/GetAggregatedEventsQuery.java            |  45 -----
 .../query/GetDateFromSortedTableRecord.java        |  69 -------
 .../query/GetEventsAutoAggregationQuery.java       |  21 ---
 .../dataexplorer/query/GetEventsFromNowQuery.java  |  44 -----
 .../dataexplorer/query/GetEventsQuery.java         |  42 -----
 .../query/GetGroupedAggregatedEventsQuery.java     |  44 -----
 .../dataexplorer/query/GetGroupedEventsQuery.java  |  42 -----
 .../query/GetHeadersWithTypesQuery.java            |  46 -----
 .../dataexplorer/query/GetMaxPagesQuery.java       |  41 -----
 .../query/GetNumberOfRecordsByTimeUnitQuery.java   |  52 ------
 .../query/GetNumberOfRecordsQuery.java             |  51 ------
 .../dataexplorer/query/GetPagingEventsQuery.java   |  72 --------
 .../dataexplorer/v4/query/QueryResultProvider.java |  64 +++++++
 .../v4/query/StreamedQueryResultProvider.java      | 118 ++++++++++++
 .../v4/query/writer/ConfiguredCsvOutputWriter.java |  76 ++++++++
 .../query/writer/ConfiguredJsonOutputWriter.java   |  70 ++++++++
 .../v4/query/writer/ConfiguredOutputWriter.java    |  53 ++++++
 .../dataexplorer/v4/query/writer/OutputFormat.java |  17 +-
 .../v4/query/writer/item/CsvItemWriter.java        |  23 +--
 .../v4/query/writer/item/ItemGenerator.java        |  57 ++++++
 .../v4/query/writer/item/JsonItemWriter.java       |  37 ++--
 .../dataexplorer/v4/utils/TimeParser.java          |  48 +++++
 .../writer/TestConfiguredCsvOutputWriter.java      |  52 ++++++
 .../writer/TestConfiguredJsonOutputWriter.java     |  53 ++++++
 .../query/writer/TestConfiguredOutputWriter.java   |  28 ++-
 .../v4/query/writer/item/TestCsvItemWriter.java    |  31 ++--
 .../v4/query/writer/item/TestItemWriter.java       |  28 ++-
 .../v4/query/writer/item/TestJsonItemWriter.java   |  24 +--
 .../apache/streampipes/ps/DataLakeResourceV4.java  |   3 +-
 .../services/data-export.service.ts                |   2 +-
 45 files changed, 711 insertions(+), 1501 deletions(-)
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/autoagg/AbstractAutoAggregationQuery.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/autoagg/FromNowAutoAggregationQuery.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/autoagg/GroupedAutoAggregationQuery.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/autoagg/TimeBoundAutoAggregationQuery.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/AggregatedTimeBoundQueryParams.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/AggregatedTimeUnitQueryParams.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/CountQueryParams.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/GroupedAggregatedTimeBoundQueryParams.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/GroupedQueryParams.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/PagingQueryParams.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/TimeBoundQueryParams.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/TimeUnitQueryParams.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetAggregatedEventsFromNowQuery.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetAggregatedEventsQuery.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetDateFromSortedTableRecord.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetEventsAutoAggregationQuery.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetEventsFromNowQuery.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetEventsQuery.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetGroupedAggregatedEventsQuery.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetGroupedEventsQuery.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetHeadersWithTypesQuery.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetMaxPagesQuery.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetNumberOfRecordsByTimeUnitQuery.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetNumberOfRecordsQuery.java
 delete mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetPagingEventsQuery.java
 create mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java
 create mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
 create mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredCsvOutputWriter.java
 create mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java
 create mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java
 copy streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/connect/AbstractAdapterResource.java => streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/OutputFormat.java (66%)
 copy streampipes-integration-tests/src/test/java/org/apache/streampipes/integration/containers/PulsarDevContainer.java => streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/CsvItemWriter.java (69%)
 create mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java
 copy streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/utils/DummyCollector.java => streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/JsonItemWriter.java (55%)
 create mode 100644 streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
 create mode 100644 streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredCsvOutputWriter.java
 create mode 100644 streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredJsonOutputWriter.java
 copy ui/src/app/connect/components/adapter-details/adapter-details-tabs.ts => streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredOutputWriter.java (65%)
 copy streampipes-pipeline-management/src/test/java/org/apache/streampipes/manager/selector/TestSelectorGenerator.java => streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestCsvItemWriter.java (55%)
 copy streampipes-sdk/src/main/java/org/apache/streampipes/sdk/helpers/Filetypes.java => streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestItemWriter.java (68%)
 copy streampipes-extensions/streampipes-processors-transformation-flink/src/test/java/org/apache/streampipes/processors/transformation/flink/processor/hasher/TestFieldHasherController.java => streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestJsonItemWriter.java (60%)


[streampipes] 04/06: [STREAMPIPES-566] Do not use wildcard imports

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 4aeef8b3ac98ef438e2573b2e69e39ed5843edb8
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Mon Nov 21 20:21:16 2022 +0100

    [STREAMPIPES-566] Do not use wildcard imports
---
 .../org/apache/streampipes/dataexplorer/DataLakeManagementV4.java  | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index c6bbc5278..11ebdefb3 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -50,7 +50,12 @@ import org.lightcouch.CouchDbClient;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 
 public class DataLakeManagementV4 {


[streampipes] 02/06: [STREAMPIPES-566] Delete unused classes

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 3ae6e53751864155cb89173b1bb90d57009c126d
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Nov 20 16:26:34 2022 +0100

    [STREAMPIPES-566] Delete unused classes
---
 .../autoagg/AbstractAutoAggregationQuery.java      | 70 ----------------
 .../autoagg/FromNowAutoAggregationQuery.java       | 49 -----------
 .../autoagg/GroupedAutoAggregationQuery.java       | 48 -----------
 .../autoagg/TimeBoundAutoAggregationQuery.java     | 49 -----------
 .../param/AggregatedTimeBoundQueryParams.java      | 50 ------------
 .../param/AggregatedTimeUnitQueryParams.java       | 51 ------------
 .../dataexplorer/param/CountQueryParams.java       | 21 -----
 .../GroupedAggregatedTimeBoundQueryParams.java     | 46 -----------
 .../dataexplorer/param/GroupedQueryParams.java     | 42 ----------
 .../dataexplorer/param/PagingQueryParams.java      | 95 ----------------------
 .../dataexplorer/param/TimeBoundQueryParams.java   | 44 ----------
 .../dataexplorer/param/TimeUnitQueryParams.java    | 42 ----------
 .../query/GetAggregatedEventsFromNowQuery.java     | 47 -----------
 .../query/GetAggregatedEventsQuery.java            | 45 ----------
 .../query/GetDateFromSortedTableRecord.java        | 69 ----------------
 .../query/GetEventsAutoAggregationQuery.java       | 21 -----
 .../dataexplorer/query/GetEventsFromNowQuery.java  | 44 ----------
 .../dataexplorer/query/GetEventsQuery.java         | 42 ----------
 .../query/GetGroupedAggregatedEventsQuery.java     | 44 ----------
 .../dataexplorer/query/GetGroupedEventsQuery.java  | 42 ----------
 .../query/GetHeadersWithTypesQuery.java            | 46 -----------
 .../dataexplorer/query/GetMaxPagesQuery.java       | 41 ----------
 .../query/GetNumberOfRecordsByTimeUnitQuery.java   | 52 ------------
 .../query/GetNumberOfRecordsQuery.java             | 51 ------------
 .../dataexplorer/query/GetPagingEventsQuery.java   | 72 ----------------
 25 files changed, 1223 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/autoagg/AbstractAutoAggregationQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/autoagg/AbstractAutoAggregationQuery.java
deleted file mode 100644
index 90c7d9b31..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/autoagg/AbstractAutoAggregationQuery.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.autoagg;
-
-import org.apache.streampipes.dataexplorer.model.Order;
-import org.apache.streampipes.dataexplorer.param.QueryParams;
-import org.apache.streampipes.dataexplorer.query.GetDateFromSortedTableRecord;
-
-import java.util.function.Supplier;
-
-public abstract class AbstractAutoAggregationQuery<Q extends QueryParams, OUT> {
-
-  private static final double NUM_OF_AUTO_AGGREGATION_VALUES = 2000;
-
-  protected Q params;
-  private Supplier<OUT> supplier;
-
-  public AbstractAutoAggregationQuery(Q params, Supplier<OUT> supplier) {
-    this.params = params;
-    this.supplier = supplier;
-  }
-
-  public OUT executeQuery() {
-    double numberOfRecords = getCount();
-
-    if (numberOfRecords == 0) {
-      return supplier.get();
-    } else if (numberOfRecords <= NUM_OF_AUTO_AGGREGATION_VALUES) {
-      return getRawEvents();
-    } else {
-      int aggregationValue = getAggregationValue(params.getIndex());
-      return getAggregatedEvents(aggregationValue);
-    }
-  }
-
-  protected abstract double getCount();
-
-  protected abstract OUT getRawEvents();
-
-  protected abstract OUT getAggregatedEvents(Integer aggregationValue);
-
-  private int getAggregationValue(String index) {
-    long timerange = getDateFromNewestRecordOfTable(index) - getDateFromOldestRecordOfTable(index);
-    double v = timerange / NUM_OF_AUTO_AGGREGATION_VALUES;
-    return Double.valueOf(v).intValue();
-  }
-
-  private long getDateFromNewestRecordOfTable(String index) {
-    return new GetDateFromSortedTableRecord(QueryParams.from(index), Order.DESC).executeQuery();
-  }
-
-  private long getDateFromOldestRecordOfTable(String index) {
-    return new GetDateFromSortedTableRecord(QueryParams.from(index), Order.ASC).executeQuery();
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/autoagg/FromNowAutoAggregationQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/autoagg/FromNowAutoAggregationQuery.java
deleted file mode 100644
index 8c0721edc..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/autoagg/FromNowAutoAggregationQuery.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.autoagg;
-
-import org.apache.streampipes.dataexplorer.param.AggregatedTimeUnitQueryParams;
-import org.apache.streampipes.dataexplorer.param.TimeUnitQueryParams;
-import org.apache.streampipes.dataexplorer.query.GetAggregatedEventsFromNowQuery;
-import org.apache.streampipes.dataexplorer.query.GetEventsFromNowQuery;
-import org.apache.streampipes.dataexplorer.query.GetNumberOfRecordsByTimeUnitQuery;
-import org.apache.streampipes.model.datalake.SpQueryResult;
-
-public class FromNowAutoAggregationQuery extends AbstractAutoAggregationQuery<TimeUnitQueryParams, SpQueryResult> {
-
-  public FromNowAutoAggregationQuery(TimeUnitQueryParams params) {
-    super(params, SpQueryResult::new);
-  }
-
-  @Override
-  protected double getCount() {
-    return new GetNumberOfRecordsByTimeUnitQuery(params).executeQuery();
-  }
-
-  @Override
-  protected SpQueryResult getRawEvents() {
-    return new GetEventsFromNowQuery(params).executeQuery();
-  }
-
-  @Override
-  protected SpQueryResult getAggregatedEvents(Integer aggregationValue) {
-    return new GetAggregatedEventsFromNowQuery(AggregatedTimeUnitQueryParams
-            .from(params.getIndex(), params.getTimeUnit(), params.getTimeValue(), "ms", aggregationValue))
-            .executeQuery();
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/autoagg/GroupedAutoAggregationQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/autoagg/GroupedAutoAggregationQuery.java
deleted file mode 100644
index 7518f053d..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/autoagg/GroupedAutoAggregationQuery.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.autoagg;
-
-import org.apache.streampipes.dataexplorer.param.GroupedAggregatedTimeBoundQueryParams;
-import org.apache.streampipes.dataexplorer.param.GroupedQueryParams;
-import org.apache.streampipes.dataexplorer.query.GetGroupedAggregatedEventsQuery;
-import org.apache.streampipes.dataexplorer.query.GetGroupedEventsQuery;
-import org.apache.streampipes.dataexplorer.query.GetNumberOfRecordsQuery;
-import org.apache.streampipes.model.datalake.SpQueryResult;
-
-public class GroupedAutoAggregationQuery extends AbstractAutoAggregationQuery<GroupedQueryParams, SpQueryResult> {
-
-  public GroupedAutoAggregationQuery(GroupedQueryParams params) {
-    super(params, SpQueryResult::new);
-  }
-
-  @Override
-  protected double getCount() {
-    return new GetNumberOfRecordsQuery(params).executeQuery();
-  }
-
-  @Override
-  protected SpQueryResult getRawEvents() {
-    return new GetGroupedEventsQuery(params).executeQuery();
-  }
-
-  @Override
-  protected SpQueryResult getAggregatedEvents(Integer aggregationValue) {
-    return new GetGroupedAggregatedEventsQuery(GroupedAggregatedTimeBoundQueryParams.from(params.getIndex(),
-            params.getStartDate(), params.getEndDate(), "ms", aggregationValue, params.getGroupingTag())).executeQuery();
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/autoagg/TimeBoundAutoAggregationQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/autoagg/TimeBoundAutoAggregationQuery.java
deleted file mode 100644
index 5cc089a97..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/autoagg/TimeBoundAutoAggregationQuery.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.autoagg;
-
-import org.apache.streampipes.dataexplorer.param.AggregatedTimeBoundQueryParams;
-import org.apache.streampipes.dataexplorer.param.TimeBoundQueryParams;
-import org.apache.streampipes.dataexplorer.query.GetAggregatedEventsQuery;
-import org.apache.streampipes.dataexplorer.query.GetEventsQuery;
-import org.apache.streampipes.dataexplorer.query.GetNumberOfRecordsQuery;
-import org.apache.streampipes.model.datalake.SpQueryResult;
-
-public class TimeBoundAutoAggregationQuery extends AbstractAutoAggregationQuery<TimeBoundQueryParams, SpQueryResult> {
-
-  public TimeBoundAutoAggregationQuery(TimeBoundQueryParams params) {
-    super(params, SpQueryResult::new);
-  }
-
-  @Override
-  protected double getCount() {
-    return new GetNumberOfRecordsQuery(params).executeQuery();
-  }
-
-  @Override
-  protected SpQueryResult getRawEvents() {
-    return new GetEventsQuery(params).executeQuery();
-  }
-
-  @Override
-  protected SpQueryResult getAggregatedEvents(Integer aggregationValue) {
-    return new GetAggregatedEventsQuery(AggregatedTimeBoundQueryParams.from(params.getIndex(),
-            params.getStartDate(), params.getEndDate(), "ms", aggregationValue))
-            .executeQuery();
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/AggregatedTimeBoundQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/AggregatedTimeBoundQueryParams.java
deleted file mode 100644
index 274f42851..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/AggregatedTimeBoundQueryParams.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.param;
-
-public class AggregatedTimeBoundQueryParams extends TimeBoundQueryParams {
-
-  private String aggregationUnit;
-  private int aggregationValue;
-
-  public static AggregatedTimeBoundQueryParams from(String index,
-                                                    long startDate,
-                                                    long endDate,
-                                                    String aggregationUnit,
-                                                    int aggregationValue) {
-    return new AggregatedTimeBoundQueryParams(index, startDate, endDate, aggregationUnit, aggregationValue);
-  }
-
-  protected AggregatedTimeBoundQueryParams(String index,
-                        long startDate,
-                        long endDate,
-                        String aggregationUnit,
-                        int aggregationValue) {
-    super(index, startDate, endDate);
-    this.aggregationUnit = aggregationUnit;
-    this.aggregationValue = aggregationValue;
-  }
-
-  public String getAggregationUnit() {
-    return aggregationUnit;
-  }
-
-  public int getAggregationValue() {
-    return aggregationValue;
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/AggregatedTimeUnitQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/AggregatedTimeUnitQueryParams.java
deleted file mode 100644
index 770c555a8..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/AggregatedTimeUnitQueryParams.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.param;
-
-public class AggregatedTimeUnitQueryParams extends TimeUnitQueryParams {
-
-  private String aggregationUnit;
-  private Integer aggregationValue;
-
-  public static AggregatedTimeUnitQueryParams from(String index,
-                                                   String timeUnit,
-                                                   Integer timeValue,
-                                                   String aggregationUnit,
-                                                   Integer aggregationValue) {
-    return new AggregatedTimeUnitQueryParams(index, timeUnit, timeValue, aggregationUnit, aggregationValue);
-  }
-
-  protected AggregatedTimeUnitQueryParams(String index,
-                                          String timeUnit,
-                                          Integer timeValue,
-                                          String aggregationUnit,
-                                          Integer aggregationValue) {
-    super(index, timeUnit, timeValue);
-    this.aggregationUnit = aggregationUnit;
-    this.aggregationValue = aggregationValue;
-  }
-
-  public String getAggregationUnit() {
-    return aggregationUnit;
-  }
-
-  public Integer getAggregationValue() {
-    return aggregationValue;
-  }
-
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/CountQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/CountQueryParams.java
deleted file mode 100644
index a8a0971be..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/CountQueryParams.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.param;
-
-public class CountQueryParams {
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/GroupedAggregatedTimeBoundQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/GroupedAggregatedTimeBoundQueryParams.java
deleted file mode 100644
index e9bd14814..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/GroupedAggregatedTimeBoundQueryParams.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.param;
-
-public class GroupedAggregatedTimeBoundQueryParams extends AggregatedTimeBoundQueryParams {
-
-  private final String groupingTag;
-
-  public static GroupedAggregatedTimeBoundQueryParams from(String index,
-                                        long startDate,
-                                        long endDate,
-                                        String aggregationUnit,
-                                        int aggregationValue,
-                                        String groupingTag) {
-    return new GroupedAggregatedTimeBoundQueryParams(index, startDate, endDate, aggregationUnit, aggregationValue, groupingTag);
-  }
-
-  protected GroupedAggregatedTimeBoundQueryParams(String index,
-                               long startDate,
-                               long endDate,
-                               String aggregationUnit,
-                               int aggregationValue,
-                               String groupingTag) {
-    super(index, startDate, endDate, aggregationUnit, aggregationValue);
-    this.groupingTag = groupingTag;
-  }
-
-  public String getGroupingTag() {
-    return groupingTag;
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/GroupedQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/GroupedQueryParams.java
deleted file mode 100644
index 2669a75d9..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/GroupedQueryParams.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.param;
-
-public class GroupedQueryParams extends TimeBoundQueryParams {
-
-  private final String groupingTag;
-
-  public static GroupedQueryParams from(String index,
-                                        long startDate,
-                                        long endDate,
-                                        String groupingTag) {
-    return new GroupedQueryParams(index, startDate, endDate, groupingTag);
-  }
-
-  protected GroupedQueryParams(String index,
-                               long startDate,
-                               long endDate,
-                               String groupingTag) {
-    super(index, startDate, endDate);
-    this.groupingTag = groupingTag;
-  }
-
-  public String getGroupingTag() {
-    return groupingTag;
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/PagingQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/PagingQueryParams.java
deleted file mode 100644
index 7ecf444ba..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/PagingQueryParams.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.param;
-
-import javax.annotation.Nullable;
-
-public class PagingQueryParams extends QueryParams {
-
-  private final Integer itemsPerPage;
-  private Integer page;
-
-  private long startDate;
-  private long endDate;
-
-  private boolean filterByDate;
-
-  public static PagingQueryParams from(String index, Integer itemsPerPage) {
-    return new PagingQueryParams(index, itemsPerPage);
-  }
-
-  public static PagingQueryParams from(String index, Integer itemsPerPage, Integer page) {
-    return new PagingQueryParams(index, itemsPerPage, page);
-  }
-
-  public static PagingQueryParams from(String index,
-                                       Integer itemsPerPage,
-                                       Integer page,
-                                       @Nullable Long startDate,
-                                       @Nullable Long endDate) {
-    return new PagingQueryParams(index, itemsPerPage, page, startDate, endDate);
-  }
-
-  protected PagingQueryParams(String index, Integer itemsPerPage) {
-    super(index);
-    this.itemsPerPage = itemsPerPage;
-    this.filterByDate = false;
-  }
-
-  protected PagingQueryParams(String index, Integer itemsPerPage, Integer page) {
-    super(index);
-    this.itemsPerPage = itemsPerPage;
-    this.page = page;
-    this.filterByDate = false;
-  }
-
-  protected PagingQueryParams(String index,
-                              Integer itemsPerPage,
-                              Integer page,
-                              @Nullable Long startDate,
-                              @Nullable Long endDate) {
-    this(index, itemsPerPage, page);
-    if (startDate == null || endDate == null) {
-      this.filterByDate = false;
-    } else {
-      this.startDate = startDate;
-      this.endDate = endDate;
-      this.filterByDate = true;
-    }
-  }
-
-  public Integer getItemsPerPage() {
-    return itemsPerPage;
-  }
-
-  public Integer getPage() {
-    return page;
-  }
-
-  public long getStartDate() {
-    return startDate;
-  }
-
-  public long getEndDate() {
-    return endDate;
-  }
-
-  public boolean isFilterByDate() {
-    return filterByDate;
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/TimeBoundQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/TimeBoundQueryParams.java
deleted file mode 100644
index 4ee3289e5..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/TimeBoundQueryParams.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.param;
-
-public class TimeBoundQueryParams extends QueryParams {
-
-  private final long startDate;
-  private final long endDate;
-
-  public static TimeBoundQueryParams from(String index, long startDate, long endDate) {
-    return new TimeBoundQueryParams(index, startDate, endDate);
-  }
-
-  protected TimeBoundQueryParams(String index,
-                        long startDate,
-                        long endDate) {
-    super(index);
-    this.startDate = startDate;
-    this.endDate = endDate;
-  }
-
-  public long getStartDate() {
-    return startDate;
-  }
-
-  public long getEndDate() {
-    return endDate;
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/TimeUnitQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/TimeUnitQueryParams.java
deleted file mode 100644
index 7ae568b29..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/TimeUnitQueryParams.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.param;
-
-public class TimeUnitQueryParams extends QueryParams {
-
-  private final String timeUnit;
-  private final Integer timeValue;
-
-  public static TimeUnitQueryParams from(String index, String timeUnit, Integer timeValue) {
-    return new TimeUnitQueryParams(index, timeUnit, timeValue);
-  }
-
-  protected TimeUnitQueryParams(String index, String timeUnit, Integer timeValue) {
-    super(index);
-    this.timeUnit = timeUnit;
-    this.timeValue = timeValue;
-  }
-
-  public String getTimeUnit() {
-    return timeUnit;
-  }
-
-  public Integer getTimeValue() {
-    return timeValue;
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetAggregatedEventsFromNowQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetAggregatedEventsFromNowQuery.java
deleted file mode 100644
index 9c171a683..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetAggregatedEventsFromNowQuery.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.query;
-
-import org.apache.streampipes.dataexplorer.param.AggregatedTimeUnitQueryParams;
-import org.apache.streampipes.dataexplorer.template.QueryTemplates;
-import org.apache.streampipes.model.datalake.SpQueryResult;
-import org.influxdb.dto.QueryResult;
-
-public class GetAggregatedEventsFromNowQuery extends ParameterizedDataExplorerQuery<AggregatedTimeUnitQueryParams, SpQueryResult> {
-
-  public GetAggregatedEventsFromNowQuery(AggregatedTimeUnitQueryParams queryParams) {
-    super(queryParams);
-  }
-
-  @Override
-  protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
-    queryBuilder.add(QueryTemplates.selectMeanFrom(params.getIndex()));
-    queryBuilder.add("WHERE time > now() -"
-            + params.getTimeValue()
-            + params.getTimeUnit()
-            + " GROUP BY time("
-            + params.getAggregationValue()
-            + params.getAggregationUnit()
-            + ") fill(none) ORDER BY time");
-  }
-
-  @Override
-  protected SpQueryResult postQuery(QueryResult result) {
-    return convertResult(result);
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetAggregatedEventsQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetAggregatedEventsQuery.java
deleted file mode 100644
index 97e0c31f5..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetAggregatedEventsQuery.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.query;
-
-import org.apache.streampipes.dataexplorer.param.AggregatedTimeBoundQueryParams;
-import org.apache.streampipes.dataexplorer.template.QueryTemplates;
-import org.apache.streampipes.model.datalake.SpQueryResult;
-import org.influxdb.dto.QueryResult;
-
-public class GetAggregatedEventsQuery extends ParameterizedDataExplorerQuery<AggregatedTimeBoundQueryParams, SpQueryResult> {
-
-  public GetAggregatedEventsQuery(AggregatedTimeBoundQueryParams queryParams) {
-    super(queryParams);
-  }
-
-  @Override
-  protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
-    queryBuilder.add(QueryTemplates.selectMeanFrom(params.getIndex()));
-    queryBuilder.add(" WHERE time > " + params.getStartDate() * 1000000
-            + " AND time < " + params.getEndDate() * 1000000);
-    queryBuilder.add("GROUP BY time(" + params.getAggregationValue() + params.getAggregationUnit() + ")");
-    queryBuilder.add("fill(none)");
-    queryBuilder.add("ORDER BY time");
-  }
-
-  @Override
-  protected SpQueryResult postQuery(QueryResult result) {
-    return convertResult(result);
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetDateFromSortedTableRecord.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetDateFromSortedTableRecord.java
deleted file mode 100644
index e5d97b4f6..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetDateFromSortedTableRecord.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.query;
-
-import org.apache.streampipes.dataexplorer.model.Order;
-import org.apache.streampipes.dataexplorer.param.QueryParams;
-import org.apache.streampipes.dataexplorer.template.QueryTemplates;
-import org.influxdb.dto.QueryResult;
-
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-public class GetDateFromSortedTableRecord extends ParameterizedDataExplorerQuery<QueryParams, Long> {
-
-  private SimpleDateFormat dateFormat1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
-  private SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
-
-  private Order order;
-
-  public GetDateFromSortedTableRecord(QueryParams queryParams, Order order) {
-    super(queryParams);
-    this.order = order;
-  }
-
-  @Override
-  protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
-    queryBuilder.add(QueryTemplates.selectWildcardFrom(params.getIndex()));
-    queryBuilder.add("ORDER BY "
-            + order.toValue()
-            + " LIMIT 1");
-  }
-
-  @Override
-  protected Long postQuery(QueryResult result) throws RuntimeException {
-    int timestampIndex = result.getResults().get(0).getSeries().get(0).getColumns().indexOf("time");
-    String stringDate = result.getResults().get(0).getSeries().get(0).getValues().get(0).get(timestampIndex).toString();
-
-    try {
-      Date date = tryParseDate(stringDate);
-      return date.getTime();
-    } catch (ParseException e) {
-      throw new RuntimeException("Could not parse date");
-    }
-  }
-
-  private Date tryParseDate(String v) throws ParseException {
-    try {
-      return dateFormat1.parse(v);
-    } catch (ParseException e) {
-      return dateFormat2.parse(v);
-    }
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetEventsAutoAggregationQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetEventsAutoAggregationQuery.java
deleted file mode 100644
index 8706e3dd4..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetEventsAutoAggregationQuery.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.query;
-
-public class GetEventsAutoAggregationQuery {
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetEventsFromNowQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetEventsFromNowQuery.java
deleted file mode 100644
index 2f29d75cb..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetEventsFromNowQuery.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.query;
-
-import org.apache.streampipes.dataexplorer.param.TimeUnitQueryParams;
-import org.apache.streampipes.dataexplorer.template.QueryTemplates;
-import org.apache.streampipes.model.datalake.SpQueryResult;
-import org.influxdb.dto.QueryResult;
-
-public class GetEventsFromNowQuery extends ParameterizedDataExplorerQuery<TimeUnitQueryParams, SpQueryResult> {
-
-  public GetEventsFromNowQuery(TimeUnitQueryParams queryParams) {
-    super(queryParams);
-  }
-
-  @Override
-  protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
-    queryBuilder.add(QueryTemplates.selectWildcardFrom(params.getIndex()));
-    queryBuilder.add("WHERE time > now() -"
-            + params.getTimeValue()
-            + params.getTimeUnit()
-            + " ORDER BY time");
-  }
-
-  @Override
-  protected SpQueryResult postQuery(QueryResult result) {
-    return convertResult(result);
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetEventsQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetEventsQuery.java
deleted file mode 100644
index 2f4ccd254..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetEventsQuery.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.query;
-
-import org.apache.streampipes.dataexplorer.param.TimeBoundQueryParams;
-import org.apache.streampipes.dataexplorer.template.QueryTemplates;
-import org.apache.streampipes.model.datalake.SpQueryResult;
-import org.influxdb.dto.QueryResult;
-
-public class GetEventsQuery extends ParameterizedDataExplorerQuery<TimeBoundQueryParams, SpQueryResult> {
-
-  public GetEventsQuery(TimeBoundQueryParams params) {
-    super(params);
-  }
-
-  @Override
-  protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
-    queryBuilder.add(QueryTemplates.selectWildcardFrom(params.getIndex()));
-    queryBuilder.add(QueryTemplates.whereTimeWithin(params.getStartDate(), params.getEndDate()));
-    queryBuilder.add("ORDER BY time");
-  }
-
-  @Override
-  protected SpQueryResult postQuery(QueryResult result) {
-    return convertResult(result);
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetGroupedAggregatedEventsQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetGroupedAggregatedEventsQuery.java
deleted file mode 100644
index 81c77f4d8..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetGroupedAggregatedEventsQuery.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.query;
-
-import org.apache.streampipes.dataexplorer.param.GroupedAggregatedTimeBoundQueryParams;
-import org.apache.streampipes.dataexplorer.template.QueryTemplates;
-import org.apache.streampipes.model.datalake.SpQueryResult;
-
-public class GetGroupedAggregatedEventsQuery extends ParameterizedDataExplorerQuery<GroupedAggregatedTimeBoundQueryParams, SpQueryResult> {
-
-  public GetGroupedAggregatedEventsQuery(GroupedAggregatedTimeBoundQueryParams queryParams) {
-    super(queryParams);
-  }
-
-  @Override
-  protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
-    queryBuilder.add("SELECT mean(*), count(*) FROM " + params.getIndex());
-    queryBuilder.add(QueryTemplates.whereTimeWithin(params.getStartDate(), params.getEndDate()));
-    queryBuilder.add("GROUP BY " + params.getGroupingTag() + ",time("
-            + params.getAggregationValue() + params.getAggregationUnit()
-            + ") fill(none) ");
-    queryBuilder.add("ORDER BY time ");
-  }
-
-  @Override
-  protected SpQueryResult postQuery(org.influxdb.dto.QueryResult result) {
-    return convertMultiResult(result);
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetGroupedEventsQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetGroupedEventsQuery.java
deleted file mode 100644
index 6bd64094f..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetGroupedEventsQuery.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.query;
-
-import org.apache.streampipes.dataexplorer.param.GroupedQueryParams;
-import org.apache.streampipes.dataexplorer.template.QueryTemplates;
-import org.apache.streampipes.model.datalake.SpQueryResult;
-
-public class GetGroupedEventsQuery extends ParameterizedDataExplorerQuery<GroupedQueryParams, SpQueryResult> {
-
-  public GetGroupedEventsQuery(GroupedQueryParams queryParams) {
-    super(queryParams);
-  }
-
-  @Override
-  protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
-    queryBuilder.add(QueryTemplates.selectWildcardFrom(params.getIndex()));
-    queryBuilder.add(QueryTemplates.whereTimeWithin(params.getStartDate(), params.getEndDate()));
-    queryBuilder.add("GROUP BY " + params.getGroupingTag());
-    queryBuilder.add("ORDER BY time ");
-  }
-
-  @Override
-  protected SpQueryResult postQuery(org.influxdb.dto.QueryResult result) {
-    return convertMultiResult(result);
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetHeadersWithTypesQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetHeadersWithTypesQuery.java
deleted file mode 100644
index 776bf3786..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetHeadersWithTypesQuery.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.query;
-
-import org.apache.streampipes.dataexplorer.param.QueryParams;
-import org.influxdb.dto.QueryResult;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class GetHeadersWithTypesQuery extends ParameterizedDataExplorerQuery<QueryParams, Map<String, String>> {
-
-  public GetHeadersWithTypesQuery(QueryParams queryParams) {
-    super(queryParams);
-  }
-
-  @Override
-  protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
-    queryBuilder.add("SHOW FIELD KEYS FROM " + params.getIndex());
-  }
-
-  @Override
-  protected Map<String, String> postQuery(QueryResult result) {
-    Map<String, String> headerTypes = new HashMap<>();
-    for (List<Object> element : result.getResults().get(0).getSeries().get(0).getValues()) {
-      headerTypes.put(element.get(0).toString(), element.get(1).toString());
-    }
-    return headerTypes;
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetMaxPagesQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetMaxPagesQuery.java
deleted file mode 100644
index 332b434a4..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetMaxPagesQuery.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.query;
-
-import org.apache.streampipes.dataexplorer.param.PagingQueryParams;
-import org.apache.streampipes.dataexplorer.template.QueryTemplates;
-import org.influxdb.dto.QueryResult;
-
-public class GetMaxPagesQuery extends ParameterizedDataExplorerQuery<PagingQueryParams, Integer> {
-
-  public GetMaxPagesQuery(PagingQueryParams queryParams) {
-    super(queryParams);
-  }
-
-  @Override
-  protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
-    queryBuilder.add(QueryTemplates.selectCountFrom(params.getIndex()));
-  }
-
-  @Override
-  protected Integer postQuery(QueryResult result) {
-    int size = ((Double) result.getResults().get(0).getSeries().get(0).getValues().get(0).get(1)).intValue();
-
-    return (size / params.getItemsPerPage());
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetNumberOfRecordsByTimeUnitQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetNumberOfRecordsByTimeUnitQuery.java
deleted file mode 100644
index 379d14c15..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetNumberOfRecordsByTimeUnitQuery.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.query;
-
-import org.apache.streampipes.dataexplorer.param.TimeUnitQueryParams;
-import org.apache.streampipes.dataexplorer.template.QueryTemplates;
-import org.influxdb.dto.QueryResult;
-
-public class GetNumberOfRecordsByTimeUnitQuery extends ParameterizedDataExplorerQuery<TimeUnitQueryParams, Double> {
-
-  public GetNumberOfRecordsByTimeUnitQuery(TimeUnitQueryParams queryParams) {
-    super(queryParams);
-  }
-
-  @Override
-  protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
-    queryBuilder.add(QueryTemplates.selectCountFrom(params.getIndex()));
-    queryBuilder.add("WHERE time > now() -"
-            + params.getTimeValue()
-            + params.getTimeUnit());
-  }
-
-  @Override
-  protected Double postQuery(QueryResult result) {
-    double numOfRecords = 0;
-    if (result.getResults().get(0).getSeries() == null) {
-      return numOfRecords;
-    }
-
-    for (Object item : result.getResults().get(0).getSeries().get(0).getValues().get(0)) {
-      if (item instanceof Double) {
-        numOfRecords = Double.parseDouble(item.toString());
-      }
-    }
-    return numOfRecords;
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetNumberOfRecordsQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetNumberOfRecordsQuery.java
deleted file mode 100644
index 0ff5dce63..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetNumberOfRecordsQuery.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.query;
-
-import org.apache.streampipes.dataexplorer.param.TimeBoundQueryParams;
-import org.apache.streampipes.dataexplorer.template.QueryTemplates;
-import org.influxdb.dto.QueryResult;
-
-public class GetNumberOfRecordsQuery extends ParameterizedDataExplorerQuery<TimeBoundQueryParams, Double> {
-
-  public GetNumberOfRecordsQuery(TimeBoundQueryParams queryParams) {
-    super(queryParams);
-  }
-
-  @Override
-  protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
-    queryBuilder.add(QueryTemplates.selectCountFrom(params.getIndex()));
-    queryBuilder.add(QueryTemplates.whereTimeWithin(params.getStartDate(), params.getEndDate()));
-  }
-
-  @Override
-  protected Double postQuery(QueryResult result) {
-    double numOfRecords = 0.0;
-    if (result.getResults().get(0).getSeries() == null) {
-      return numOfRecords;
-    }
-
-    for (Object item : result.getResults().get(0).getSeries().get(0).getValues().get(0)) {
-      if (item instanceof Double && numOfRecords < Double.parseDouble(item.toString())) {
-        numOfRecords = Double.parseDouble(item.toString());
-      }
-    }
-
-    return numOfRecords;
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetPagingEventsQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetPagingEventsQuery.java
deleted file mode 100644
index daf186e1d..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/GetPagingEventsQuery.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.query;
-
-import org.apache.streampipes.dataexplorer.param.PagingQueryParams;
-import org.apache.streampipes.dataexplorer.template.QueryTemplates;
-import org.apache.streampipes.model.datalake.PageResult;
-import org.apache.streampipes.model.datalake.SpQueryResult;
-import org.influxdb.dto.QueryResult;
-
-import java.util.HashMap;
-import java.util.concurrent.TimeUnit;
-
-public class GetPagingEventsQuery extends ParameterizedDataExplorerQuery<PagingQueryParams, PageResult> {
-
-  private TimeUnit timeUnit;
-
-  public GetPagingEventsQuery(PagingQueryParams queryParams) {
-    super(queryParams);
-  }
-
-  public GetPagingEventsQuery(PagingQueryParams queryParams, TimeUnit timeUnit) {
-    super(queryParams);
-    this.timeUnit = timeUnit;
-  }
-
-  @Override
-  protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
-    if (this.timeUnit != null) {
-      queryBuilder.withTimeUnit(timeUnit);
-    }
-    if (params.isFilterByDate()) {
-      queryBuilder
-              .add(QueryTemplates.selectWildcardFrom(params.getIndex()))
-              .add(QueryTemplates.whereTimeWithin(params.getStartDate(), params.getEndDate()))
-              .add("ORDER BY time LIMIT "
-                      + params.getItemsPerPage()
-                      + " OFFSET "
-                      + params.getPage() * params.getItemsPerPage());
-    } else {
-      queryBuilder.add(QueryTemplates.selectWildcardFrom(params.getIndex()));
-      queryBuilder.add("ORDER BY time LIMIT "
-              + params.getItemsPerPage()
-              + " OFFSET "
-              + params.getPage() * params.getItemsPerPage());
-    }
-  }
-
-  @Override
-  protected PageResult postQuery(QueryResult result) {
-    SpQueryResult dataResult = convertResult(result);
-    int pageSum = new GetMaxPagesQuery(PagingQueryParams.from(params.getIndex(), params.getItemsPerPage()))
-            .executeQuery();
-
-    return new PageResult(dataResult.getTotal(), dataResult.getHeaders(), dataResult.getAllDataSeries().get(0).getRows(), params.getPage(), pageSum, new HashMap<>());
-  }
-}


[streampipes] 05/06: [STREAMPIPES-566] Add basic tests

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit 50af75b11cc010bb2260372fd1993339db01f9a7
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Mon Nov 21 20:50:44 2022 +0100

    [STREAMPIPES-566] Add basic tests
---
 streampipes-data-explorer/pom.xml                  |  6 +++
 .../writer/TestConfiguredCsvOutputWriter.java      | 52 +++++++++++++++++++++
 .../writer/TestConfiguredJsonOutputWriter.java     | 53 ++++++++++++++++++++++
 .../query/writer/TestConfiguredOutputWriter.java   | 42 +++++++++++++++++
 .../v4/query/writer/item/TestCsvItemWriter.java    | 48 ++++++++++++++++++++
 .../v4/query/writer/item/TestItemWriter.java       | 36 +++++++++++++++
 .../v4/query/writer/item/TestJsonItemWriter.java   | 39 ++++++++++++++++
 7 files changed, 276 insertions(+)

diff --git a/streampipes-data-explorer/pom.xml b/streampipes-data-explorer/pom.xml
index 78d167625..2e45a6021 100644
--- a/streampipes-data-explorer/pom.xml
+++ b/streampipes-data-explorer/pom.xml
@@ -44,6 +44,12 @@
             <version>0.71.0-SNAPSHOT</version>
         </dependency>
 
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.influxdb</groupId>
             <artifactId>influxdb-java</artifactId>
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredCsvOutputWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredCsvOutputWriter.java
new file mode 100644
index 000000000..47b2ac1ca
--- /dev/null
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredCsvOutputWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipesdataexplorer.v4.query.writer;
+
+import com.google.common.base.Charsets;
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.query.writer.ConfiguredCsvOutputWriter;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestConfiguredCsvOutputWriter extends TestConfiguredOutputWriter {
+
+  private static final String Expected = "time,string,number\n1668578077051,test,1\n1668578127050,test2,2\n";
+
+  @Test
+  public void testCsvOutputWriter() throws IOException {
+    var writer = new ConfiguredCsvOutputWriter();
+    writer.configure(new ProvidedQueryParams(null, new HashMap<>()), true);
+
+    try (var outputStream = new ByteArrayOutputStream()) {
+      writer.beforeFirstItem(outputStream);
+
+      for (int i = 0; i < rows.size(); i++) {
+        writer.writeItem(outputStream, rows.get(i), columns, i == 0);
+      }
+
+      writer.afterLastItem(outputStream);
+      assertEquals(Expected, outputStream.toString(Charsets.UTF_8));
+    }
+  }
+}
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredJsonOutputWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredJsonOutputWriter.java
new file mode 100644
index 000000000..88d7967c5
--- /dev/null
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredJsonOutputWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipesdataexplorer.v4.query.writer;
+
+import com.google.common.base.Charsets;
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.query.writer.ConfiguredJsonOutputWriter;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestConfiguredJsonOutputWriter extends TestConfiguredOutputWriter {
+
+  private static final String Expected = "[{\"time\": 1668578077051,\"string\": \"test\",\"number\": 1}"
+      +",{\"time\": 1668578127050,\"string\": \"test2\",\"number\": 2}]";
+
+  @Test
+  public void testJsonOutputWriter() throws IOException {
+    var writer = new ConfiguredJsonOutputWriter();
+    writer.configure(new ProvidedQueryParams(null, new HashMap<>()), true);
+
+    try (var outputStream = new ByteArrayOutputStream()) {
+      writer.beforeFirstItem(outputStream);
+
+      for (int i = 0; i < rows.size(); i++) {
+        writer.writeItem(outputStream, rows.get(i), columns, i == 0);
+      }
+
+      writer.afterLastItem(outputStream);
+      assertEquals(Expected, outputStream.toString(Charsets.UTF_8));
+    }
+  }
+}
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredOutputWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredOutputWriter.java
new file mode 100644
index 000000000..78927a1c9
--- /dev/null
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredOutputWriter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipesdataexplorer.v4.query.writer;
+
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.List;
+
+public abstract class TestConfiguredOutputWriter {
+
+  protected List<List<Object>> rows;
+  protected List<String> columns;
+
+  @Before
+  public void before() {
+    this.rows = Arrays.asList(
+        Arrays.asList("2022-11-16T05:54:37.051Z", "test", 1),
+        Arrays.asList("2022-11-16T05:55:27.05Z", "test2", 2)
+    );
+
+    this.columns = Arrays.asList("time", "string", "number");
+  }
+
+
+}
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestCsvItemWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestCsvItemWriter.java
new file mode 100644
index 000000000..c1e8d2312
--- /dev/null
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestCsvItemWriter.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipesdataexplorer.v4.query.writer.item;
+
+import org.apache.streampipes.dataexplorer.v4.query.writer.item.CsvItemWriter;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestCsvItemWriter extends TestItemWriter {
+
+  private static final String ExpectedComma = "1668578077051,test,1";
+  private static final String ExpectedSemicolon = "1668578077051;test;1";
+
+  @Test
+  public void testCsvItemWriterCommaSeparated() {
+    var writer = new CsvItemWriter(",");
+
+    String result = writer.createItem(row, columns);
+
+    assertEquals(ExpectedComma, result);
+  }
+
+  @Test
+  public void testCsvItemWriterSemicolonSeparated() {
+    var writer = new CsvItemWriter(";");
+
+    String result = writer.createItem(row, columns);
+
+    assertEquals(ExpectedSemicolon, result);
+  }
+}
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestItemWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestItemWriter.java
new file mode 100644
index 000000000..26092fd88
--- /dev/null
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestItemWriter.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipesdataexplorer.v4.query.writer.item;
+
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.List;
+
+public abstract class TestItemWriter {
+
+  protected List<Object> row;
+  protected List<String> columns;
+
+  @Before
+  public void before() {
+    this.row = Arrays.asList("2022-11-16T05:54:37.051Z", "test", 1);
+    this.columns = Arrays.asList("time", "string", "number");
+  }
+}
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestJsonItemWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestJsonItemWriter.java
new file mode 100644
index 000000000..f89ecd167
--- /dev/null
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestJsonItemWriter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipesdataexplorer.v4.query.writer.item;
+
+import com.google.gson.Gson;
+import org.apache.streampipes.dataexplorer.v4.query.writer.item.JsonItemWriter;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestJsonItemWriter extends TestItemWriter {
+
+  private static final String Expected = "{\"time\": 1668578077051,\"string\": \"test\",\"number\": 1}";
+  
+  @Test
+  public void testJsonWriter() {
+    var writer = new JsonItemWriter(new Gson());
+
+    String result = writer.createItem(row, columns);
+
+    assertEquals(Expected, result);
+  }
+}


[streampipes] 06/06: [STREAMPIPES-566] Use time field index over fixed assignment

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit c6ad8b85a095b17a1ee71be58fffe6be8aa02a3f
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Nov 23 08:05:29 2022 +0100

    [STREAMPIPES-566] Use time field index over fixed assignment
---
 .../dataexplorer/v4/query/StreamedQueryResultProvider.java       | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
index ada5ee6f5..5e48ec304 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
@@ -37,6 +37,7 @@ import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParam
 public class StreamedQueryResultProvider extends QueryResultProvider {
 
   private static final int MAX_RESULTS_PER_QUERY = 500000;
+  private static final String TIME_FIELD = "time";
 
   private final OutputFormat format;
 
@@ -107,7 +108,11 @@ public class StreamedQueryResultProvider extends QueryResultProvider {
    * @param measurement contains the actual timestamp name value
    * @param dataResult the query result of the database with 'time' as timestamp field name
    */
-  private void changeTimestampHeader(DataLakeMeasure measurement, SpQueryResult dataResult) {
-    dataResult.getHeaders().set(0, measurement.getTimestampFieldName());
+  private void changeTimestampHeader(DataLakeMeasure measurement,
+                                     SpQueryResult dataResult) {
+    var timeFieldIndex = dataResult.getHeaders().indexOf(TIME_FIELD);
+    if (timeFieldIndex > -1) {
+      dataResult.getHeaders().set(timeFieldIndex, measurement.getTimestampFieldName());
+    }
   }
 }


[streampipes] 01/06: [STREAMPIPES-566] Fix query download, improve code structure

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit e3c1c660c945c6498c6c763e2fdc4a88ab0bf08f
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Nov 20 16:18:49 2022 +0100

    [STREAMPIPES-566] Fix query download, improve code structure
---
 .../dataexplorer/DataLakeManagementV4.java         | 206 +--------------------
 .../dataexplorer/v4/query/QueryResultProvider.java |  64 +++++++
 .../v4/query/StreamedQueryResultProvider.java      | 113 +++++++++++
 .../v4/query/writer/ConfiguredCsvOutputWriter.java |  76 ++++++++
 .../query/writer/ConfiguredJsonOutputWriter.java   |  70 +++++++
 .../v4/query/writer/ConfiguredOutputWriter.java    |  53 ++++++
 .../dataexplorer/v4/query/writer/OutputFormat.java |  36 ++++
 .../v4/query/writer/item/CsvItemWriter.java        |  37 ++++
 .../v4/query/writer/item/ItemGenerator.java        |  57 ++++++
 .../v4/query/writer/item/JsonItemWriter.java       |  50 +++++
 .../dataexplorer/v4/utils/TimeParser.java          |  30 +++
 .../apache/streampipes/ps/DataLakeResourceV4.java  |   3 +-
 .../services/data-export.service.ts                |   2 +-
 13 files changed, 598 insertions(+), 199 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 9ee04ff03..c6bbc5278 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -18,7 +18,6 @@
 
 package org.apache.streampipes.dataexplorer;
 
-import com.google.gson.Gson;
 import com.google.gson.JsonObject;
 import org.apache.streampipes.config.backend.BackendConfig;
 import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
@@ -26,11 +25,12 @@ import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
 import org.apache.streampipes.dataexplorer.query.EditRetentionPolicyQuery;
 import org.apache.streampipes.dataexplorer.query.ShowRetentionPolicyQuery;
 import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
-import org.apache.streampipes.dataexplorer.v4.AutoAggregationHandler;
 import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters;
 import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
 import org.apache.streampipes.dataexplorer.v4.query.DataExplorerQueryV4;
+import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
+import org.apache.streampipes.dataexplorer.v4.query.QueryResultProvider;
+import org.apache.streampipes.dataexplorer.v4.query.StreamedQueryResultProvider;
 import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
 import org.apache.streampipes.model.datalake.DataLakeConfiguration;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
@@ -50,39 +50,11 @@ import org.lightcouch.CouchDbClient;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.time.Instant;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.ZonedDateTime;
-import java.time.format.DateTimeFormatter;
-import java.time.format.DateTimeFormatterBuilder;
-import java.time.temporal.ChronoField;
-import java.time.temporal.ChronoUnit;
-import java.time.temporal.TemporalAccessor;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.stream.Collectors;
 
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AUTO_AGGREGATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_CSV_DELIMITER;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_LIMIT;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_MAXIMUM_AMOUNT_OF_EVENTS;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_PAGE;
-
 public class DataLakeManagementV4 {
 
-    public static final String FOR_ID_KEY = "forId";
-
-    private static final DateTimeFormatter formatter = new DateTimeFormatterBuilder()
-            .appendPattern("uuuu[-MM[-dd]]['T'HH[:mm[:ss[.SSSSSSSSS][.SSSSSSSS][.SSSSSSS][.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S]]]][XXX]")
-            .parseDefaulting(ChronoField.NANO_OF_SECOND, 0)
-            .parseDefaulting(ChronoField.OFFSET_SECONDS, 0)
-            .toFormatter();
-
     public List<DataLakeMeasure> getAllMeasurements() {
         return DataExplorerUtils.getInfos();
     }
@@ -91,163 +63,17 @@ public class DataLakeManagementV4 {
         return getDataLakeStorage().findOne(measureId);
     }
 
-    private Optional<DataLakeMeasure> findByMeasurementName(String measurementName) {
-        return getAllMeasurements()
-            .stream()
-            .filter(measurement -> measurement.getMeasureName().equals(measurementName))
-            .findFirst();
-    }
-
-    public SpQueryResult getData(ProvidedQueryParams queryParams, boolean ignoreMissingData) throws IllegalArgumentException {
-        if (queryParams.has(QP_AUTO_AGGREGATE)) {
-            queryParams = new AutoAggregationHandler(queryParams).makeAutoAggregationQueryParams();
-        }
-        Map<String, QueryParamsV4> queryParts = DataLakeManagementUtils.getSelectQueryParams(queryParams);
-
-        if (queryParams.getProvidedParams().containsKey(QP_MAXIMUM_AMOUNT_OF_EVENTS)) {
-            int maximumAmountOfEvents = Integer.parseInt(queryParams.getProvidedParams().get(QP_MAXIMUM_AMOUNT_OF_EVENTS));
-            return new DataExplorerQueryV4(queryParts, maximumAmountOfEvents).executeQuery(ignoreMissingData);
-        }
-
-        if (queryParams.getProvidedParams().containsKey(FOR_ID_KEY)) {
-            String forWidgetId = queryParams.getProvidedParams().get(FOR_ID_KEY);
-            return new DataExplorerQueryV4(queryParts, forWidgetId).executeQuery(ignoreMissingData);
-        } else {
-            return new DataExplorerQueryV4(queryParts).executeQuery(ignoreMissingData);
-        }
+    public SpQueryResult getData(ProvidedQueryParams queryParams,
+                                 boolean ignoreMissingData) throws IllegalArgumentException {
+        return new QueryResultProvider(queryParams, ignoreMissingData).getData();
     }
 
     public void getDataAsStream(ProvidedQueryParams params,
-                                String format,
+                                OutputFormat format,
                                 boolean ignoreMissingValues,
                                 OutputStream outputStream) throws IOException {
-        if (!params.has(QP_LIMIT)) {
-            params.update(QP_LIMIT, 500000);
-        }
-
-        var measurement = findByMeasurementName(params.getMeasurementId()).get();
-
-        SpQueryResult dataResult;
-        //JSON
-        if (format.equals("json")) {
-
-            Gson gson = new Gson();
-            int i = 0;
-            if (params.has(QP_PAGE)) {
-                i = params.getAsInt(QP_PAGE);
-            }
-
-            boolean isFirstDataObject = true;
-
-            outputStream.write(toBytes("["));
-            do {
-                params.update(SupportedDataLakeQueryParameters.QP_PAGE, String.valueOf(i));
-                dataResult = getData(params, ignoreMissingValues);
-
-
-                if (dataResult.getTotal() > 0) {
-                    changeTimestampHeader(measurement, dataResult);
-
-                    for (List<Object> row : dataResult.getAllDataSeries().get(0).getRows()) {
-                        if (!isFirstDataObject) {
-                            outputStream.write(toBytes(","));
-                        }
-
-                        //produce one json object
-                        boolean isFirstElementInRow = true;
-                        outputStream.write(toBytes("{"));
-                        for (int i1 = 0; i1 < row.size(); i1++) {
-                            Object element = row.get(i1);
-                            if (!isFirstElementInRow) {
-                                outputStream.write(toBytes(","));
-                            }
-                            isFirstElementInRow = false;
-                            if (i1 == 0) {
-                                element = parseTime(element.toString());
-                            }
-                            //produce json e.g. "name": "Pipes" or "load": 42
-                            outputStream.write(toBytes("\"" + dataResult.getHeaders().get(i1) + "\": "
-                                    + gson.toJson(element)));
-                        }
-                        outputStream.write(toBytes("}"));
-                        isFirstDataObject = false;
-                    }
 
-                    i++;
-                }
-            } while (dataResult.getTotal() > 0);
-            outputStream.write(toBytes("]"));
-
-            //CSV
-        } else if (format.equals("csv")) {
-            int i = 0;
-            if (params.has(QP_PAGE)) {
-                i = params.getAsInt(QP_PAGE);
-            }
-
-            boolean isFirstDataObject = true;
-            String delimiter = ",";
-
-            if (params.has(QP_CSV_DELIMITER)) {
-                delimiter = params.getAsString(QP_CSV_DELIMITER).equals("comma") ? "," : ";";
-            }
-
-            do {
-                params.update(SupportedDataLakeQueryParameters.QP_PAGE, String.valueOf(i));
-                dataResult = getData(params, ignoreMissingValues);
-
-
-                //Send first header
-                if (dataResult.getTotal() > 0) {
-                    changeTimestampHeader(measurement, dataResult);
-
-                    if (isFirstDataObject) {
-                        boolean isFirst = true;
-                        for (int i1 = 0; i1 < dataResult.getHeaders().size(); i1++) {
-                            if (!isFirst) {
-                                outputStream.write(toBytes(delimiter));
-                            }
-                            isFirst = false;
-                            outputStream.write(toBytes(dataResult.getHeaders().get(i1)));
-                        }
-                    }
-                    outputStream.write(toBytes("\n"));
-                    isFirstDataObject = false;
-                }
-
-                if (dataResult.getTotal() > 0) {
-                    for (List<Object> row : dataResult.getAllDataSeries().get(0).getRows()) {
-                        boolean isFirstInRow = true;
-                        for (int i1 = 0; i1 < row.size(); i1++) {
-                            Object element = row.get(i1);
-                            if (!isFirstInRow) {
-                                outputStream.write(toBytes(delimiter));
-                            }
-                            isFirstInRow = false;
-                            if (i1 == 0) {
-                                element = parseTime(element.toString());
-                            }
-                            if (element == null) {
-                                outputStream.write(toBytes(""));
-                            } else {
-                                outputStream.write(toBytes(element.toString()));
-                            }
-                        }
-                        outputStream.write(toBytes("\n"));
-                    }
-                }
-                i++;
-            } while (dataResult.getTotal() > 0);
-        }
-    }
-
-    /**
-     * Replaces the field 'time' of the data result with the actual timestamp field name of the measurement
-     * @param measurement contains the actual timestamp name value
-     * @param dataResult the query result of the database with 'time' as timestamp field name
-     */
-    private void changeTimestampHeader(DataLakeMeasure measurement, SpQueryResult dataResult) {
-        dataResult.getHeaders().set(0, measurement.getTimestampFieldName());
+        new StreamedQueryResultProvider(params, format, ignoreMissingValues).getDataAsStream(outputStream);
     }
 
     public boolean removeAllMeasurements() {
@@ -345,20 +171,6 @@ public class DataLakeManagementV4 {
         return isSuccess;
     }
 
-    private byte[] toBytes(String value) {
-        return value.getBytes();
-    }
-
-    private static Long parseTime(String v) {
-        TemporalAccessor temporalAccessor = formatter.parseBest(v,
-                ZonedDateTime::from,
-                LocalDateTime::from,
-                LocalDate::from);
-
-        Instant instant = Instant.from(temporalAccessor);
-        return Instant.EPOCH.until(instant, ChronoUnit.MILLIS);
-    }
-
     public Map<String, Object> getTagValues(String measurementId,
                                             String fields) {
         InfluxDB influxDB = DataExplorerUtils.getInfluxDBClient();
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java
new file mode 100644
index 000000000..34556770f
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query;
+
+import org.apache.streampipes.dataexplorer.v4.AutoAggregationHandler;
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
+import org.apache.streampipes.dataexplorer.v4.query.DataExplorerQueryV4;
+import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import java.util.Map;
+
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AUTO_AGGREGATE;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_MAXIMUM_AMOUNT_OF_EVENTS;
+
+/**
+ * Executes a data lake query that is described by a set of provided query parameters.
+ */
+public class QueryResultProvider {
+
+  public static final String FOR_ID_KEY = "forId";
+
+  protected ProvidedQueryParams queryParams;
+  protected final boolean ignoreMissingData;
+
+  /**
+   * @param queryParams       parameters describing the query to run
+   * @param ignoreMissingData flag handed to the query when it is executed
+   */
+  public QueryResultProvider(ProvidedQueryParams queryParams, boolean ignoreMissingData) {
+    this.queryParams = queryParams;
+    this.ignoreMissingData = ignoreMissingData;
+  }
+
+  /**
+   * Builds the query matching the provided parameters and executes it.
+   *
+   * <p>If auto-aggregation is requested, the stored parameters are first replaced by the ones
+   * produced by the {@link AutoAggregationHandler}.
+   *
+   * @return the result of the executed query
+   */
+  public SpQueryResult getData() {
+    if (queryParams.has(QP_AUTO_AGGREGATE)) {
+      queryParams = new AutoAggregationHandler(queryParams).makeAutoAggregationQueryParams();
+    }
+
+    Map<String, QueryParamsV4> selectParts = DataLakeManagementUtils.getSelectQueryParams(queryParams);
+    return createQuery(selectParts).executeQuery(ignoreMissingData);
+  }
+
+  /**
+   * Picks the query variant: a maximum amount of events wins over a {@code forId} parameter,
+   * and the plain query is used when neither is present.
+   */
+  private DataExplorerQueryV4 createQuery(Map<String, QueryParamsV4> selectParts) {
+    var provided = queryParams.getProvidedParams();
+
+    if (provided.containsKey(QP_MAXIMUM_AMOUNT_OF_EVENTS)) {
+      int eventCap = Integer.parseInt(provided.get(QP_MAXIMUM_AMOUNT_OF_EVENTS));
+      return new DataExplorerQueryV4(selectParts, eventCap);
+    }
+
+    if (provided.containsKey(FOR_ID_KEY)) {
+      String widgetId = provided.get(FOR_ID_KEY);
+      return new DataExplorerQueryV4(selectParts, widgetId);
+    }
+
+    return new DataExplorerQueryV4(selectParts);
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
new file mode 100644
index 000000000..ada5ee6f5
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query;
+
+import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters;
+import org.apache.streampipes.dataexplorer.v4.query.writer.ConfiguredOutputWriter;
+import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_LIMIT;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_PAGE;
+
+/**
+ * Streams the result of a data lake query page by page to an output stream, rendered in the
+ * configured {@link OutputFormat}.
+ */
+public class StreamedQueryResultProvider extends QueryResultProvider {
+
+  private static final int MAX_RESULTS_PER_QUERY = 500000;
+
+  private final OutputFormat format;
+
+  public StreamedQueryResultProvider(ProvidedQueryParams params,
+                                     OutputFormat format,
+                                     boolean ignoreMissingValues) {
+    super(params, ignoreMissingValues);
+    this.format = format;
+  }
+
+  /**
+   * Queries the measurement page by page and writes every row of the first data series to the
+   * given stream.
+   *
+   * <p>If no limit was provided, {@code MAX_RESULTS_PER_QUERY} is set as limit and paging
+   * continues until a query reports no results.
+   *
+   * @param outputStream the stream the formatted rows are written to
+   * @throws IOException if writing to the stream fails
+   * @throws java.util.NoSuchElementException if no measurement with the requested name exists
+   */
+  public void getDataAsStream(OutputStream outputStream) throws IOException {
+    var usesLimit = queryParams.has(QP_LIMIT);
+    var configuredWriter = ConfiguredOutputWriter
+        .getConfiguredWriter(format, queryParams, ignoreMissingData);
+
+    if (!usesLimit) {
+      queryParams.update(QP_LIMIT, MAX_RESULTS_PER_QUERY);
+    }
+
+    var limit = queryParams.getAsInt(QP_LIMIT);
+    var measurementName = queryParams.getMeasurementId();
+    // Same exception type Optional#get would raise, but naming the missing measurement.
+    var measurement = findByMeasurementName(measurementName)
+        .orElseThrow(() -> new java.util.NoSuchElementException(
+            "No measurement found with name " + measurementName));
+
+    SpQueryResult dataResult;
+    int page = 0;
+    if (queryParams.has(QP_PAGE)) {
+      page = queryParams.getAsInt(QP_PAGE);
+    }
+
+    boolean isFirstDataItem = true;
+    configuredWriter.beforeFirstItem(outputStream);
+    do {
+      queryParams.update(SupportedDataLakeQueryParameters.QP_PAGE, String.valueOf(page));
+      dataResult = getData();
+
+      if (dataResult.getTotal() > 0) {
+        changeTimestampHeader(measurement, dataResult);
+        var columns = dataResult.getHeaders();
+        for (List<Object> row : dataResult.getAllDataSeries().get(0).getRows()) {
+          configuredWriter.writeItem(outputStream, row, columns, isFirstDataItem);
+          isFirstDataItem = false;
+        }
+      }
+      page++;
+    } while (queryNextPage(dataResult.getTotal(), usesLimit, limit, page));
+    configuredWriter.afterLastItem(outputStream);
+  }
+
+  /**
+   * Decides whether another page has to be queried.
+   *
+   * @param lastResultsCount total reported by the query result that was just processed
+   * @param usesLimit        whether the caller supplied an explicit limit
+   * @param limit            the limit in effect
+   * @param lastPage         the page counter, already advanced past the processed page
+   * @return {@code true} if paging should continue
+   */
+  private boolean queryNextPage(int lastResultsCount,
+                                boolean usesLimit,
+                                int limit,
+                                int lastPage) {
+    if (usesLimit) {
+      // Multiply as long: the int product overflows for page counters of 4295 and above, which
+      // would turn the comparison around and keep the loop paging.
+      return limit > (long) lastPage * MAX_RESULTS_PER_QUERY;
+    } else {
+      return lastResultsCount > 0;
+    }
+  }
+
+  private Optional<DataLakeMeasure> findByMeasurementName(String measurementName) {
+    return DataExplorerUtils.getInfos()
+        .stream()
+        .filter(measurement -> measurement.getMeasureName().equals(measurementName))
+        .findFirst();
+  }
+
+  /**
+   * Replaces the field 'time' of the data result with the actual timestamp field name of the measurement
+   * @param measurement contains the actual timestamp name value
+   * @param dataResult the query result of the database with 'time' as timestamp field name
+   */
+  private void changeTimestampHeader(DataLakeMeasure measurement, SpQueryResult dataResult) {
+    dataResult.getHeaders().set(0, measurement.getTimestampFieldName());
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredCsvOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredCsvOutputWriter.java
new file mode 100644
index 000000000..8d92feb61
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredCsvOutputWriter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.writer;
+
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.query.writer.item.CsvItemWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.StringJoiner;
+
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_CSV_DELIMITER;
+
+/**
+ * Output writer that renders query results as delimiter-separated lines, preceded by a header line.
+ */
+public class ConfiguredCsvOutputWriter extends ConfiguredOutputWriter {
+
+  private static final String LINE_SEPARATOR = "\n";
+  private static final String COMMA = ",";
+  private static final String SEMICOLON = ";";
+
+  private CsvItemWriter itemWriter;
+  private String delimiter = COMMA;
+
+  /**
+   * Selects the delimiter and creates the item writer. A comma is used unless the delimiter
+   * parameter is present with a value other than {@code "comma"}, which selects a semicolon.
+   */
+  @Override
+  public void configure(ProvidedQueryParams params,
+                        boolean ignoreMissingValues) {
+    if (params.has(QP_CSV_DELIMITER)) {
+      var requested = params.getAsString(QP_CSV_DELIMITER);
+      if (requested.equals("comma")) {
+        delimiter = COMMA;
+      } else {
+        delimiter = SEMICOLON;
+      }
+    }
+    this.itemWriter = new CsvItemWriter(delimiter);
+  }
+
+  @Override
+  public void beforeFirstItem(OutputStream outputStream) {
+    // nothing to emit before the first line
+  }
+
+  @Override
+  public void afterLastItem(OutputStream outputStream) {
+    // nothing to emit after the last line
+  }
+
+  /**
+   * Writes one row as a line; the header line with the column names is written ahead of the
+   * first row.
+   */
+  @Override
+  public void writeItem(OutputStream outputStream,
+                        List<Object> row,
+                        List<String> columnNames,
+                        boolean firstObject) throws IOException {
+    if (firstObject) {
+      var header = makeHeaderLine(columnNames);
+      outputStream.write(toBytes(header));
+    }
+
+    var line = itemWriter.createItem(row, columnNames) + LINE_SEPARATOR;
+    outputStream.write(toBytes(line));
+  }
+
+  /** Joins the column names with the configured delimiter and terminates the line. */
+  private String makeHeaderLine(List<String> columns) {
+    StringJoiner header = new StringJoiner(this.delimiter, "", LINE_SEPARATOR);
+    for (String column : columns) {
+      header.add(column);
+    }
+    return header.toString();
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java
new file mode 100644
index 000000000..7df602960
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.writer;
+
+import com.google.gson.Gson;
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.query.writer.item.ItemGenerator;
+import org.apache.streampipes.dataexplorer.v4.query.writer.item.JsonItemWriter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * Output writer that renders query results as a JSON array containing one object per row.
+ */
+public class ConfiguredJsonOutputWriter extends ConfiguredOutputWriter {
+
+  private static final String BEGIN_ARRAY = "[";
+  private static final String END_ARRAY = "]";
+  private static final String ITEM_SEPARATOR = ",";
+
+  private final ItemGenerator jsonObjectWriter;
+
+  public ConfiguredJsonOutputWriter() {
+    this.jsonObjectWriter = new JsonItemWriter(new Gson());
+  }
+
+  @Override
+  public void configure(ProvidedQueryParams params,
+                        boolean ignoreMissingValues) {
+    // the JSON output has no configurable options
+  }
+
+  /** Opens the JSON array. */
+  @Override
+  public void beforeFirstItem(OutputStream outputStream) throws IOException {
+    byte[] opening = toBytes(BEGIN_ARRAY);
+    outputStream.write(opening);
+  }
+
+  /** Closes the JSON array. */
+  @Override
+  public void afterLastItem(OutputStream outputStream) throws IOException {
+    byte[] closing = toBytes(END_ARRAY);
+    outputStream.write(closing);
+  }
+
+  /**
+   * Writes one row as a JSON object; every object except the first is preceded by a comma.
+   */
+  @Override
+  public void writeItem(OutputStream outputStream,
+                        List<Object> row,
+                        List<String> columnNames,
+                        boolean firstObject) throws IOException {
+    boolean needsSeparator = !firstObject;
+    if (needsSeparator) {
+      outputStream.write(toBytes(ITEM_SEPARATOR));
+    }
+
+    outputStream.write(toBytes(jsonObjectWriter.createItem(row, columnNames)));
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java
new file mode 100644
index 000000000..0dfaa163a
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.writer;
+
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+/**
+ * Base class of writers that render query result rows to an output stream in a specific format.
+ */
+public abstract class ConfiguredOutputWriter {
+
+  /**
+   * Creates the writer belonging to the given format and configures it with the query parameters.
+   *
+   * @param format              the output format providing the writer
+   * @param params              the query parameters passed to {@link #configure}
+   * @param ignoreMissingValues passed through to {@link #configure}
+   * @return the configured writer
+   */
+  public static ConfiguredOutputWriter getConfiguredWriter(OutputFormat format,
+                                                           ProvidedQueryParams params,
+                                                           boolean ignoreMissingValues) {
+    var writer = format.getWriter();
+    writer.configure(params, ignoreMissingValues);
+
+    return writer;
+  }
+
+  /** Applies format-specific settings taken from the query parameters. */
+  public abstract void configure(ProvidedQueryParams params,
+                                 boolean ignoreMissingValues);
+
+  /** Writes whatever the format requires ahead of the first item. */
+  public abstract void beforeFirstItem(OutputStream outputStream) throws IOException;
+
+  /** Writes whatever the format requires after the last item. */
+  public abstract void afterLastItem(OutputStream outputStream) throws IOException;
+
+  /**
+   * Writes a single result row.
+   *
+   * @param outputStream the stream to write to
+   * @param row          the values of the row
+   * @param columnNames  the column names belonging to the row values
+   * @param firstObject  {@code true} if this is the first row that is written
+   * @throws IOException if writing to the stream fails
+   */
+  public abstract void writeItem(OutputStream outputStream,
+                                 List<Object> row,
+                                 List<String> columnNames,
+                                 boolean firstObject) throws IOException;
+
+  /**
+   * Encodes the given text as UTF-8.
+   *
+   * <p>The charset is stated explicitly because {@code String#getBytes()} without an argument
+   * uses the platform default charset, which would make the output depend on the JVM settings.
+   */
+  protected byte[] toBytes(String value) {
+    return value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/OutputFormat.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/OutputFormat.java
new file mode 100644
index 000000000..5a522b403
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/OutputFormat.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.v4.query.writer;
+
+import java.util.function.Supplier;
+
+/**
+ * Supported output formats, each able to create a new writer instance for itself.
+ */
+public enum OutputFormat {
+  JSON(() -> new ConfiguredJsonOutputWriter()),
+  CSV(() -> new ConfiguredCsvOutputWriter());
+
+  private final Supplier<ConfiguredOutputWriter> writerFactory;
+
+  OutputFormat(Supplier<ConfiguredOutputWriter> writerFactory) {
+    this.writerFactory = writerFactory;
+  }
+
+  /**
+   * @return the writer obtained from this format's supplier
+   */
+  public ConfiguredOutputWriter getWriter() {
+    return writerFactory.get();
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/CsvItemWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/CsvItemWriter.java
new file mode 100644
index 000000000..5fe06fb52
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/CsvItemWriter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.streampipes.dataexplorer.v4.query.writer.item;
+
+/**
+ * Renders a result row as one line of delimiter-separated values.
+ */
+public class CsvItemWriter extends ItemGenerator {
+
+  private static final String QUOTE = "\"";
+
+  private final String delimiter;
+
+  /**
+   * @param delimiter the text placed between two values of a line
+   */
+  public CsvItemWriter(String delimiter) {
+    super(delimiter);
+    this.delimiter = delimiter;
+  }
+
+  /**
+   * Returns the textual form of the value, or an empty string for {@code null}.
+   *
+   * <p>Values containing the delimiter, a double quote or a line break are wrapped in double
+   * quotes with embedded quotes doubled, so that such a value cannot break the column layout.
+   */
+  @Override
+  protected String makeItemString(String key, Object value) {
+    if (value == null) {
+      return "";
+    }
+    var text = value.toString();
+    return needsQuoting(text) ? QUOTE + text.replace(QUOTE, QUOTE + QUOTE) + QUOTE : text;
+  }
+
+  @Override
+  protected String finalizeItem(String item) {
+    return item;
+  }
+
+  /** Tells whether the text contains a character sequence that is significant in CSV. */
+  private boolean needsQuoting(String text) {
+    // An empty delimiter is contained in every string and must not force quoting.
+    boolean containsDelimiter = !delimiter.isEmpty() && text.contains(delimiter);
+    return containsDelimiter
+        || text.contains(QUOTE)
+        || text.contains("\n")
+        || text.contains("\r");
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java
new file mode 100644
index 000000000..2e5442f53
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.streampipes.dataexplorer.v4.query.writer.item;
+
+import org.apache.streampipes.dataexplorer.v4.utils.TimeParser;
+
+import java.util.List;
+import java.util.StringJoiner;
+
+/**
+ * Template for turning one result row into its textual representation: every value is rendered
+ * by the subclass, the rendered parts are joined with a separator and the joined text is
+ * finalized by the subclass.
+ */
+public abstract class ItemGenerator {
+
+  protected static final String COMMA_SEPARATOR = ",";
+
+  private final String separator;
+
+  public ItemGenerator(String separator) {
+    this.separator = separator;
+  }
+
+  /**
+   * Renders a row. The first value of the row is passed through {@code TimeParser.parseTime}
+   * before it is rendered (presumably converting the time string to a timestamp - confirm
+   * against TimeParser).
+   *
+   * @param row     the values of the row
+   * @param columns the column names, looked up by the position of each value
+   * @return the finalized textual representation of the row
+   */
+  public String createItem(List<Object> row,
+                           List<String> columns) {
+    StringJoiner parts = new StringJoiner(separator);
+
+    int index = 0;
+    for (Object cell : row) {
+      Object rendered = index == 0 ? TimeParser.parseTime(cell.toString()) : cell;
+      parts.add(makeItemString(columns.get(index), rendered));
+      index++;
+    }
+
+    return finalizeItem(parts.toString());
+  }
+
+  /** Renders a single value of a row together with its column name. */
+  protected abstract String makeItemString(String key,
+                                  Object value);
+
+  /** Turns the joined values of a row into the final item text. */
+  protected abstract String finalizeItem(String item);
+
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/JsonItemWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/JsonItemWriter.java
new file mode 100644
index 000000000..c4b8d4525
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/JsonItemWriter.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+package org.apache.streampipes.dataexplorer.v4.query.writer.item;
+
+import com.google.gson.Gson;
+
+/**
+ * {@link ItemGenerator} that renders a row as a single JSON object with one
+ * member per column.
+ */
+public class JsonItemWriter extends ItemGenerator {
+
+  private static final String BEGIN_OBJECT = "{";
+  private static final String END_OBJECT = "}";
+  private static final String JSON_NULL = "null";
+
+  private final Gson gson;
+
+  /**
+   * @param gson serializer used to render member names and values
+   */
+  public JsonItemWriter(Gson gson) {
+    super(COMMA_SEPARATOR);
+    this.gson = gson;
+  }
+
+  /**
+   * Renders one JSON object member in the form {@code "key": value}.
+   *
+   * <p>The key is serialized through Gson as well, so that quotes, backslashes
+   * and control characters in a column name are escaped and cannot break the
+   * generated document.
+   *
+   * @param key   member name
+   * @param value member value; {@code null} is written as the JSON literal
+   *              {@code null}
+   * @return the rendered member
+   */
+  @Override
+  protected String makeItemString(String key,
+                                  Object value) {
+    // String.valueOf keeps a null key rendered as the quoted string "null".
+    var jsonKey = gson.toJson(String.valueOf(key));
+    var jsonValue = value != null ? gson.toJson(value) : JSON_NULL;
+    return jsonKey + ": " + jsonValue;
+  }
+
+  /**
+   * Wraps the comma-separated members into a JSON object.
+   *
+   * @param item the rendered members
+   * @return the members enclosed in curly braces
+   */
+  @Override
+  protected String finalizeItem(String item) {
+    return BEGIN_OBJECT + item + END_OBJECT;
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
new file mode 100644
index 000000000..c7c47267c
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
@@ -0,0 +1,30 @@
+package org.apache.streampipes.dataexplorer.v4.utils;
+
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.temporal.ChronoField;
+import java.time.temporal.ChronoUnit;
+import java.time.temporal.TemporalAccessor;
+
+/**
+ * Converts textual timestamps into epoch milliseconds.
+ */
+public class TimeParser {
+
+  // Every component after the year is optional in the pattern. Each optional
+  // component gets a default so that a complete date-time with offset can
+  // always be resolved; without the date/time defaults, inputs such as
+  // "2022-11-20" or "2022-11-20T16:37" could not be converted to an instant.
+  private static final DateTimeFormatter FORMATTER = new DateTimeFormatterBuilder()
+      .appendPattern("uuuu[-MM[-dd]]['T'HH[:mm[:ss[.SSSSSSSSS][.SSSSSSSS][.SSSSSSS][.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S]]]][XXX]")
+      .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
+      .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
+      .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
+      .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
+      .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
+      .parseDefaulting(ChronoField.NANO_OF_SECOND, 0)
+      .parseDefaulting(ChronoField.OFFSET_SECONDS, 0)
+      .toFormatter();
+
+  /**
+   * Parses a timestamp and returns it as milliseconds since the epoch.
+   *
+   * <p>Missing month and day default to 1, missing time components default to
+   * 0, and a timestamp without an offset is interpreted with offset 0 (UTC).
+   *
+   * @param v timestamp text, e.g. {@code 2022-11-20T16:37:13.123Z}
+   * @return milliseconds since 1970-01-01T00:00:00Z
+   * @throws java.time.format.DateTimeParseException if the text does not match
+   *         the supported pattern
+   */
+  public static Long parseTime(String v) {
+    ZonedDateTime dateTime = FORMATTER.parse(v, ZonedDateTime::from);
+    return dateTime.toInstant().toEpochMilli();
+  }
+}
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index 838cd1aa2..bce7ebed7 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -27,6 +27,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
 import io.swagger.v3.oas.annotations.responses.ApiResponse;
 import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
 import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
 import org.apache.streampipes.model.StreamPipesErrorMessage;
 import org.apache.streampipes.model.datalake.DataLakeConfiguration;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
@@ -217,7 +218,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
                 format = "csv";
             }
 
-            String outputFormat = format;
+            OutputFormat outputFormat = format.equals("csv") ? OutputFormat.CSV : OutputFormat.JSON;
             StreamingOutput streamingOutput = output -> dataLakeManagement.getDataAsStream(
                 sanitizedParams,
                 outputFormat,
diff --git a/ui/src/app/core-ui/data-download-dialog/services/data-export.service.ts b/ui/src/app/core-ui/data-download-dialog/services/data-export.service.ts
index beda107be..185606f58 100644
--- a/ui/src/app/core-ui/data-download-dialog/services/data-export.service.ts
+++ b/ui/src/app/core-ui/data-download-dialog/services/data-export.service.ts
@@ -95,7 +95,7 @@ export class DataExportService {
     return this.dataViewQueryGeneratorService
       .generateQuery(
         exportConfig.dataExportConfig.dateRange.startDate.getTime(),
-        exportConfig.dataExportConfig.dateRange.startDate.getTime(),
+        exportConfig.dataExportConfig.dateRange.endDate.getTime(),
         dataDownloadDialogModel.dataExplorerDataConfig.sourceConfigs[selectedQueryIndex],
         false
       );


[streampipes] 03/06: [STREAMPIPES-566] Add license header

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git

commit e7123dbdc324671460b079569170479efcbebbc1
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Nov 20 16:37:13 2022 +0100

    [STREAMPIPES-566] Add license header
---
 .../streampipes/dataexplorer/v4/utils/TimeParser.java  | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
index c7c47267c..71aab02d6 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package org.apache.streampipes.dataexplorer.v4.utils;
 
 import java.time.Instant;