You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/04/02 18:55:52 UTC

[streampipes] branch dev updated: Improve data explorer query management (#1406) (#1407)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new c18643663 Improve data explorer query management (#1406) (#1407)
c18643663 is described below

commit c186436633793d870ba6ff9bdee24f8aebbebeec
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Apr 2 20:55:43 2023 +0200

    Improve data explorer query management (#1406) (#1407)
    
    * Improve data explorer query management (#1406)
    
    * Fix checkstyle issue (#1406)
    
    * Fix checkstyle issue (#1406)
    
    * Refactor data explorer module (#1406)
    
    * Fix checkstyle issues (#1406)
    
    * Fix checkstyle issue in text (#1406)
    
    * Fix test (#1406)
    
    * [hotfix] Add test for DataLakeQueryBuilder (#1459)
    
    * Merge missing test (#1406)
    
    * Cleanup code (#1406)
    
    ---------
    
    Co-authored-by: Philipp Zehnder <te...@users.noreply.github.com>
---
 .../commons/configs/CouchDbConfigurations.java     |  35 ---
 .../commons/configs/CouchDbEnvKeys.java            |  25 ---
 .../configs/DataExplorerConfigurations.java        |  46 ----
 .../commons/configs/DataExplorerEnvKeys.java       |  28 ---
 .../dataexplorer/DataExplorerQueryManagement.java  | 142 ++++++++++++
 ...ntV3.java => DataExplorerSchemaManagement.java} |  93 ++++++--
 .../dataexplorer/DataLakeManagementV4.java         | 245 ---------------------
 .../api/IDataExplorerQueryManagement.java          |  47 ++++
 .../IDataExplorerSchemaManagement.java}            |  25 ++-
 .../IQueryStatement.java}                          |  14 +-
 .../influx/DataExplorerInfluxQueryExecutor.java    | 148 +++++++++++++
 .../DataLakeInfluxQueryBuilder.java}               | 138 +++++++-----
 .../DeleteQueryParams.java}                        |  45 ++--
 .../param/ProvidedRestQueryParamConverter.java     | 147 +++++++++++++
 .../ProvidedRestQueryParams.java}                  |  12 +-
 .../dataexplorer/param/SelectQueryParams.java      | 132 +++++++++++
 .../SupportedRestQueryParams.java}                 |   4 +-
 .../model/AggregationFunction.java}                |   8 +-
 .../model/FillClauseParams.java}                   |  20 +-
 .../model/GroupByTagsClauseParams.java}            |  24 +-
 .../model/GroupByTimeClauseParams.java}            |  19 +-
 .../model/ItemClauseParams.java}                   |  19 +-
 .../model/OffsetClauseParams.java}                 |  19 +-
 .../model/OrderByClauseParams.java}                |  20 +-
 .../model/SelectClauseParams.java}                 |  73 +++---
 .../{v4/params => param/model}/SelectColumn.java   |  58 +++--
 .../param/model/WhereClauseParams.java             | 122 ++++++++++
 .../{v4 => query}/AutoAggregationHandler.java      |  53 +++--
 .../query/DataExplorerQueryExecutor.java           | 105 +++++++++
 .../{v4 => }/query/QueryResultProvider.java        |  28 ++-
 .../query/StreamedQueryResultProvider.java         |  18 +-
 .../query/writer/ConfiguredCsvOutputWriter.java    |  10 +-
 .../query/writer/ConfiguredJsonOutputWriter.java   |  10 +-
 .../query/writer/ConfiguredOutputWriter.java       |   8 +-
 .../{v4 => }/query/writer/OutputFormat.java        |   2 +-
 .../{v4 => }/query/writer/item/CsvItemWriter.java  |   2 +-
 .../{v4 => }/query/writer/item/ItemGenerator.java  |   4 +-
 .../{v4 => }/query/writer/item/JsonItemWriter.java |   2 +-
 .../DataLakeQueryOrdering.java                     |   2 +-
 .../FilterCondition.java}                          |  24 +-
 .../querybuilder/IDataLakeQueryBuilder.java        |  84 +++++++
 .../dataexplorer/sdk/DataLakeQueryConstants.java   |  30 ---
 .../dataexplorer/sdk/IDataLakeQueryBuilder.java    |  74 -------
 .../dataexplorer/{v4 => }/utils/TimeParser.java    |   2 +-
 .../v4/params/DeleteFromStatementParams.java       |  31 ---
 .../dataexplorer/v4/params/FillParams.java         |  35 ---
 .../v4/params/WhereStatementParams.java            | 122 ----------
 .../dataexplorer/v4/query/DataExplorerQueryV4.java | 234 --------------------
 .../dataexplorer/v4/query/QueryBuilder.java        |  64 ------
 .../v4/query/elements/DeleteFromStatement.java     |  33 ---
 .../v4/query/elements/GroupingByTags.java          |  43 ----
 .../v4/query/elements/GroupingByTime.java          |  34 ---
 .../v4/query/elements/ItemLimitation.java          |  34 ---
 .../dataexplorer/v4/query/elements/Offset.java     |  34 ---
 .../v4/query/elements/OrderingByTime.java          |  34 ---
 .../v4/query/elements/SelectFromStatement.java     |  51 -----
 .../v4/query/elements/TimeBoundary.java            |  40 ----
 .../v4/query/elements/WhereStatement.java          |  39 ----
 .../dataexplorer/v4/template/QueryTemplatesV4.java |  63 ------
 .../v4/utils/DataLakeManagementUtils.java          | 163 --------------
 .../dataexplorer/param/SelectQueryParamsTest.java  | 208 +++++++++++++++++
 .../param}/WhereStatementParamsTest.java           |  27 ++-
 .../writer/TestConfiguredCsvOutputWriter.java      |   7 +-
 .../writer/TestConfiguredJsonOutputWriter.java     |   7 +-
 .../query/writer/TestConfiguredOutputWriter.java   |   2 +-
 .../query/writer/item/TestCsvItemWriter.java       |   4 +-
 .../query/writer/item/TestItemWriter.java          |   2 +-
 .../query/writer/item/TestJsonItemWriter.java      |   4 +-
 .../dataexplorer/sdk/DataLakeQueryBuilderTest.java |   6 +-
 .../utils/ProvidedQueryParameterBuilder.java       |  92 ++++++++
 .../sinks/internal/jvm/SinksInternalJvmInit.java   |   4 -
 .../streampipes/ps/DataLakeMeasureResourceV3.java  |  57 -----
 .../streampipes/ps/DataLakeMeasureResourceV4.java  |  23 +-
 .../apache/streampipes/ps/DataLakeResourceV4.java  |  70 +++---
 .../apache/streampipes/rest/ResetManagement.java   |  14 +-
 .../service/core/StreamPipesResourceConfig.java    |   2 -
 76 files changed, 1758 insertions(+), 1991 deletions(-)

diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java
deleted file mode 100644
index 45678a9c1..000000000
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.commons.configs;
-
-import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class CouchDbConfigurations {
-
-  public static List<ConfigItem> getDefaults() {
-    return Arrays.asList(
-        ConfigItem.from(CouchDbEnvKeys.COUCHDB_HOST, "couchdb", "Hostname for CouchDB to store image blobs"),
-        ConfigItem.from(CouchDbEnvKeys.COUCHDB_PORT, 5984, ""),
-        ConfigItem.from(CouchDbEnvKeys.COUCHDB_PROTOCOL, "http", "")
-    );
-  }
-
-}
\ No newline at end of file
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java
deleted file mode 100644
index 23867c634..000000000
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.commons.configs;
-
-public class CouchDbEnvKeys {
-  public static final String COUCHDB_HOST = "SP_COUCHDB_HOST";
-  public static final String COUCHDB_PORT = "SP_COUCHDB_PORT";
-  public static final String COUCHDB_PROTOCOL = "SP_COUCHDB_PROTOCOL";
-}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java
deleted file mode 100644
index feb7135ea..000000000
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.commons.configs;
-
-import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
-
-import java.util.Arrays;
-import java.util.List;
-
-
-public class DataExplorerConfigurations {
-  public static final String DATA_LAKE_DATABASE_NAME = "sp";
-
-  public static List<ConfigItem> getDefaults() {
-
-    return Arrays.asList(
-        ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_HOST, "influxdb",
-            "Hostname for the StreamPipes data lake database"),
-        ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PROTOCOL, "http",
-            "Protocol for the StreamPipes data lake database"),
-        ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PORT, 8086, "Port for the StreamPipes data lake database"),
-        ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_USERNAME, "default",
-            "Username for the StreamPipes data lake database"),
-        ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PASSWORD, "default",
-            "Password for the StreamPipes data lake database"),
-        ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_DATABASE_NAME, DATA_LAKE_DATABASE_NAME,
-            "Database name for the StreamPipes data lake database")
-    );
-  }
-
-}
\ No newline at end of file
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java
deleted file mode 100644
index 6eb840708..000000000
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.commons.configs;
-
-public class DataExplorerEnvKeys {
-  public static final String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
-  public static final String DATA_LAKE_PROTOCOL = "SP_DATA_LAKE_PROTOCOL";
-  public static final String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT";
-  public static final String DATA_LAKE_USERNAME = "SP_DATA_LAKE_USERNAME";
-  public static final String DATA_LAKE_PASSWORD = "SP_DATA_LAKE_PASSWORD";
-  public static final String DATA_LAKE_DATABASE_NAME = "SP_DATA_LAKE_DATABASE_NAME";
-
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java
new file mode 100644
index 000000000..3e927e977
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer;
+
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
+import org.apache.streampipes.dataexplorer.influx.DataExplorerInfluxQueryExecutor;
+import org.apache.streampipes.dataexplorer.param.DeleteQueryParams;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParamConverter;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
+import org.apache.streampipes.dataexplorer.query.QueryResultProvider;
+import org.apache.streampipes.dataexplorer.query.StreamedQueryResultProvider;
+import org.apache.streampipes.dataexplorer.query.writer.OutputFormat;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class DataExplorerQueryManagement implements IDataExplorerQueryManagement {
+
+  private final IDataExplorerSchemaManagement dataExplorerSchemaManagement;
+
+  public DataExplorerQueryManagement(IDataExplorerSchemaManagement dataExplorerSchemaManagement) {
+    this.dataExplorerSchemaManagement = dataExplorerSchemaManagement;
+  }
+
+  @Override
+  public SpQueryResult getData(ProvidedRestQueryParams queryParams,
+                               boolean ignoreMissingData) throws IllegalArgumentException {
+    return new QueryResultProvider(queryParams, ignoreMissingData).getData();
+  }
+
+  @Override
+  public void getDataAsStream(ProvidedRestQueryParams params,
+                              OutputFormat format,
+                              boolean ignoreMissingValues,
+                              OutputStream outputStream) throws IOException {
+
+    new StreamedQueryResultProvider(params, format, ignoreMissingValues).getDataAsStream(outputStream);
+  }
+
+  @Override
+  public boolean deleteAllData() {
+    List<DataLakeMeasure> allMeasurements = getAllMeasurements();
+
+    for (DataLakeMeasure measure : allMeasurements) {
+      QueryResult queryResult = new DeleteDataQuery(measure).executeQuery();
+      if (queryResult.hasError() || queryResult.getResults().get(0).getError() != null) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean deleteData(String measurementID) {
+    List<DataLakeMeasure> allMeasurements = getAllMeasurements();
+    for (DataLakeMeasure measure : allMeasurements) {
+      if (measure.getMeasureName().equals(measurementID)) {
+        QueryResult queryResult = new DeleteDataQuery(new DataLakeMeasure(measurementID, null)).executeQuery();
+
+        return !queryResult.hasError();
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public SpQueryResult deleteData(String measurementID, Long startDate, Long endDate) {
+    DeleteQueryParams params =
+        ProvidedRestQueryParamConverter.getDeleteQueryParams(measurementID, startDate, endDate);
+    return new DataExplorerInfluxQueryExecutor().executeQuery(params);
+  }
+
+  @Override
+  public Map<String, Object> getTagValues(String measurementId,
+                                          String fields) {
+    InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient();
+    String databaseName = getEnvironment().getTsStorageBucket().getValueOrDefault();
+    Map<String, Object> tags = new HashMap<>();
+    if (fields != null && !("".equals(fields))) {
+      List<String> fieldList = Arrays.asList(fields.split(","));
+      fieldList.forEach(f -> {
+        String q =
+            "SHOW TAG VALUES ON \"" + databaseName + "\" FROM \"" + measurementId
+                + "\" WITH KEY = \"" + f + "\"";
+        Query query = new Query(q);
+        QueryResult queryResult = influxDB.query(query);
+        queryResult.getResults().forEach(res -> {
+          res.getSeries().forEach(series -> {
+            if (series.getValues().size() > 0) {
+              String field = series.getValues().get(0).get(0).toString();
+              List<String> values =
+                  series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
+              tags.put(field, values);
+            }
+          });
+        });
+      });
+    }
+
+    return tags;
+  }
+
+  private List<DataLakeMeasure> getAllMeasurements() {
+    return this.dataExplorerSchemaManagement.getAllMeasurements();
+  }
+
+  private Environment getEnvironment() {
+    return Environments.getEnvironment();
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java
similarity index 54%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java
index 6fd104ecd..f28cf6cbd 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java
@@ -18,41 +18,104 @@
 
 package org.apache.streampipes.dataexplorer;
 
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventPropertyList;
 import org.apache.streampipes.model.schema.EventPropertyNested;
 import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.storage.api.IDataLakeStorage;
+import org.apache.streampipes.storage.couchdb.utils.Utils;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
+import com.google.gson.JsonObject;
+import org.lightcouch.CouchDbClient;
+
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
+public class DataExplorerSchemaManagement implements IDataExplorerSchemaManagement {
 
-@Deprecated
-public class DataLakeNoUserManagementV3 {
+  @Override
+  public List<DataLakeMeasure> getAllMeasurements() {
+    return DataExplorerUtils.getInfos();
+  }
 
+  @Override
+  public DataLakeMeasure getById(String elementId) {
+    return getDataLakeStorage().findOne(elementId);
+  }
 
-  @Deprecated
-  public boolean addDataLake(String measure, EventSchema eventSchema) {
+  @Override
+  public DataLakeMeasure createMeasurement(DataLakeMeasure measure) {
     List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
     Optional<DataLakeMeasure> optional =
-        dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure)).findFirst();
+        dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName()))
+            .findFirst();
 
     if (optional.isPresent()) {
-      if (!compareEventProperties(optional.get().getEventSchema().getEventProperties(),
-          eventSchema.getEventProperties())) {
-        return false;
+      DataLakeMeasure oldEntry = optional.get();
+      if (!compareEventProperties(oldEntry.getEventSchema().getEventProperties(),
+          measure.getEventSchema().getEventProperties())) {
+        return oldEntry;
+      }
+    } else {
+      measure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
+      getDataLakeStorage().storeDataLakeMeasure(measure);
+      return measure;
+    }
+
+    return measure;
+  }
+
+  @Override
+  public void deleteMeasurement(String elementId) {
+    if (getDataLakeStorage().findOne(elementId) != null) {
+      getDataLakeStorage().deleteDataLakeMeasure(elementId);
+    } else {
+      throw new IllegalArgumentException("Could not find measure with this ID");
+    }
+  }
+
+  @Override
+  public boolean deleteMeasurementByName(String measureName) {
+    boolean isSuccess = false;
+    CouchDbClient couchDbClient = Utils.getCouchDbDataLakeClient();
+    List<JsonObject> docs = couchDbClient.view("_all_docs").includeDocs(true).query(JsonObject.class);
+
+    for (JsonObject document : docs) {
+      if (document.get("measureName").toString().replace("\"", "").equals(measureName)) {
+        couchDbClient.remove(document.get("_id").toString().replace("\"", ""),
+            document.get("_rev").toString().replace("\"", ""));
+        isSuccess = true;
+        break;
       }
+    }
+
+    try {
+      couchDbClient.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    return isSuccess;
+  }
+
+  @Override
+  public void updateMeasurement(DataLakeMeasure measure) {
+    var existingMeasure = getDataLakeStorage().findOne(measure.getElementId());
+    if (existingMeasure != null) {
+      measure.setRev(existingMeasure.getRev());
+      getDataLakeStorage().updateDataLakeMeasure(measure);
     } else {
-      DataLakeMeasure dataLakeMeasure = new DataLakeMeasure(measure, eventSchema);
-      dataLakeMeasure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
-      getDataLakeStorage().storeDataLakeMeasure(dataLakeMeasure);
+      getDataLakeStorage().storeDataLakeMeasure(measure);
     }
-    return true;
+  }
+
+  private IDataLakeStorage getDataLakeStorage() {
+    return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
   }
 
   private boolean compareEventProperties(List<EventProperty> prop1, List<EventProperty> prop2) {
@@ -89,8 +152,4 @@ public class DataLakeNoUserManagementV3 {
 
     });
   }
-
-  private IDataLakeStorage getDataLakeStorage() {
-    return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
-  }
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
deleted file mode 100644
index 7f397af7a..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer;
-
-import org.apache.streampipes.commons.environment.Environment;
-import org.apache.streampipes.commons.environment.Environments;
-import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
-import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
-import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
-import org.apache.streampipes.dataexplorer.v4.query.DataExplorerQueryV4;
-import org.apache.streampipes.dataexplorer.v4.query.QueryResultProvider;
-import org.apache.streampipes.dataexplorer.v4.query.StreamedQueryResultProvider;
-import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
-import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
-import org.apache.streampipes.model.datalake.DataLakeMeasure;
-import org.apache.streampipes.model.datalake.SpQueryResult;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventPropertyList;
-import org.apache.streampipes.model.schema.EventPropertyNested;
-import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.storage.api.IDataLakeStorage;
-import org.apache.streampipes.storage.couchdb.utils.Utils;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-
-import com.google.gson.JsonObject;
-import org.influxdb.InfluxDB;
-import org.influxdb.dto.Query;
-import org.influxdb.dto.QueryResult;
-import org.lightcouch.CouchDbClient;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-public class DataLakeManagementV4 {
-
-  public List<DataLakeMeasure> getAllMeasurements() {
-    return DataExplorerUtils.getInfos();
-  }
-
-  public DataLakeMeasure getById(String measureId) {
-    return getDataLakeStorage().findOne(measureId);
-  }
-
-  public SpQueryResult getData(ProvidedQueryParams queryParams,
-                               boolean ignoreMissingData) throws IllegalArgumentException {
-    return new QueryResultProvider(queryParams, ignoreMissingData).getData();
-  }
-
-  public void getDataAsStream(ProvidedQueryParams params,
-                              OutputFormat format,
-                              boolean ignoreMissingValues,
-                              OutputStream outputStream) throws IOException {
-
-    new StreamedQueryResultProvider(params, format, ignoreMissingValues).getDataAsStream(outputStream);
-  }
-
-  public boolean removeAllMeasurements() {
-    List<DataLakeMeasure> allMeasurements = getAllMeasurements();
-
-    for (DataLakeMeasure measure : allMeasurements) {
-      QueryResult queryResult = new DeleteDataQuery(measure).executeQuery();
-      if (queryResult.hasError() || queryResult.getResults().get(0).getError() != null) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  public boolean removeMeasurement(String measurementID) {
-    List<DataLakeMeasure> allMeasurements = getAllMeasurements();
-    for (DataLakeMeasure measure : allMeasurements) {
-      if (measure.getMeasureName().equals(measurementID)) {
-        QueryResult queryResult = new DeleteDataQuery(new DataLakeMeasure(measurementID, null)).executeQuery();
-
-        return !queryResult.hasError();
-      }
-    }
-    return false;
-  }
-
-  public SpQueryResult deleteData(String measurementID, Long startDate, Long endDate) {
-    Map<String, QueryParamsV4> queryParts =
-        DataLakeManagementUtils.getDeleteQueryParams(measurementID, startDate, endDate);
-    return new DataExplorerQueryV4(queryParts).executeQuery(true);
-  }
-
-  public boolean removeEventProperty(String measurementID) {
-    boolean isSuccess = false;
-    CouchDbClient couchDbClient = Utils.getCouchDbDataLakeClient();
-    List<JsonObject> docs = couchDbClient.view("_all_docs").includeDocs(true).query(JsonObject.class);
-
-    for (JsonObject document : docs) {
-      if (document.get("measureName").toString().replace("\"", "").equals(measurementID)) {
-        couchDbClient.remove(document.get("_id").toString().replace("\"", ""),
-            document.get("_rev").toString().replace("\"", ""));
-        isSuccess = true;
-        break;
-      }
-    }
-
-    try {
-      couchDbClient.close();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-    return isSuccess;
-  }
-
-  public Map<String, Object> getTagValues(String measurementId,
-                                          String fields) {
-    InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient();
-    String databaseName = getEnvironment().getTsStorageBucket().getValueOrDefault();
-    Map<String, Object> tags = new HashMap<>();
-    if (fields != null && !("".equals(fields))) {
-      List<String> fieldList = Arrays.asList(fields.split(","));
-      fieldList.forEach(f -> {
-        String q =
-            "SHOW TAG VALUES ON \"" + databaseName + "\" FROM \"" + measurementId
-                + "\" WITH KEY = \"" + f + "\"";
-        Query query = new Query(q);
-        QueryResult queryResult = influxDB.query(query);
-        queryResult.getResults().forEach(res -> {
-          res.getSeries().forEach(series -> {
-            if (series.getValues().size() > 0) {
-              String field = series.getValues().get(0).get(0).toString();
-              List<String> values =
-                  series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
-              tags.put(field, values);
-            }
-          });
-        });
-      });
-    }
-
-    return tags;
-  }
-
-  public void updateDataLake(DataLakeMeasure measure) throws IllegalArgumentException {
-    var existingMeasure = getDataLakeStorage().findOne(measure.getElementId());
-    if (existingMeasure != null) {
-      measure.setRev(existingMeasure.getRev());
-      getDataLakeStorage().updateDataLakeMeasure(measure);
-    } else {
-      getDataLakeStorage().storeDataLakeMeasure(measure);
-    }
-  }
-
-  public void deleteDataLakeMeasure(String elementId) throws IllegalArgumentException {
-    if (getDataLakeStorage().findOne(elementId) != null) {
-      getDataLakeStorage().deleteDataLakeMeasure(elementId);
-    } else {
-      throw new IllegalArgumentException("Could not find measure with this ID");
-    }
-  }
-
-  public DataLakeMeasure addDataLake(DataLakeMeasure measure) {
-    List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
-    Optional<DataLakeMeasure> optional =
-        dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName()))
-            .findFirst();
-
-    if (optional.isPresent()) {
-      DataLakeMeasure oldEntry = optional.get();
-      if (!compareEventProperties(oldEntry.getEventSchema().getEventProperties(),
-          measure.getEventSchema().getEventProperties())) {
-        return oldEntry;
-      }
-    } else {
-      measure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
-      getDataLakeStorage().storeDataLakeMeasure(measure);
-      return measure;
-    }
-
-    return measure;
-  }
-
-  private boolean compareEventProperties(List<EventProperty> prop1, List<EventProperty> prop2) {
-    if (prop1.size() != prop2.size()) {
-      return false;
-    }
-
-    return prop1.stream().allMatch(prop -> {
-
-      for (EventProperty property : prop2) {
-        if (prop.getRuntimeName().equals(property.getRuntimeName())) {
-
-          //primitive
-          if (prop instanceof EventPropertyPrimitive && property instanceof EventPropertyPrimitive) {
-            if (((EventPropertyPrimitive) prop)
-                .getRuntimeType()
-                .equals(((EventPropertyPrimitive) property).getRuntimeType())) {
-              return true;
-            }
-
-            //list
-          } else if (prop instanceof EventPropertyList && property instanceof EventPropertyList) {
-            return compareEventProperties(Collections.singletonList(((EventPropertyList) prop).getEventProperty()),
-                Collections.singletonList(((EventPropertyList) property).getEventProperty()));
-
-            //nested
-          } else if (prop instanceof EventPropertyNested && property instanceof EventPropertyNested) {
-            return compareEventProperties(((EventPropertyNested) prop).getEventProperties(),
-                ((EventPropertyNested) property).getEventProperties());
-          }
-        }
-      }
-      return false;
-
-    });
-  }
-
-
-  private IDataLakeStorage getDataLakeStorage() {
-    return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
-  }
-
-  private Environment getEnvironment() {
-    return Environments.getEnvironment();
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerQueryManagement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerQueryManagement.java
new file mode 100644
index 000000000..59620a21b
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerQueryManagement.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.api;
+
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.query.writer.OutputFormat;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+public interface IDataExplorerQueryManagement {
+
+  SpQueryResult getData(ProvidedRestQueryParams queryParams,
+                        boolean ignoreMissingData) throws IllegalArgumentException;
+
+  void getDataAsStream(ProvidedRestQueryParams params,
+                       OutputFormat format,
+                       boolean ignoreMissingValues,
+                       OutputStream outputStream) throws IOException;
+
+  boolean deleteData(String measurementID);
+
+  SpQueryResult deleteData(String measurementID, Long startDate, Long endDate);
+
+  boolean deleteAllData();
+
+  Map<String, Object> getTagValues(String measurementId,
+                                   String fields);
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/QueryElement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java
similarity index 62%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/QueryElement.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java
index 4ea28af5d..d8dc29648 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/QueryElement.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java
@@ -16,20 +16,23 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.query.elements;
+package org.apache.streampipes.dataexplorer.api;
 
-import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
 
-public abstract class QueryElement<T extends QueryParamsV4> {
-  protected String queryStatement;
+import java.util.List;
 
-  public QueryElement(T params) {
-    this.queryStatement = buildStatement(params);
-  }
+public interface IDataExplorerSchemaManagement {
 
-  protected abstract String buildStatement(T params);
+  List<DataLakeMeasure> getAllMeasurements();
 
-  public String getStatement() {
-    return queryStatement;
-  }
+  DataLakeMeasure getById(String elementId);
+
+  DataLakeMeasure createMeasurement(DataLakeMeasure measure);
+
+  void deleteMeasurement(String elementId);
+
+  boolean deleteMeasurementByName(String measureName);
+
+  void updateMeasurement(DataLakeMeasure measure);
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/QueryParamsV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IQueryStatement.java
similarity index 76%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/QueryParamsV4.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IQueryStatement.java
index 49046beec..1e1a0f39a 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/QueryParamsV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IQueryStatement.java
@@ -15,18 +15,12 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.dataexplorer.v4.params;
 
-public abstract class QueryParamsV4 {
+package org.apache.streampipes.dataexplorer.api;
 
-  private final String index;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
 
-  protected QueryParamsV4(String index) {
-    this.index = index;
-  }
-
-  public String getIndex() {
-    return index;
-  }
+public interface IQueryStatement {
 
+  void buildStatement(IDataLakeQueryBuilder<?> builder);
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
new file mode 100644
index 000000000..d0bbdc09f
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.influx;
+
+import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
+import org.apache.streampipes.dataexplorer.param.DeleteQueryParams;
+import org.apache.streampipes.dataexplorer.param.SelectQueryParams;
+import org.apache.streampipes.dataexplorer.query.DataExplorerQueryExecutor;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+import org.apache.streampipes.model.datalake.DataSeries;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DataExplorerInfluxQueryExecutor extends DataExplorerQueryExecutor<Query, QueryResult> {
+
+  public DataExplorerInfluxQueryExecutor() {
+    super();
+  }
+
+  public DataExplorerInfluxQueryExecutor(String forId) {
+    super(forId);
+  }
+
+  public DataExplorerInfluxQueryExecutor(int maximumAmountOfEvents) {
+    super(maximumAmountOfEvents);
+  }
+
+
+  @Override
+  protected double getAmountOfResults(QueryResult countQueryResult) {
+    if (countQueryResult.getResults().get(0).getSeries() != null
+        && countQueryResult.getResults().get(0).getSeries().get(0).getValues() != null) {
+      return (double) countQueryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(1);
+    } else {
+      return 0.0;
+    }
+  }
+
+
+  protected DataSeries convertResult(QueryResult.Series series,
+                                     boolean ignoreMissingValues) {
+    List<String> columns = series.getColumns();
+    List<List<Object>> values = series.getValues();
+
+    List<List<Object>> resultingValues = new ArrayList<>();
+
+    values.forEach(v -> {
+      if (ignoreMissingValues) {
+        if (!v.contains(null)) {
+          resultingValues.add(v);
+        }
+      } else {
+        resultingValues.add(v);
+      }
+
+    });
+
+    return new DataSeries(values.size(), resultingValues, columns, series.getTags());
+  }
+
+  protected SpQueryResult postQuery(QueryResult queryResult,
+                                    boolean ignoreMissingValues) throws RuntimeException {
+    SpQueryResult result = new SpQueryResult();
+
+    if (hasResult(queryResult)) {
+      result.setTotal(queryResult.getResults().get(0).getSeries().size());
+      queryResult.getResults().get(0).getSeries().forEach(rs -> {
+        DataSeries series = convertResult(rs, ignoreMissingValues);
+        result.setHeaders(series.getHeaders());
+        result.addDataResult(series);
+      });
+    }
+
+    if (this.appendId) {
+      result.setForId(this.forId);
+    }
+
+    return result;
+  }
+
+  private IDataLakeQueryBuilder<Query> getQueryBuilder(String measurementId) {
+    return DataLakeInfluxQueryBuilder.create(measurementId);
+  }
+
+  @Override
+  protected QueryResult executeQuery(Query query) {
+    try (final InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient()) {
+      return influxDB.query(query);
+    }
+  }
+
+  @Override
+  protected String asQueryString(Query query) {
+    return "(database:" + query.getDatabase() + "): " + query.getCommand();
+  }
+
+  @Override
+  protected Query makeDeleteQuery(DeleteQueryParams params) {
+    String query = "DELETE FROM \"" + params.getMeasurementId() + "\"";
+    if (params.isTimeRestricted()) {
+      query += "WHERE time > "
+          + params.getStartTime() * 1000000
+          + " AND time < "
+          + params.getEndTime() * 1000000;
+    }
+    return new Query(query);
+  }
+
+  @Override
+  protected Query makeCountQuery(SelectQueryParams params) {
+    var builder = getQueryBuilder(params.getIndex());
+    return params.toCountQuery(builder);
+  }
+
+  @Override
+  protected Query makeSelectQuery(SelectQueryParams params) {
+    var builder = getQueryBuilder(params.getIndex());
+    return params.toQuery(builder);
+  }
+
+  private boolean hasResult(QueryResult queryResult) {
+    return queryResult.getResults() != null
+        && queryResult.getResults().size() > 0
+        && queryResult.getResults().get(0).getSeries() != null;
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeInfluxQueryBuilder.java
similarity index 55%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeInfluxQueryBuilder.java
index 42f5126e7..d2f518246 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeInfluxQueryBuilder.java
@@ -16,11 +16,14 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.sdk;
+package org.apache.streampipes.dataexplorer.influx;
 
 import org.apache.streampipes.commons.environment.Environment;
 import org.apache.streampipes.commons.environment.Environments;
-import org.apache.streampipes.dataexplorer.v4.params.ColumnFunction;
+import org.apache.streampipes.dataexplorer.param.model.AggregationFunction;
+import org.apache.streampipes.dataexplorer.querybuilder.DataLakeQueryOrdering;
+import org.apache.streampipes.dataexplorer.querybuilder.FilterCondition;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
 
 import org.influxdb.dto.Query;
 import org.influxdb.querybuilder.Ordering;
@@ -35,12 +38,13 @@ import org.influxdb.querybuilder.clauses.SimpleClause;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.asc;
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.desc;
 import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
 
-public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
+public class DataLakeInfluxQueryBuilder implements IDataLakeQueryBuilder<Query> {
 
   private final String measurementId;
   private final SelectionQueryImpl selectionQuery;
@@ -50,9 +54,11 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
   private int limit = Integer.MIN_VALUE;
   private int offset = Integer.MIN_VALUE;
 
-  private Environment env;
+  private Object fill;
 
-  private DataLakeQueryBuilder(String measurementId) {
+  private final Environment env;
+
+  private DataLakeInfluxQueryBuilder(String measurementId) {
     this.measurementId = measurementId;
     this.selectionQuery = select();
     this.whereClauses = new ArrayList<>();
@@ -60,70 +66,70 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
     this.env = Environments.getEnvironment();
   }
 
-  public static DataLakeQueryBuilder create(String measurementId) {
-    return new DataLakeQueryBuilder(measurementId);
+  public static DataLakeInfluxQueryBuilder create(String measurementId) {
+    return new DataLakeInfluxQueryBuilder(measurementId);
   }
 
+
   @Override
-  public DataLakeQueryBuilder withSimpleColumn(String columnName) {
-    this.selectionQuery.column(columnName);
+  public DataLakeInfluxQueryBuilder withAllColumns() {
+    this.selectionQuery.all();
+    return this;
+  }
 
+  @Override
+  public DataLakeInfluxQueryBuilder withSimpleColumn(String columnName) {
+    this.selectionQuery.column(columnName);
     return this;
   }
 
   @Override
-  public DataLakeQueryBuilder withSimpleColumns(List<String> columnNames) {
+  public DataLakeInfluxQueryBuilder withSimpleColumns(List<String> columnNames) {
     columnNames.forEach(this.selectionQuery::column);
 
     return this;
   }
 
   @Override
-  public DataLakeQueryBuilder withAggregatedColumn(String columnName,
-                                                   ColumnFunction columnFunction,
-                                                   String targetName) {
-    if (columnFunction == ColumnFunction.COUNT) {
-      this.selectionQuery.count(columnName).as(targetName);
-    } else if (columnFunction == ColumnFunction.MEAN) {
-      this.selectionQuery.mean(columnName).as(targetName);
-    } else if (columnFunction == ColumnFunction.MIN) {
-      this.selectionQuery.min(columnName).as(targetName);
-    } else if (columnFunction == ColumnFunction.MAX) {
-      this.selectionQuery.max(columnName).as(targetName);
-    } else if (columnFunction == ColumnFunction.FIRST) {
-      this.selectionQuery.function("FIRST", columnName).as(targetName);
-    } else if (columnFunction == ColumnFunction.LAST) {
-      this.selectionQuery.function("LAST", columnName).as(targetName);
-    }
+  public DataLakeInfluxQueryBuilder withAggregatedColumn(String columnName,
+                                                         AggregationFunction aggregationFunction,
+                                                         String aliasName) {
 
-    // TODO implement all column functions
+    this.selectionQuery.function(aggregationFunction.toDbName(), columnName).as(aliasName);
 
     return this;
   }
 
   @Override
-  public DataLakeQueryBuilder withStartTime(long startTime) {
+  public IDataLakeQueryBuilder<Query> withAggregatedColumn(String columnName, AggregationFunction aggregationFunction) {
+    this.selectionQuery.function(aggregationFunction.toDbName(), columnName);
+
+    return this;
+  }
+
+  @Override
+  public DataLakeInfluxQueryBuilder withStartTime(long startTime) {
     this.whereClauses.add(new SimpleClause("time", ">=", startTime * 1000000));
     return this;
   }
 
 
   @Override
-  public DataLakeQueryBuilder withEndTime(long endTime) {
+  public DataLakeInfluxQueryBuilder withEndTime(long endTime) {
     return withEndTime(endTime, true);
   }
 
   @Override
-  public DataLakeQueryBuilder withEndTime(long endTime,
-                                          boolean includeEndTime) {
+  public DataLakeInfluxQueryBuilder withEndTime(long endTime,
+                                                boolean includeEndTime) {
     String operator = includeEndTime ? "<=" : "<";
     this.whereClauses.add(new SimpleClause("time", operator, endTime * 1000000));
     return this;
   }
 
   @Override
-  public DataLakeQueryBuilder withTimeBoundary(long startTime,
-                                               long endTime) {
+  public DataLakeInfluxQueryBuilder withTimeBoundary(long startTime,
+                                                     long endTime) {
     this.withStartTime(startTime);
     this.withEndTime(endTime);
 
@@ -131,17 +137,17 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
   }
 
   @Override
-  public DataLakeQueryBuilder withFilter(String field,
-                                         String operator,
-                                         Object value) {
+  public DataLakeInfluxQueryBuilder withFilter(String field,
+                                               String operator,
+                                               Object value) {
     this.whereClauses.add(new SimpleClause(field, operator, value));
     return this;
   }
 
   @Override
-  public DataLakeQueryBuilder withExclusiveFilter(String field,
-                                                  String operator,
-                                                  List<?> values) {
+  public DataLakeInfluxQueryBuilder withExclusiveFilter(String field,
+                                                        String operator,
+                                                        List<?> values) {
     List<ConjunctionClause> or = new ArrayList<>();
     values.forEach(value -> or.add(new OrConjunction(new SimpleClause(field, operator, value))));
 
@@ -150,9 +156,9 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
   }
 
   @Override
-  public DataLakeQueryBuilder withInclusiveFilter(String field,
-                                                  String operator,
-                                                  List<?> values) {
+  public DataLakeInfluxQueryBuilder withInclusiveFilter(String field,
+                                                        String operator,
+                                                        List<?> values) {
     List<ConjunctionClause> and = new ArrayList<>();
     values.forEach(value -> and.add(new AndConjunction(new SimpleClause(field, operator, value))));
 
@@ -161,14 +167,25 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
   }
 
   @Override
-  public DataLakeQueryBuilder withFilter(NestedClause clause) {
+  public IDataLakeQueryBuilder<Query> withInclusiveFilter(List<FilterCondition> filterConditions) {
+    List<ConjunctionClause> and = new ArrayList<>();
+    filterConditions.forEach(c -> and
+        .add(new AndConjunction(new SimpleClause(c.getField(), c.getOperator(), c.getCondition()))));
+
+    addNestedWhereClause(and);
+
+    return this;
+  }
+
+  @Override
+  public DataLakeInfluxQueryBuilder withFilter(NestedClause clause) {
     this.whereClauses.add(clause);
 
     return this;
   }
 
   @Override
-  public DataLakeQueryBuilder withGroupByTime(String timeInterval) {
+  public DataLakeInfluxQueryBuilder withGroupByTime(String timeInterval) {
 
     this.groupByClauses.add(new RawTextClause("time(" + timeInterval + ")"));
 
@@ -176,8 +193,8 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
   }
 
   @Override
-  public DataLakeQueryBuilder withGroupByTime(String timeInterval,
-                                              String offsetInterval) {
+  public DataLakeInfluxQueryBuilder withGroupByTime(String timeInterval,
+                                                    String offsetInterval) {
 
     this.groupByClauses.add(new RawTextClause("time("
         + timeInterval
@@ -189,7 +206,7 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
   }
 
   @Override
-  public DataLakeQueryBuilder withGroupBy(String column) {
+  public DataLakeInfluxQueryBuilder withGroupBy(String column) {
 
     this.groupByClauses.add(new RawTextClause(column));
 
@@ -197,7 +214,7 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
   }
 
   @Override
-  public DataLakeQueryBuilder withOrderBy(DataLakeQueryOrdering ordering) {
+  public DataLakeInfluxQueryBuilder withOrderBy(DataLakeQueryOrdering ordering) {
     if (DataLakeQueryOrdering.ASC.equals(ordering)) {
       this.ordering = asc();
     } else {
@@ -208,23 +225,30 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
   }
 
   @Override
-  public DataLakeQueryBuilder withLimit(int limit) {
+  public DataLakeInfluxQueryBuilder withLimit(int limit) {
     this.limit = limit;
 
     return this;
   }
 
   @Override
-  public DataLakeQueryBuilder withOffset(int offset) {
+  public DataLakeInfluxQueryBuilder withOffset(int offset) {
     this.offset = offset;
 
     return this;
   }
 
+  @Override
+  public IDataLakeQueryBuilder<Query> withFill(Object fill) {
+    this.fill = fill;
+
+    return this;
+  }
+
   @Override
   public Query build() {
     var selectQuery =
-        this.selectionQuery.from(env.getTsStorageBucket().getValueOrDefault(), "\"" + measurementId + "\"");
+        this.selectionQuery.from(env.getTsStorageBucket().getValueOrDefault(), escapeIndex(measurementId));
     this.whereClauses.forEach(selectQuery::where);
 
     if (this.groupByClauses.size() > 0) {
@@ -243,6 +267,14 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
       selectQuery.limit(this.limit, this.offset);
     }
 
+    if (Objects.nonNull(fill)) {
+      if (fill instanceof String) {
+        selectQuery.fill((String) fill);
+      } else {
+        selectQuery.fill((Number) fill);
+      }
+    }
+
     return selectQuery;
   }
 
@@ -250,4 +282,8 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
     NestedClause nestedClause = new NestedClause(clauses);
     this.whereClauses.add(nestedClause);
   }
+
+  private String escapeIndex(String index) {
+    return "\"" + index + "\"";
+  }
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/TimeBoundaryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/DeleteQueryParams.java
similarity index 50%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/TimeBoundaryParams.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/DeleteQueryParams.java
index 64c0a4867..8bdda43de 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/TimeBoundaryParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/DeleteQueryParams.java
@@ -16,28 +16,45 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param;
 
-public class TimeBoundaryParams extends QueryParamsV4 {
+public class DeleteQueryParams {
 
-  private final Long startDate;
-  private final Long endDate;
+  private final String measurementId;
 
-  protected TimeBoundaryParams(String measurementID, Long startDate, Long endDate) {
-    super(measurementID);
-    this.startDate = startDate;
-    this.endDate = endDate;
+  private long startTime;
+
+  private long endTime;
+
+  private boolean timeRestricted;
+
+  public DeleteQueryParams(String measurementId) {
+    this.measurementId = measurementId;
+    this.timeRestricted = false;
+  }
+
+  public DeleteQueryParams(String measurementId,
+                           Long startTime,
+                           Long endTime) {
+    this(measurementId);
+    this.startTime = startTime;
+    this.endTime = endTime;
+    this.timeRestricted = true;
+  }
+
+  public String getMeasurementId() {
+    return measurementId;
   }
 
-  public static TimeBoundaryParams from(String measurementID, Long startDate, Long endDate) {
-    return new TimeBoundaryParams(measurementID, startDate, endDate);
+  public long getStartTime() {
+    return startTime;
   }
 
-  public Long getStartDate() {
-    return startDate;
+  public long getEndTime() {
+    return endTime;
   }
 
-  public Long getEndDate() {
-    return endDate;
+  public boolean isTimeRestricted() {
+    return timeRestricted;
   }
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/ProvidedRestQueryParamConverter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/ProvidedRestQueryParamConverter.java
new file mode 100644
index 000000000..963ec9d32
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/ProvidedRestQueryParamConverter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.param;
+
+import org.apache.streampipes.dataexplorer.param.model.FillClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.GroupByTagsClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.GroupByTimeClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.ItemClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.OffsetClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.OrderByClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.SelectClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.WhereClauseParams;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_AGGREGATION_FUNCTION;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_COLUMNS;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_COUNT_ONLY;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_END_DATE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_FILTER;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_GROUP_BY;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_LIMIT;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_OFFSET;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_ORDER;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_PAGE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_START_DATE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_TIME_INTERVAL;
+
+
+public class ProvidedRestQueryParamConverter {
+
+  public static final String BRACKET_OPEN = "\\[";
+  public static final String BRACKET_CLOSE = "\\]";
+
+  public static final String ORDER_DESCENDING = "DESC";
+
+  public static SelectQueryParams getSelectQueryParams(ProvidedRestQueryParams params) {
+    SelectQueryParams queryParameters = new SelectQueryParams(params.getMeasurementId());
+
+    String measurementId = params.getMeasurementId();
+
+    if (params.has(QP_COUNT_ONLY) && params.getAsBoolean(QP_COUNT_ONLY)) {
+      queryParameters.withSelectParams(SelectClauseParams.from(params.getAsString(QP_COLUMNS), true));
+    } else {
+      queryParameters.withSelectParams(SelectClauseParams.from(params.getAsString(QP_COLUMNS),
+          params.getAsString(QP_AGGREGATION_FUNCTION)));
+    }
+
+    String filterConditions = params.getAsString(QP_FILTER);
+
+    if (hasTimeParams(params)) {
+      queryParameters.withWhereParams(WhereClauseParams.from(
+          params.getAsLong(QP_START_DATE),
+          params.getAsLong(QP_END_DATE),
+          filterConditions));
+    } else if (filterConditions != null) {
+      queryParameters.withWhereParams(WhereClauseParams.from(filterConditions));
+    }
+
+    if (params.has(QP_TIME_INTERVAL)) {
+      String timeInterval = params.getAsString(QP_TIME_INTERVAL);
+      if (!params.has(QP_GROUP_BY)) {
+        queryParameters.withGroupByTimeParams(GroupByTimeClauseParams.from(timeInterval));
+      } else {
+        params.update(QP_GROUP_BY, params.getAsString(QP_GROUP_BY) + ",time(" + timeInterval + ")");
+      }
+
+      queryParameters.withFillParams(FillClauseParams.from());
+    }
+
+    if (params.has(QP_GROUP_BY)) {
+      queryParameters.withGroupByTagsParams(GroupByTagsClauseParams.from(params.getAsString(QP_GROUP_BY)));
+    }
+
+
+    if (params.has(QP_ORDER)) {
+      String order = params.getAsString(QP_ORDER);
+      if (order.equals(ORDER_DESCENDING)) {
+        queryParameters.withOrderByParams(OrderByClauseParams.from(order));
+      }
+    }
+
+    if (params.has(QP_LIMIT)) {
+      queryParameters.withLimitParams(ItemClauseParams.from(params.getAsInt(QP_LIMIT)));
+    }
+
+    if (params.has(QP_OFFSET)) {
+      queryParameters.withOffsetParams(OffsetClauseParams.from(params.getAsInt(QP_OFFSET)));
+    } else if (params.has(QP_LIMIT) && params.has(QP_PAGE)) {
+      queryParameters.withOffsetParams(OffsetClauseParams.from(
+          params.getAsInt(QP_PAGE) * params.getAsInt(QP_LIMIT)));
+    }
+
+    return queryParameters;
+  }
+
+  public static DeleteQueryParams getDeleteQueryParams(String measurementId,
+                                                                    Long startTime,
+                                                                    Long endTime) {
+    if (startTime != null || endTime != null) {
+      return new DeleteQueryParams(measurementId, startTime, endTime);
+    } else {
+      return new DeleteQueryParams(measurementId);
+    }
+  }
+
+  private static boolean hasTimeParams(ProvidedRestQueryParams params) {
+    return params.has(QP_START_DATE)
+        || params.has(QP_END_DATE);
+  }
+
+  public static List<String[]> buildConditions(String queryPart) {
+    String[] conditions = queryPart.split(",");
+    List<String[]> result = new ArrayList<>();
+
+    Arrays.stream(conditions).forEach(condition -> {
+      String[] singleCondition = buildSingleCondition(condition);
+      result.add(singleCondition);
+    });
+    return result;
+  }
+
+  public static String[] buildSingleCondition(String queryPart) {
+    return queryPart
+        .replaceAll(BRACKET_OPEN, "")
+        .replaceAll(BRACKET_CLOSE, "")
+        .split(";");
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/ProvidedQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/ProvidedRestQueryParams.java
similarity index 86%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/ProvidedQueryParams.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/ProvidedRestQueryParams.java
index e3c9535d6..c783e474d 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/ProvidedQueryParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/ProvidedRestQueryParams.java
@@ -15,26 +15,26 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.dataexplorer.v4;
+package org.apache.streampipes.dataexplorer.param;
 
 import java.util.HashMap;
 import java.util.Map;
 
-public class ProvidedQueryParams {
+public class ProvidedRestQueryParams {
 
   private final String measurementId;
   private final Map<String, String> providedParams;
 
-  public ProvidedQueryParams(String measurementId,
-                             Map<String, String> providedParams) {
+  public ProvidedRestQueryParams(String measurementId,
+                                 Map<String, String> providedParams) {
     this.measurementId = measurementId;
     this.providedParams = providedParams;
   }
 
-  public ProvidedQueryParams(ProvidedQueryParams params) {
+  public ProvidedRestQueryParams(ProvidedRestQueryParams params) {
     this.measurementId = params.getMeasurementId();
     this.providedParams = new HashMap<>();
-    params.getProvidedParams().forEach(providedParams::put);
+    providedParams.putAll(params.getProvidedParams());
   }
 
   public boolean has(String key) {
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/SelectQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/SelectQueryParams.java
new file mode 100644
index 000000000..741365060
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/SelectQueryParams.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.dataexplorer.param;
+
+import org.apache.streampipes.dataexplorer.param.model.FillClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.GroupByTagsClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.GroupByTimeClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.ItemClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.OffsetClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.OrderByClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.SelectClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.WhereClauseParams;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+
+import java.util.Objects;
+
+public class SelectQueryParams {
+
+  private SelectClauseParams selectParams;
+
+  private WhereClauseParams whereParams;
+
+  private GroupByTagsClauseParams groupByTagsClauseParams;
+  private GroupByTimeClauseParams groupByTimeClauseParams;
+
+  private OrderByClauseParams orderByClauseParams;
+  private ItemClauseParams limitParams;
+
+  private OffsetClauseParams offsetClauseParams;
+
+  private FillClauseParams fillClauseParams;
+
+  private final String index;
+
+  public SelectQueryParams(String index) {
+    this.index = index;
+  }
+
+  public String getIndex() {
+    return index;
+  }
+
+  public void withSelectParams(SelectClauseParams params) {
+    this.selectParams = params;
+  }
+
+  public void withLimitParams(ItemClauseParams params) {
+    this.limitParams = params;
+  }
+
+  public void withGroupByTagsParams(GroupByTagsClauseParams params) {
+    this.groupByTagsClauseParams = params;
+  }
+
+  public void withGroupByTimeParams(GroupByTimeClauseParams params) {
+    this.groupByTimeClauseParams = params;
+  }
+
+  public void withFillParams(FillClauseParams params) {
+    this.fillClauseParams = params;
+  }
+
+  public void withWhereParams(WhereClauseParams params) {
+    this.whereParams = params;
+  }
+
+  public void withOffsetParams(OffsetClauseParams params) {
+    this.offsetClauseParams = params;
+  }
+
+  public void withOrderByParams(OrderByClauseParams params) {
+    this.orderByClauseParams = params;
+  }
+
+  public <T> T toQuery(IDataLakeQueryBuilder<T> builder) {
+    this.selectParams.buildStatement(builder);
+    prepareBuilder(builder);
+    return builder.build();
+  }
+
+  public <T> T toCountQuery(IDataLakeQueryBuilder<T> builder) {
+    this.selectParams.buildCountStatement(builder);
+    prepareBuilder(builder);
+    return builder.build();
+  }
+
+  private <T> void prepareBuilder(IDataLakeQueryBuilder<T> builder) {
+    if (Objects.nonNull(this.whereParams)) {
+      this.whereParams.buildStatement(builder);
+    }
+
+    if (Objects.nonNull(this.groupByTimeClauseParams)) {
+      this.groupByTimeClauseParams.buildStatement(builder);
+    }
+
+    if (Objects.nonNull(this.groupByTagsClauseParams)) {
+      this.groupByTagsClauseParams.buildStatement(builder);
+    }
+
+    if (Objects.nonNull(this.orderByClauseParams)) {
+      this.orderByClauseParams.buildStatement(builder);
+    }
+
+    if (Objects.nonNull(this.limitParams)) {
+      this.limitParams.buildStatement(builder);
+    }
+
+    if (Objects.nonNull(this.offsetClauseParams)) {
+      this.offsetClauseParams.buildStatement(builder);
+    }
+
+    if (Objects.nonNull(this.fillClauseParams)) {
+      this.fillClauseParams.buildStatement(builder);
+    }
+  }
+
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/SupportedDataLakeQueryParameters.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/SupportedRestQueryParams.java
similarity index 96%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/SupportedDataLakeQueryParameters.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/SupportedRestQueryParams.java
index 059a4f4ce..7c3e3f829 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/SupportedDataLakeQueryParameters.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/SupportedRestQueryParams.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.dataexplorer.v4;
+package org.apache.streampipes.dataexplorer.param;
 
 import java.util.Arrays;
 import java.util.List;
 
-public class SupportedDataLakeQueryParameters {
+public class SupportedRestQueryParams {
 
   public static final String QP_COLUMNS = "columns";
   public static final String QP_START_DATE = "startDate";
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ColumnFunction.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/AggregationFunction.java
similarity index 86%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ColumnFunction.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/AggregationFunction.java
index 1837af037..0d899c2b9 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ColumnFunction.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/AggregationFunction.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param.model;
 
-public enum ColumnFunction {
+public enum AggregationFunction {
 
   MEAN("MEAN"),
   MIN("MIN"),
@@ -28,9 +28,9 @@ public enum ColumnFunction {
   MODE("MODE"),
   SUM("SUM");
 
-  private String dbName;
+  private final String dbName;
 
-  ColumnFunction(String dbName) {
+  AggregationFunction(String dbName) {
     this.dbName = dbName;
   }
 
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/FillStatement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/FillClauseParams.java
similarity index 62%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/FillStatement.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/FillClauseParams.java
index 15e93a7e8..5204ce5a0 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/FillStatement.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/FillClauseParams.java
@@ -16,17 +16,23 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.query.elements;
+package org.apache.streampipes.dataexplorer.param.model;
 
-import org.apache.streampipes.dataexplorer.v4.params.FillParams;
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
 
-public class FillStatement extends QueryElement<FillParams> {
-  public FillStatement(FillParams fillParams) {
-    super(fillParams);
+public class FillClauseParams implements IQueryStatement {
+  String fill = "none";
+
+  protected FillClauseParams() {
+  }
+
+  public static FillClauseParams from() {
+    return new FillClauseParams();
   }
 
   @Override
-  protected String buildStatement(FillParams fillParams) {
-    return fillParams.getFill();
+  public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+    builder.withFill(fill);
   }
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTagsParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/GroupByTagsClauseParams.java
similarity index 56%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTagsParams.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/GroupByTagsClauseParams.java
index afdc591a6..833fb3363 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTagsParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/GroupByTagsClauseParams.java
@@ -16,27 +16,29 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param.model;
+
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
-public class GroupingByTagsParams extends QueryParamsV4 {
+public class GroupByTagsClauseParams implements IQueryStatement {
   private final List<String> groupingTags;
 
-  public GroupingByTagsParams(String measurementID, String groupingTagsSeparatedByComma) {
-    super(measurementID);
+  public GroupByTagsClauseParams(String groupingTagsSeparatedByComma) {
     this.groupingTags = new ArrayList<>();
-    for (String tag : groupingTagsSeparatedByComma.split(",")) {
-      this.groupingTags.add(tag);
-    }
+    this.groupingTags.addAll(Arrays.asList(groupingTagsSeparatedByComma.split(",")));
   }
 
-  public static GroupingByTagsParams from(String measurementID, String groupingTagsSeparatedByComma) {
-    return new GroupingByTagsParams(measurementID, groupingTagsSeparatedByComma);
+  public static GroupByTagsClauseParams from(String groupingTagsSeparatedByComma) {
+    return new GroupByTagsClauseParams(groupingTagsSeparatedByComma);
   }
 
-  public List<String> getGroupingTags() {
-    return groupingTags;
+  @Override
+  public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+    groupingTags.forEach(builder::withGroupBy);
   }
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTimeParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/GroupByTimeClauseParams.java
similarity index 61%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTimeParams.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/GroupByTimeClauseParams.java
index 27650dabb..0d83953ce 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTimeParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/GroupByTimeClauseParams.java
@@ -16,21 +16,24 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param.model;
 
-public class GroupingByTimeParams extends QueryParamsV4 {
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+
+public class GroupByTimeClauseParams implements IQueryStatement {
   private final String timeInterval;
 
-  public GroupingByTimeParams(String measurementID, String timeInterval) {
-    super(measurementID);
+  public GroupByTimeClauseParams(String timeInterval) {
     this.timeInterval = timeInterval;
   }
 
-  public static GroupingByTimeParams from(String measurementID, String timeInterval) {
-    return new GroupingByTimeParams(measurementID, timeInterval);
+  public static GroupByTimeClauseParams from(String timeInterval) {
+    return new GroupByTimeClauseParams(timeInterval);
   }
 
-  public String getTimeInterval() {
-    return this.timeInterval;
+  @Override
+  public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+    builder.withGroupByTime(timeInterval);
   }
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ItemLimitationParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/ItemClauseParams.java
similarity index 64%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ItemLimitationParams.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/ItemClauseParams.java
index 0bd9dd122..ea447176c 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ItemLimitationParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/ItemClauseParams.java
@@ -16,22 +16,29 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param.model;
 
-public class ItemLimitationParams extends QueryParamsV4 {
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+
+public class ItemClauseParams implements IQueryStatement {
 
   private final Integer limit;
 
-  public ItemLimitationParams(String measurementID, Integer limit) {
-    super(measurementID);
+  public ItemClauseParams(Integer limit) {
     this.limit = limit;
   }
 
-  public static ItemLimitationParams from(String measurementID, Integer limit) {
-    return new ItemLimitationParams(measurementID, limit);
+  public static ItemClauseParams from(Integer limit) {
+    return new ItemClauseParams(limit);
   }
 
   public Integer getLimit() {
     return limit;
   }
+
+  @Override
+  public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+    builder.withLimit(limit);
+  }
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OffsetParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/OffsetClauseParams.java
similarity index 64%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OffsetParams.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/OffsetClauseParams.java
index 334983af1..b8dbc94ae 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OffsetParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/OffsetClauseParams.java
@@ -16,22 +16,29 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param.model;
 
-public class OffsetParams extends QueryParamsV4 {
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+
+public class OffsetClauseParams implements IQueryStatement {
 
   private final Integer offset;
 
-  public OffsetParams(String measurementID, Integer offset) {
-    super(measurementID);
+  public OffsetClauseParams(Integer offset) {
     this.offset = offset;
   }
 
-  public static OffsetParams from(String measurementID, Integer offset) {
-    return new OffsetParams(measurementID, offset);
+  public static OffsetClauseParams from(Integer offset) {
+    return new OffsetClauseParams(offset);
   }
 
   public Integer getOffset() {
     return offset;
   }
+
+  @Override
+  public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+    builder.withOffset(offset);
+  }
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OrderingByTimeParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/OrderByClauseParams.java
similarity index 58%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OrderingByTimeParams.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/OrderByClauseParams.java
index 2115e2f44..612a91674 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OrderingByTimeParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/OrderByClauseParams.java
@@ -16,21 +16,25 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param.model;
 
-public class OrderingByTimeParams extends QueryParamsV4 {
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.querybuilder.DataLakeQueryOrdering;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+
+public class OrderByClauseParams implements IQueryStatement {
   private final String ordering;
 
-  public OrderingByTimeParams(String measurementID, String ordering) {
-    super(measurementID);
+  public OrderByClauseParams(String ordering) {
     this.ordering = ordering;
   }
 
-  public static OrderingByTimeParams from(String measurementID, String ordering) {
-    return new OrderingByTimeParams(measurementID, ordering);
+  public static OrderByClauseParams from(String ordering) {
+    return new OrderByClauseParams(ordering);
   }
 
-  public String getOrdering() {
-    return this.ordering;
+  @Override
+  public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+    builder.withOrderBy(DataLakeQueryOrdering.valueOf(ordering));
   }
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectClauseParams.java
similarity index 50%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectClauseParams.java
index ad8666296..6861b4915 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectClauseParams.java
@@ -16,69 +16,54 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param.model;
 
 
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+
 import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class SelectFromStatementParams extends QueryParamsV4 {
+public class SelectClauseParams implements IQueryStatement {
 
   private List<SelectColumn> selectedColumns;
+  private List<SelectColumn> selectedColumnsCountOnly;
   private boolean selectWildcard = false;
 
-  public SelectFromStatementParams(String measurementID) {
-    super(measurementID);
+  public SelectClauseParams() {
     this.selectWildcard = true;
-    //this.selectedColumns = "*";
   }
 
-  public SelectFromStatementParams(String measurementId,
-                                   String columns,
-                                   boolean countOnly) {
-    this(measurementId);
+  public SelectClauseParams(String columns,
+                            boolean countOnly) {
     this.selectWildcard = false;
-    this.selectedColumns = countOnly ? buildColumns(columns, ColumnFunction.COUNT.name()) : buildColumns(columns);
+    this.selectedColumns = countOnly ? buildColumns(columns, AggregationFunction.COUNT.name()) : buildColumns(columns);
+    this.selectedColumnsCountOnly = buildColumns(columns, AggregationFunction.COUNT.name());
   }
 
-  public SelectFromStatementParams(String measurementID,
-                                   String columns,
-                                   String aggregationFunction) {
-    super(measurementID);
-
+  public SelectClauseParams(String columns,
+                            String aggregationFunction) {
     if (columns != null) {
       this.selectedColumns =
           aggregationFunction != null ? buildColumns(columns, aggregationFunction) : buildColumns(columns);
+      this.selectedColumnsCountOnly = buildColumns(columns, AggregationFunction.COUNT.name());
     } else {
       this.selectWildcard = true;
     }
   }
 
-  public static SelectFromStatementParams from(String measurementID,
-                                               @Nullable String columns,
-                                               @Nullable String aggregationFunction) {
-    return new SelectFromStatementParams(measurementID, columns, aggregationFunction);
-  }
-
-  public static SelectFromStatementParams from(String measurementId,
-                                               String columns,
-                                               boolean countOnly) {
-    return new SelectFromStatementParams(measurementId, columns, countOnly);
+  public static SelectClauseParams from(@Nullable String columns,
+                                        @Nullable String aggregationFunction) {
+    return new SelectClauseParams(columns, aggregationFunction);
   }
 
-  public List<SelectColumn> getSelectedColumns() {
-    return selectedColumns;
-  }
-
-  //public String getAggregationFunction() {
-  //return aggregationFunction;
-  //}
-
-  public boolean isSelectWildcard() {
-    return selectWildcard;
+  public static SelectClauseParams from(String columns,
+                                        boolean countOnly) {
+    return new SelectClauseParams(columns, countOnly);
   }
 
   private List<SelectColumn> buildColumns(String rawQuery) {
@@ -89,4 +74,22 @@ public class SelectFromStatementParams extends QueryParamsV4 {
     return Arrays.stream(rawQuery.split(",")).map(qp -> SelectColumn.fromApiQueryString(qp, globalAggregationFunction))
         .collect(Collectors.toList());
   }
+
+  @Override
+  public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+    buildStatement(builder, selectedColumns);
+  }
+
+  public void buildCountStatement(IDataLakeQueryBuilder<?> builder) {
+    buildStatement(builder, selectedColumnsCountOnly);
+  }
+
+  private void buildStatement(IDataLakeQueryBuilder<?> builder,
+                              List<SelectColumn> columns) {
+    if (selectWildcard) {
+      builder.withAllColumns();
+    } else {
+      columns.forEach(c -> c.buildStatement(builder));
+    }
+  }
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectColumn.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectColumn.java
similarity index 53%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectColumn.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectColumn.java
index 667bf943d..fe7be2da4 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectColumn.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectColumn.java
@@ -15,14 +15,16 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param.model;
 
-import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParamConverter;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
 
-public class SelectColumn {
+public class SelectColumn implements IQueryStatement {
 
   private final String originalField;
-  private ColumnFunction columnFunction;
+  private AggregationFunction aggregationFunction;
   private String targetField;
 
   private boolean simpleField = true;
@@ -33,30 +35,30 @@ public class SelectColumn {
   }
 
   public SelectColumn(String originalField,
-                      ColumnFunction columnFunction) {
+                      AggregationFunction aggregationFunction) {
     this(originalField);
-    this.columnFunction = columnFunction;
+    this.aggregationFunction = aggregationFunction;
     this.simpleField = false;
   }
 
   public SelectColumn(String originalField,
-                      ColumnFunction columnFunction,
+                      AggregationFunction aggregationFunction,
                       String targetField) {
-    this(originalField, columnFunction);
+    this(originalField, aggregationFunction);
     this.targetField = targetField;
     this.rename = true;
   }
 
   public static SelectColumn fromApiQueryString(String queryString) {
     if (queryString.contains(";")) {
-      String[] queryParts = DataLakeManagementUtils.buildSingleCondition(queryString);
+      String[] queryParts = ProvidedRestQueryParamConverter.buildSingleCondition(queryString);
       if (queryParts.length < 2) {
         throw new IllegalArgumentException("Wrong query format for query part " + queryString);
       } else {
-        ColumnFunction columnFunction = ColumnFunction.valueOf(queryParts[1]);
+        AggregationFunction aggregationFunction = AggregationFunction.valueOf(queryParts[1]);
         String targetField =
-            queryParts.length == 3 ? queryParts[2] : columnFunction.name().toLowerCase() + "_" + queryParts[0];
-        return new SelectColumn(queryParts[0], columnFunction, targetField);
+            queryParts.length == 3 ? queryParts[2] : aggregationFunction.name().toLowerCase() + "_" + queryParts[0];
+        return new SelectColumn(queryParts[0], aggregationFunction, targetField);
       }
     } else {
       return new SelectColumn(queryString);
@@ -65,29 +67,25 @@ public class SelectColumn {
 
   public static SelectColumn fromApiQueryString(String queryString,
                                                 String globalAggregationFunction) {
-    ColumnFunction columnFunction = ColumnFunction.valueOf(globalAggregationFunction);
-    String targetField = columnFunction.name().toLowerCase() + "_" + queryString;
-    return new SelectColumn(queryString, ColumnFunction.valueOf(globalAggregationFunction), targetField);
+    AggregationFunction aggregationFunction = AggregationFunction.valueOf(globalAggregationFunction);
+    String targetField = aggregationFunction.name().toLowerCase() + "_" + queryString;
+    return new SelectColumn(queryString, AggregationFunction.valueOf(globalAggregationFunction), targetField);
   }
 
-  private String makeField() {
-    if (this.simpleField) {
-      return "\"" + this.originalField + "\"";
-    } else {
-      return this.columnFunction.toDbName() + "(\"" + this.originalField + "\")";
-    }
+  public String getOriginalField() {
+    return originalField;
   }
 
-  public String toQueryString() {
-    String field = makeField();
-    if (this.rename) {
-      return field + " AS \"" + this.targetField + "\"";
+  @Override
+  public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+    if (this.simpleField) {
+      builder.withSimpleColumn(this.originalField);
     } else {
-      return field;
+      if (this.rename) {
+        builder.withAggregatedColumn(this.originalField, this.aggregationFunction, this.targetField);
+      } else {
+        builder.withAggregatedColumn(this.originalField, this.aggregationFunction);
+      }
     }
   }
-
-  public String getOriginalField() {
-    return originalField;
-  }
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/WhereClauseParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/WhereClauseParams.java
new file mode 100644
index 000000000..ab5a6712b
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/WhereClauseParams.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.dataexplorer.param.model;
+
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParamConverter;
+import org.apache.streampipes.dataexplorer.querybuilder.FilterCondition;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+
+import org.apache.commons.lang3.math.NumberUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class WhereClauseParams implements IQueryStatement {
+
+  private static final String GT = ">";
+  private static final String LT = "<";
+
+  private final List<FilterCondition> filterConditions;
+
+  private WhereClauseParams(Long startTime,
+                            Long endTime,
+                            String whereConditions) {
+    this(startTime, endTime);
+    if (whereConditions != null) {
+      buildConditions(whereConditions);
+    }
+  }
+
+  private WhereClauseParams(Long startTime,
+                            Long endTime) {
+    this.filterConditions = new ArrayList<>();
+    this.buildTimeConditions(startTime, endTime);
+  }
+
+  private WhereClauseParams(String whereConditions) {
+    this.filterConditions = new ArrayList<>();
+    if (whereConditions != null) {
+      buildConditions(whereConditions);
+    }
+  }
+
+  public static WhereClauseParams from(Long startTime,
+                                       Long endTime) {
+    return new WhereClauseParams(startTime, endTime);
+  }
+
+  public static WhereClauseParams from(String whereConditions) {
+    return new WhereClauseParams(whereConditions);
+  }
+
+  public static WhereClauseParams from(Long startTime,
+                                       Long endTime,
+                                       String whereConditions) {
+    return new WhereClauseParams(startTime, endTime, whereConditions);
+  }
+
+  private void buildTimeConditions(Long startTime,
+                                   Long endTime) {
+    if (startTime == null) {
+      this.filterConditions.add(buildTimeBoundary(endTime, LT));
+    } else if (endTime == null) {
+      this.filterConditions.add(buildTimeBoundary(startTime, GT));
+    } else {
+      this.filterConditions.add(buildTimeBoundary(endTime, LT));
+      this.filterConditions.add(buildTimeBoundary(startTime, GT));
+    }
+  }
+
+  private FilterCondition buildTimeBoundary(Long time, String operator) {
+    return new FilterCondition("time", operator, time * 1000000);
+  }
+
+  private void buildConditions(String whereConditions) {
+    List<String[]> whereParts = ProvidedRestQueryParamConverter.buildConditions(whereConditions);
+    // Add single quotes to strings except for true and false
+    whereParts.forEach(singleCondition -> {
+
+      this.filterConditions.add(
+          new FilterCondition(singleCondition[0], singleCondition[1], this.returnCondition(singleCondition[2])));
+    });
+  }
+
+  private Object returnCondition(String inputCondition) {
+    if (NumberUtils.isCreatable(inputCondition)) {
+      return Double.parseDouble(inputCondition);
+    } else if (isBoolean(inputCondition)) {
+      return Boolean.parseBoolean(inputCondition);
+    } else {
+      return inputCondition;
+    }
+  }
+
+  private boolean isBoolean(String input) {
+    return "true".equalsIgnoreCase(input) || "false".equalsIgnoreCase(input);
+  }
+
+  public List<FilterCondition> getWhereConditions() {
+    return filterConditions;
+  }
+
+  @Override
+  public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+    builder.withInclusiveFilter(filterConditions);
+  }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/AutoAggregationHandler.java
similarity index 68%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/AutoAggregationHandler.java
index a63c98077..1bff44a1d 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/AutoAggregationHandler.java
@@ -15,11 +15,14 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.dataexplorer.v4;
-
-import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
-import org.apache.streampipes.dataexplorer.sdk.DataLakeQueryOrdering;
-import org.apache.streampipes.dataexplorer.v4.params.SelectColumn;
+package org.apache.streampipes.dataexplorer.query;
+
+import org.apache.streampipes.dataexplorer.DataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.param.model.SelectColumn;
+import org.apache.streampipes.dataexplorer.querybuilder.DataLakeQueryOrdering;
 import org.apache.streampipes.model.datalake.SpQueryResult;
 
 import org.slf4j.Logger;
@@ -32,13 +35,13 @@ import java.util.Date;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AGGREGATION_FUNCTION;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AUTO_AGGREGATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_COLUMNS;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_COUNT_ONLY;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_LIMIT;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_ORDER;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_TIME_INTERVAL;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_AGGREGATION_FUNCTION;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_AUTO_AGGREGATE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_COLUMNS;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_COUNT_ONLY;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_LIMIT;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_ORDER;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_TIME_INTERVAL;
 
 public class AutoAggregationHandler {
 
@@ -50,15 +53,19 @@ public class AutoAggregationHandler {
   private final SimpleDateFormat dateFormat1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
   private final SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
 
-  private final DataLakeManagementV4 dataLakeManagement;
-  private final ProvidedQueryParams queryParams;
+  private final IDataExplorerQueryManagement dataLakeQueryManagement;
+  private final ProvidedRestQueryParams queryParams;
 
-  public AutoAggregationHandler(ProvidedQueryParams params) {
+  public AutoAggregationHandler(ProvidedRestQueryParams params) {
     this.queryParams = params;
-    this.dataLakeManagement = new DataLakeManagementV4();
+    this.dataLakeQueryManagement = getDataLakeQueryManagement();
+  }
+
+  private IDataExplorerQueryManagement getDataLakeQueryManagement() {
+    return new DataExplorerQueryManagement(new DataExplorerSchemaManagement());
   }
 
-  public ProvidedQueryParams makeAutoAggregationQueryParams() throws IllegalArgumentException {
+  public ProvidedRestQueryParams makeAutoAggregationQueryParams() throws IllegalArgumentException {
     try {
       SpQueryResult newest = getSingleRecord(DataLakeQueryOrdering.DESC);
       SpQueryResult oldest = getSingleRecord(DataLakeQueryOrdering.ASC);
@@ -85,25 +92,25 @@ public class AutoAggregationHandler {
     return null;
   }
 
-  private ProvidedQueryParams disableAutoAgg(ProvidedQueryParams params) {
+  private ProvidedRestQueryParams disableAutoAgg(ProvidedRestQueryParams params) {
     params.remove(QP_AUTO_AGGREGATE);
     return params;
   }
 
   public Integer getCount(String fieldName) {
-    ProvidedQueryParams countParams = disableAutoAgg(new ProvidedQueryParams(queryParams));
+    ProvidedRestQueryParams countParams = disableAutoAgg(new ProvidedRestQueryParams(queryParams));
     countParams.remove(QP_TIME_INTERVAL);
     countParams.remove(QP_AGGREGATION_FUNCTION);
     countParams.update(QP_COUNT_ONLY, true);
     countParams.update(QP_COLUMNS, fieldName);
 
-    SpQueryResult result = new DataLakeManagementV4().getData(countParams, true);
+    SpQueryResult result = dataLakeQueryManagement.getData(countParams, true);
 
     return result.getTotal() > 0 ? ((Double) result.getAllDataSeries().get(0).getRows().get(0).get(1)).intValue() : 0;
   }
 
-  private SpQueryResult fireQuery(ProvidedQueryParams params) {
-    return dataLakeManagement.getData(params, true);
+  private SpQueryResult fireQuery(ProvidedRestQueryParams params) {
+    return dataLakeQueryManagement.getData(params, true);
   }
 
   private int getAggregationValue(SpQueryResult newest, SpQueryResult oldest) throws ParseException {
@@ -113,7 +120,7 @@ public class AutoAggregationHandler {
   }
 
   private SpQueryResult getSingleRecord(DataLakeQueryOrdering order) throws ParseException {
-    ProvidedQueryParams singleEvent = disableAutoAgg(new ProvidedQueryParams(queryParams));
+    ProvidedRestQueryParams singleEvent = disableAutoAgg(new ProvidedRestQueryParams(queryParams));
     singleEvent.remove(QP_AGGREGATION_FUNCTION);
     singleEvent.update(QP_LIMIT, 1);
     singleEvent.update(QP_ORDER, order.name());
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryExecutor.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryExecutor.java
new file mode 100644
index 000000000..769fc042a
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryExecutor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.query;
+
+import org.apache.streampipes.dataexplorer.influx.DataExplorerInfluxQueryExecutor;
+import org.apache.streampipes.dataexplorer.param.DeleteQueryParams;
+import org.apache.streampipes.dataexplorer.param.SelectQueryParams;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+import org.apache.streampipes.model.datalake.SpQueryStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class DataExplorerQueryExecutor<X, W> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DataExplorerInfluxQueryExecutor.class);
+  protected int maximumAmountOfEvents;
+
+  protected boolean appendId = false;
+  protected String forId;
+
+  public DataExplorerQueryExecutor() {
+    this.maximumAmountOfEvents = -1;
+  }
+
+  public DataExplorerQueryExecutor(String forId) {
+    this();
+    this.appendId = true;
+    this.forId = forId;
+  }
+
+  public DataExplorerQueryExecutor(int maximumAmountOfEvents) {
+    this();
+    this.maximumAmountOfEvents = maximumAmountOfEvents;
+  }
+
+  public SpQueryResult executeQuery(SelectQueryParams params,
+                                    boolean ignoreMissingValues) throws RuntimeException {
+
+    if (this.maximumAmountOfEvents != -1) {
+      X countQuery = makeCountQuery(params);
+      W countQueryResult = executeQuery(countQuery);
+      Double amountOfQueryResults = getAmountOfResults(countQueryResult);
+
+      if (amountOfQueryResults > this.maximumAmountOfEvents) {
+        SpQueryResult tooMuchData = new SpQueryResult();
+        tooMuchData.setSpQueryStatus(SpQueryStatus.TOO_MUCH_DATA);
+        tooMuchData.setTotal(amountOfQueryResults.intValue());
+        return tooMuchData;
+      }
+    }
+
+    X query = makeSelectQuery(params);
+    return executeQuery(query, ignoreMissingValues);
+  }
+
+  public SpQueryResult executeQuery(DeleteQueryParams params) {
+    return executeQuery(makeDeleteQuery(params), true);
+  }
+
+  public SpQueryResult executeQuery(X query,
+                                    boolean ignoreMissingValues) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Data Lake Query " + asQueryString(query));
+    }
+
+    W result = executeQuery(query);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Data Lake Query Result: " + result.toString());
+    }
+
+    return postQuery(result, ignoreMissingValues);
+  }
+
+  protected abstract double getAmountOfResults(W countQueryResult);
+
+  protected abstract SpQueryResult postQuery(W queryResult,
+                                             boolean ignoreMissingValues);
+
+  protected abstract W executeQuery(X query);
+
+  protected abstract String asQueryString(X query);
+
+  protected abstract X makeDeleteQuery(DeleteQueryParams params);
+
+  protected abstract X makeCountQuery(SelectQueryParams params);
+
+  protected abstract X makeSelectQuery(SelectQueryParams params);
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/QueryResultProvider.java
similarity index 61%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/QueryResultProvider.java
index d6a7b9d18..ce88fd1b5 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/QueryResultProvider.java
@@ -16,26 +16,24 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.query;
+package org.apache.streampipes.dataexplorer.query;
 
-import org.apache.streampipes.dataexplorer.v4.AutoAggregationHandler;
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
-import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
+import org.apache.streampipes.dataexplorer.influx.DataExplorerInfluxQueryExecutor;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParamConverter;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.param.SelectQueryParams;
 import org.apache.streampipes.model.datalake.SpQueryResult;
 
-import java.util.Map;
-
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AUTO_AGGREGATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_MAXIMUM_AMOUNT_OF_EVENTS;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_AUTO_AGGREGATE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_MAXIMUM_AMOUNT_OF_EVENTS;
 
 public class QueryResultProvider {
 
   public static final String FOR_ID_KEY = "forId";
   protected final boolean ignoreMissingData;
-  protected ProvidedQueryParams queryParams;
+  protected ProvidedRestQueryParams queryParams;
 
-  public QueryResultProvider(ProvidedQueryParams queryParams,
+  public QueryResultProvider(ProvidedRestQueryParams queryParams,
                              boolean ignoreMissingData) {
     this.queryParams = queryParams;
     this.ignoreMissingData = ignoreMissingData;
@@ -45,18 +43,18 @@ public class QueryResultProvider {
     if (queryParams.has(QP_AUTO_AGGREGATE)) {
       queryParams = new AutoAggregationHandler(queryParams).makeAutoAggregationQueryParams();
     }
-    Map<String, QueryParamsV4> queryParts = DataLakeManagementUtils.getSelectQueryParams(queryParams);
+    SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(queryParams);
 
     if (queryParams.getProvidedParams().containsKey(QP_MAXIMUM_AMOUNT_OF_EVENTS)) {
       int maximumAmountOfEvents = Integer.parseInt(queryParams.getProvidedParams().get(QP_MAXIMUM_AMOUNT_OF_EVENTS));
-      return new DataExplorerQueryV4(queryParts, maximumAmountOfEvents).executeQuery(ignoreMissingData);
+      return new DataExplorerInfluxQueryExecutor(maximumAmountOfEvents).executeQuery(qp, ignoreMissingData);
     }
 
     if (queryParams.getProvidedParams().containsKey(FOR_ID_KEY)) {
       String forWidgetId = queryParams.getProvidedParams().get(FOR_ID_KEY);
-      return new DataExplorerQueryV4(queryParts, forWidgetId).executeQuery(ignoreMissingData);
+      return new DataExplorerInfluxQueryExecutor(forWidgetId).executeQuery(qp, ignoreMissingData);
     } else {
-      return new DataExplorerQueryV4(queryParts).executeQuery(ignoreMissingData);
+      return new DataExplorerInfluxQueryExecutor().executeQuery(qp, ignoreMissingData);
     }
   }
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/StreamedQueryResultProvider.java
similarity index 84%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/StreamedQueryResultProvider.java
index 6e1efd3cb..ee356e936 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/StreamedQueryResultProvider.java
@@ -16,13 +16,13 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.query;
+package org.apache.streampipes.dataexplorer.query;
 
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams;
+import org.apache.streampipes.dataexplorer.query.writer.ConfiguredOutputWriter;
+import org.apache.streampipes.dataexplorer.query.writer.OutputFormat;
 import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters;
-import org.apache.streampipes.dataexplorer.v4.query.writer.ConfiguredOutputWriter;
-import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.datalake.SpQueryResult;
 
@@ -31,8 +31,8 @@ import java.io.OutputStream;
 import java.util.List;
 import java.util.Optional;
 
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_LIMIT;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_PAGE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_LIMIT;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_PAGE;
 
 public class StreamedQueryResultProvider extends QueryResultProvider {
 
@@ -41,7 +41,7 @@ public class StreamedQueryResultProvider extends QueryResultProvider {
 
   private final OutputFormat format;
 
-  public StreamedQueryResultProvider(ProvidedQueryParams params,
+  public StreamedQueryResultProvider(ProvidedRestQueryParams params,
                                      OutputFormat format,
                                      boolean ignoreMissingValues) {
     super(params, ignoreMissingValues);
@@ -69,7 +69,7 @@ public class StreamedQueryResultProvider extends QueryResultProvider {
     boolean isFirstDataItem = true;
     configuredWriter.beforeFirstItem(outputStream);
     do {
-      queryParams.update(SupportedDataLakeQueryParameters.QP_PAGE, String.valueOf(page));
+      queryParams.update(SupportedRestQueryParams.QP_PAGE, String.valueOf(page));
       dataResult = getData();
 
       if (dataResult.getTotal() > 0) {
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredCsvOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredCsvOutputWriter.java
similarity index 86%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredCsvOutputWriter.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredCsvOutputWriter.java
index 8d92feb61..03ca75530 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredCsvOutputWriter.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredCsvOutputWriter.java
@@ -16,17 +16,17 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.query.writer;
+package org.apache.streampipes.dataexplorer.query.writer;
 
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.query.writer.item.CsvItemWriter;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.query.writer.item.CsvItemWriter;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.StringJoiner;
 
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_CSV_DELIMITER;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_CSV_DELIMITER;
 
 public class ConfiguredCsvOutputWriter extends ConfiguredOutputWriter {
 
@@ -38,7 +38,7 @@ public class ConfiguredCsvOutputWriter extends ConfiguredOutputWriter {
   private String delimiter = COMMA;
 
   @Override
-  public void configure(ProvidedQueryParams params,
+  public void configure(ProvidedRestQueryParams params,
                         boolean ignoreMissingValues) {
     if (params.has(QP_CSV_DELIMITER)) {
       delimiter = params.getAsString(QP_CSV_DELIMITER).equals("comma") ? COMMA : SEMICOLON;
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredJsonOutputWriter.java
similarity index 85%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredJsonOutputWriter.java
index b2070e2c1..0781c7334 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredJsonOutputWriter.java
@@ -16,11 +16,11 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.query.writer;
+package org.apache.streampipes.dataexplorer.query.writer;
 
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.query.writer.item.ItemGenerator;
-import org.apache.streampipes.dataexplorer.v4.query.writer.item.JsonItemWriter;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.query.writer.item.ItemGenerator;
+import org.apache.streampipes.dataexplorer.query.writer.item.JsonItemWriter;
 
 import com.google.gson.Gson;
 
@@ -41,7 +41,7 @@ public class ConfiguredJsonOutputWriter extends ConfiguredOutputWriter {
   }
 
   @Override
-  public void configure(ProvidedQueryParams params,
+  public void configure(ProvidedRestQueryParams params,
                         boolean ignoreMissingValues) {
     // do nothing
   }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredOutputWriter.java
similarity index 89%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredOutputWriter.java
index 6ef855c41..ed60687ae 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredOutputWriter.java
@@ -16,9 +16,9 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.query.writer;
+package org.apache.streampipes.dataexplorer.query.writer;
 
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -27,7 +27,7 @@ import java.util.List;
 public abstract class ConfiguredOutputWriter {
 
   public static ConfiguredOutputWriter getConfiguredWriter(OutputFormat format,
-                                                           ProvidedQueryParams params,
+                                                           ProvidedRestQueryParams params,
                                                            boolean ignoreMissingValues) {
     var writer = format.getWriter();
     writer.configure(params, ignoreMissingValues);
@@ -35,7 +35,7 @@ public abstract class ConfiguredOutputWriter {
     return writer;
   }
 
-  public abstract void configure(ProvidedQueryParams params,
+  public abstract void configure(ProvidedRestQueryParams params,
                                  boolean ignoreMissingValues);
 
   public abstract void beforeFirstItem(OutputStream outputStream) throws IOException;
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/OutputFormat.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/OutputFormat.java
similarity index 95%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/OutputFormat.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/OutputFormat.java
index 5a522b403..c855bf1c8 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/OutputFormat.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/OutputFormat.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.query.writer;
+package org.apache.streampipes.dataexplorer.query.writer;
 
 import java.util.function.Supplier;
 
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/CsvItemWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/CsvItemWriter.java
similarity index 94%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/CsvItemWriter.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/CsvItemWriter.java
index 5fe06fb52..bd80f2d9a 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/CsvItemWriter.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/CsvItemWriter.java
@@ -17,7 +17,7 @@
  */
 
 
-package org.apache.streampipes.dataexplorer.v4.query.writer.item;
+package org.apache.streampipes.dataexplorer.query.writer.item;
 
 public class CsvItemWriter extends ItemGenerator {
 
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/ItemGenerator.java
similarity index 92%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/ItemGenerator.java
index 056192518..7b3aa447f 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/ItemGenerator.java
@@ -17,9 +17,9 @@
  */
 
 
-package org.apache.streampipes.dataexplorer.v4.query.writer.item;
+package org.apache.streampipes.dataexplorer.query.writer.item;
 
-import org.apache.streampipes.dataexplorer.v4.utils.TimeParser;
+import org.apache.streampipes.dataexplorer.utils.TimeParser;
 
 import java.util.List;
 import java.util.StringJoiner;
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/JsonItemWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/JsonItemWriter.java
similarity index 95%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/JsonItemWriter.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/JsonItemWriter.java
index c4b8d4525..fde43721d 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/JsonItemWriter.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/JsonItemWriter.java
@@ -17,7 +17,7 @@
  */
 
 
-package org.apache.streampipes.dataexplorer.v4.query.writer.item;
+package org.apache.streampipes.dataexplorer.query.writer.item;
 
 import com.google.gson.Gson;
 
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryOrdering.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/DataLakeQueryOrdering.java
similarity index 93%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryOrdering.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/DataLakeQueryOrdering.java
index e8734e78e..f8825c157 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryOrdering.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/DataLakeQueryOrdering.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.sdk;
+package org.apache.streampipes.dataexplorer.querybuilder;
 
 public enum DataLakeQueryOrdering {
   ASC, DESC
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereCondition.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/FilterCondition.java
similarity index 70%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereCondition.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/FilterCondition.java
index b222eb335..396bed21d 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereCondition.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/FilterCondition.java
@@ -15,33 +15,20 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.querybuilder;
 
-import java.util.StringJoiner;
-
-public class WhereCondition {
+public class FilterCondition {
 
   private String field;
   private String operator;
-  private String condition;
+  private Object condition;
 
-  public WhereCondition(String field, String operator, String condition) {
+  public FilterCondition(String field, String operator, Object condition) {
     this.field = field;
     this.operator = operator;
     this.condition = condition;
   }
 
-  @Override
-  public String toString() {
-    StringJoiner joiner = new StringJoiner(" ");
-
-    return joiner
-        .add(field)
-        .add(operator)
-        .add(condition)
-        .toString();
-  }
-
   public String getField() {
     return field;
   }
@@ -50,7 +37,8 @@ public class WhereCondition {
     return operator;
   }
 
-  public String getCondition() {
+  public Object getCondition() {
     return condition;
   }
+
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/IDataLakeQueryBuilder.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/IDataLakeQueryBuilder.java
new file mode 100644
index 000000000..065c18562
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/IDataLakeQueryBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.querybuilder;
+
+import org.apache.streampipes.dataexplorer.param.model.AggregationFunction;
+
+import org.influxdb.querybuilder.clauses.NestedClause;
+
+import java.util.List;
+
+public interface IDataLakeQueryBuilder<T> {
+
+  IDataLakeQueryBuilder<T> withAllColumns();
+
+  IDataLakeQueryBuilder<T> withSimpleColumn(String columnName);
+
+  IDataLakeQueryBuilder<T> withSimpleColumns(List<String> columnNames);
+
+  IDataLakeQueryBuilder<T> withAggregatedColumn(String columnName,
+                                            AggregationFunction aggregationFunction,
+                                            String targetName);
+
+  IDataLakeQueryBuilder<T> withAggregatedColumn(String columnName,
+                                                AggregationFunction aggregationFunction);
+
+  IDataLakeQueryBuilder<T> withStartTime(long startTime);
+
+  IDataLakeQueryBuilder<T> withEndTime(long endTime);
+
+  IDataLakeQueryBuilder<T> withEndTime(long endTime,
+                                   boolean includeEndTime);
+
+  IDataLakeQueryBuilder<T> withTimeBoundary(long startTime,
+                                        long endTime);
+
+  IDataLakeQueryBuilder<T> withFilter(String field,
+                                  String operator,
+                                  Object value);
+
+  IDataLakeQueryBuilder<T> withExclusiveFilter(String field,
+                                           String operator,
+                                           List<?> values);
+
+  IDataLakeQueryBuilder<T> withInclusiveFilter(String field,
+                                            String operator,
+                                            List<?> values);
+
+  IDataLakeQueryBuilder<T> withInclusiveFilter(List<FilterCondition> filterConditions);
+
+  IDataLakeQueryBuilder<T> withFilter(NestedClause clause);
+
+  IDataLakeQueryBuilder<T> withGroupByTime(String timeInterval);
+
+  IDataLakeQueryBuilder<T> withGroupByTime(String timeInterval,
+                                       String offsetInterval);
+
+  IDataLakeQueryBuilder<T> withGroupBy(String column);
+
+  IDataLakeQueryBuilder<T> withOrderBy(DataLakeQueryOrdering ordering);
+
+  IDataLakeQueryBuilder<T> withLimit(int limit);
+
+  IDataLakeQueryBuilder<T> withOffset(int offset);
+
+  IDataLakeQueryBuilder<T> withFill(Object fill);
+
+  T build();
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryConstants.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryConstants.java
deleted file mode 100644
index f6664f45a..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryConstants.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.sdk;
-
-public class DataLakeQueryConstants {
-
-  public static final String GE = ">=";
-  public static final String LE = "<=";
-  public static final String LT = "<";
-  public static final String GT = ">";
-  public static final String EQ = "=";
-  public static final String NEQ = "!=";
-
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/IDataLakeQueryBuilder.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/IDataLakeQueryBuilder.java
deleted file mode 100644
index 2bc70df16..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/IDataLakeQueryBuilder.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.sdk;
-
-import org.apache.streampipes.dataexplorer.v4.params.ColumnFunction;
-
-import org.influxdb.querybuilder.clauses.NestedClause;
-
-import java.util.List;
-
-public interface IDataLakeQueryBuilder<T> {
-  IDataLakeQueryBuilder withSimpleColumn(String columnName);
-
-  IDataLakeQueryBuilder withSimpleColumns(List<String> columnNames);
-
-  IDataLakeQueryBuilder withAggregatedColumn(String columnName,
-                                            ColumnFunction columnFunction,
-                                            String targetName);
-
-  IDataLakeQueryBuilder withStartTime(long startTime);
-
-  IDataLakeQueryBuilder withEndTime(long endTime);
-
-  IDataLakeQueryBuilder withEndTime(long endTime,
-                                   boolean includeEndTime);
-
-  IDataLakeQueryBuilder withTimeBoundary(long startTime,
-                                        long endTime);
-
-  IDataLakeQueryBuilder withFilter(String field,
-                                  String operator,
-                                  Object value);
-
-  IDataLakeQueryBuilder withExclusiveFilter(String field,
-                                           String operator,
-                                           List<?> values);
-
-  IDataLakeQueryBuilder withInclusiveFilter(String field,
-                                            String operator,
-                                            List<?> values);
-
-  IDataLakeQueryBuilder withFilter(NestedClause clause);
-
-  IDataLakeQueryBuilder withGroupByTime(String timeInterval);
-
-  IDataLakeQueryBuilder withGroupByTime(String timeInterval,
-                                       String offsetInterval);
-
-  IDataLakeQueryBuilder withGroupBy(String column);
-
-  IDataLakeQueryBuilder withOrderBy(DataLakeQueryOrdering ordering);
-
-  IDataLakeQueryBuilder withLimit(int limit);
-
-  IDataLakeQueryBuilder withOffset(int offset);
-
-  T build();
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/utils/TimeParser.java
similarity index 97%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/utils/TimeParser.java
index 53412edb8..135a5d89d 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/utils/TimeParser.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.utils;
+package org.apache.streampipes.dataexplorer.utils;
 
 import java.time.Instant;
 import java.time.LocalDate;
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/DeleteFromStatementParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/DeleteFromStatementParams.java
deleted file mode 100644
index c3cc0392c..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/DeleteFromStatementParams.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.params;
-
-public class DeleteFromStatementParams extends QueryParamsV4 {
-
-  public DeleteFromStatementParams(String measurementID) {
-    super(measurementID);
-  }
-
-  public static DeleteFromStatementParams from(String measurementID) {
-    return new DeleteFromStatementParams(measurementID);
-  }
-
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/FillParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/FillParams.java
deleted file mode 100644
index 0831e6d5b..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/FillParams.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.params;
-
-public class FillParams extends QueryParamsV4 {
-  String fill = "fill(none)";
-
-  protected FillParams(String index) {
-    super(index);
-  }
-
-  public static FillParams from(String measurementID) {
-    return new FillParams(measurementID);
-  }
-
-  public String getFill() {
-    return fill;
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereStatementParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereStatementParams.java
deleted file mode 100644
index 0c7f1829c..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereStatementParams.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.v4.params;
-
-import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
-
-import org.apache.commons.lang3.math.NumberUtils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class WhereStatementParams extends QueryParamsV4 {
-
-  private static final String GT = ">";
-  private static final String LT = "<";
-
-  private List<WhereCondition> whereConditions;
-
-  private WhereStatementParams(String index,
-                               Long startTime,
-                               Long endTime,
-                               String whereConditions) {
-    this(index, startTime, endTime);
-    if (whereConditions != null) {
-      buildConditions(whereConditions);
-    }
-  }
-
-  private WhereStatementParams(String index,
-                               Long startTime,
-                               Long endTime) {
-    super(index);
-    this.whereConditions = new ArrayList<>();
-    this.buildTimeConditions(startTime, endTime);
-  }
-
-  private WhereStatementParams(String index,
-                               String whereConditions) {
-    super(index);
-    this.whereConditions = new ArrayList<>();
-    if (whereConditions != null) {
-      buildConditions(whereConditions);
-    }
-  }
-
-  public static WhereStatementParams from(String measurementId,
-                                          Long startTime,
-                                          Long endTime) {
-    return new WhereStatementParams(measurementId, startTime, endTime);
-  }
-
-  public static WhereStatementParams from(String measurementId,
-                                          String whereConditions) {
-    return new WhereStatementParams(measurementId, whereConditions);
-  }
-
-  public static WhereStatementParams from(String measurementId,
-                                          Long startTime,
-                                          Long endTime,
-                                          String whereConditions) {
-    return new WhereStatementParams(measurementId, startTime, endTime, whereConditions);
-  }
-
-  private void buildTimeConditions(Long startTime,
-                                   Long endTime) {
-    if (startTime == null) {
-      this.whereConditions.add(buildTimeBoundary(endTime, LT));
-    } else if (endTime == null) {
-      this.whereConditions.add(buildTimeBoundary(startTime, GT));
-    } else {
-      this.whereConditions.add(buildTimeBoundary(endTime, LT));
-      this.whereConditions.add(buildTimeBoundary(startTime, GT));
-    }
-  }
-
-  private WhereCondition buildTimeBoundary(Long time, String operator) {
-    return new WhereCondition("time", operator, String.valueOf(time * 1000000));
-  }
-
-  private void buildConditions(String whereConditions) {
-    List<String[]> whereParts = DataLakeManagementUtils.buildConditions(whereConditions);
-    // Add single quotes to strings except for true and false
-    whereParts.forEach(singleCondition -> {
-
-      this.whereConditions.add(
-          new WhereCondition(singleCondition[0], singleCondition[1], this.returnCondition(singleCondition[2])));
-    });
-  }
-
-  private String returnCondition(String inputCondition) {
-    if (NumberUtils.isCreatable(inputCondition) || isBoolean(inputCondition)) {
-      return inputCondition;
-    } else if (inputCondition.equals("\"\"")) {
-      return inputCondition;
-    } else {
-      return "'" + inputCondition + "'";
-    }
-  }
-
-  private boolean isBoolean(String input) {
-    return "true".equalsIgnoreCase(input) || "false".equalsIgnoreCase(input);
-  }
-
-  public List<WhereCondition> getWhereConditions() {
-    return whereConditions;
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java
deleted file mode 100644
index 22add8267..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query;
-
-import org.apache.streampipes.commons.environment.Environment;
-import org.apache.streampipes.commons.environment.Environments;
-import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
-import org.apache.streampipes.dataexplorer.v4.params.DeleteFromStatementParams;
-import org.apache.streampipes.dataexplorer.v4.params.FillParams;
-import org.apache.streampipes.dataexplorer.v4.params.GroupingByTagsParams;
-import org.apache.streampipes.dataexplorer.v4.params.GroupingByTimeParams;
-import org.apache.streampipes.dataexplorer.v4.params.ItemLimitationParams;
-import org.apache.streampipes.dataexplorer.v4.params.OffsetParams;
-import org.apache.streampipes.dataexplorer.v4.params.OrderingByTimeParams;
-import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
-import org.apache.streampipes.dataexplorer.v4.params.SelectFromStatementParams;
-import org.apache.streampipes.dataexplorer.v4.params.WhereStatementParams;
-import org.apache.streampipes.dataexplorer.v4.query.elements.DeleteFromStatement;
-import org.apache.streampipes.dataexplorer.v4.query.elements.FillStatement;
-import org.apache.streampipes.dataexplorer.v4.query.elements.GroupingByTags;
-import org.apache.streampipes.dataexplorer.v4.query.elements.GroupingByTime;
-import org.apache.streampipes.dataexplorer.v4.query.elements.ItemLimitation;
-import org.apache.streampipes.dataexplorer.v4.query.elements.Offset;
-import org.apache.streampipes.dataexplorer.v4.query.elements.OrderingByTime;
-import org.apache.streampipes.dataexplorer.v4.query.elements.QueryElement;
-import org.apache.streampipes.dataexplorer.v4.query.elements.SelectFromStatement;
-import org.apache.streampipes.dataexplorer.v4.query.elements.WhereStatement;
-import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
-import org.apache.streampipes.model.datalake.DataSeries;
-import org.apache.streampipes.model.datalake.SpQueryResult;
-import org.apache.streampipes.model.datalake.SpQueryStatus;
-
-import org.influxdb.InfluxDB;
-import org.influxdb.dto.Query;
-import org.influxdb.dto.QueryResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class DataExplorerQueryV4 {
-
-  private static final Logger LOG = LoggerFactory.getLogger(DataExplorerQueryV4.class);
-
-  protected Map<String, QueryParamsV4> params;
-
-  protected int maximumAmountOfEvents;
-
-  private boolean appendId = false;
-  private String forId;
-
-  private Environment env;
-
-  public DataExplorerQueryV4() {
-
-  }
-
-  public DataExplorerQueryV4(Map<String, QueryParamsV4> params,
-                             String forId) {
-    this(params);
-    this.appendId = true;
-    this.forId = forId;
-  }
-
-  public DataExplorerQueryV4(Map<String, QueryParamsV4> params) {
-    this.params = params;
-    this.env = Environments.getEnvironment();
-    this.maximumAmountOfEvents = -1;
-  }
-
-  public DataExplorerQueryV4(Map<String, QueryParamsV4> params, int maximumAmountOfEvents) {
-    this(params);
-    this.maximumAmountOfEvents = maximumAmountOfEvents;
-  }
-
-  public SpQueryResult executeQuery(boolean ignoreMissingValues) throws RuntimeException {
-    try (final InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient()) {
-      List<QueryElement<?>> queryElements = getQueryElements();
-
-      if (this.maximumAmountOfEvents != -1) {
-        QueryBuilder countQueryBuilder = QueryBuilder.create(getDatabaseName());
-        Query countQuery = countQueryBuilder.build(queryElements, true);
-        QueryResult countQueryResult = influxDB.query(countQuery);
-        Double amountOfQueryResults = getAmountOfResults(countQueryResult);
-
-        if (amountOfQueryResults > this.maximumAmountOfEvents) {
-          SpQueryResult tooMuchData = new SpQueryResult();
-          tooMuchData.setSpQueryStatus(SpQueryStatus.TOO_MUCH_DATA);
-          tooMuchData.setTotal(amountOfQueryResults.intValue());
-          return tooMuchData;
-        }
-      }
-
-      QueryBuilder queryBuilder = QueryBuilder.create(getDatabaseName());
-      Query query = queryBuilder.build(queryElements, false);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Data Lake Query (database:" + query.getDatabase() + "): " + query.getCommand());
-      }
-
-      QueryResult result = influxDB.query(query);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Data Lake Query Result: " + result.toString());
-      }
-
-      return postQuery(result, ignoreMissingValues);
-    }
-  }
-
-  private double getAmountOfResults(QueryResult countQueryResult) {
-    if (countQueryResult.getResults().get(0).getSeries() != null
-        && countQueryResult.getResults().get(0).getSeries().get(0).getValues() != null) {
-      return (double) countQueryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(1);
-    } else {
-      return 0.0;
-    }
-  }
-
-
-  protected DataSeries convertResult(QueryResult.Series series,
-                                     boolean ignoreMissingValues) {
-    List<String> columns = series.getColumns();
-    List<List<Object>> values = series.getValues();
-
-    List<List<Object>> resultingValues = new ArrayList<>();
-
-    values.forEach(v -> {
-      if (ignoreMissingValues) {
-        if (!v.contains(null)) {
-          resultingValues.add(v);
-        }
-      } else {
-        resultingValues.add(v);
-      }
-
-    });
-
-    return new DataSeries(values.size(), resultingValues, columns, series.getTags());
-  }
-
-  protected SpQueryResult postQuery(QueryResult queryResult,
-                                    boolean ignoreMissingValues) throws RuntimeException {
-    SpQueryResult result = new SpQueryResult();
-
-    if (hasResult(queryResult)) {
-      result.setTotal(queryResult.getResults().get(0).getSeries().size());
-      queryResult.getResults().get(0).getSeries().forEach(rs -> {
-        DataSeries series = convertResult(rs, ignoreMissingValues);
-        result.setHeaders(series.getHeaders());
-        result.addDataResult(series);
-      });
-    }
-
-    if (this.appendId) {
-      result.setForId(this.forId);
-    }
-
-    return result;
-  }
-
-  private boolean hasResult(QueryResult queryResult) {
-    return queryResult.getResults() != null
-        && queryResult.getResults().size() > 0
-        && queryResult.getResults().get(0).getSeries() != null;
-  }
-
-  protected List<QueryElement<?>> getQueryElements() {
-    List<QueryElement<?>> queryElements = new ArrayList<>();
-
-    if (this.params.containsKey(DataLakeManagementUtils.SELECT_FROM)) {
-      queryElements.add(
-          new SelectFromStatement((SelectFromStatementParams) this.params.get(DataLakeManagementUtils.SELECT_FROM)));
-    } else {
-      queryElements.add(
-          new DeleteFromStatement((DeleteFromStatementParams) this.params.get(DataLakeManagementUtils.DELETE_FROM)));
-    }
-
-    if (this.params.containsKey(DataLakeManagementUtils.WHERE)) {
-      queryElements.add(new WhereStatement((WhereStatementParams) this.params.get(DataLakeManagementUtils.WHERE)));
-    }
-
-    if (this.params.containsKey(DataLakeManagementUtils.GROUP_BY_TIME)) {
-      queryElements.add(
-          new GroupingByTime((GroupingByTimeParams) this.params.get(DataLakeManagementUtils.GROUP_BY_TIME)));
-
-    } else if (this.params.containsKey(DataLakeManagementUtils.GROUP_BY_TAGS)) {
-      queryElements.add(
-          new GroupingByTags((GroupingByTagsParams) this.params.get(DataLakeManagementUtils.GROUP_BY_TAGS)));
-    }
-
-    if (this.params.containsKey(DataLakeManagementUtils.FILL)) {
-      queryElements.add(new FillStatement((FillParams) this.params.get(DataLakeManagementUtils.FILL)));
-    }
-
-    if (this.params.containsKey(DataLakeManagementUtils.ORDER_DESCENDING)) {
-      queryElements.add(
-          new OrderingByTime((OrderingByTimeParams) this.params.get(DataLakeManagementUtils.ORDER_DESCENDING)));
-    } else if (this.params.containsKey(DataLakeManagementUtils.SELECT_FROM)) {
-      queryElements.add(new OrderingByTime(
-          OrderingByTimeParams.from(this.params.get(DataLakeManagementUtils.SELECT_FROM).getIndex(), "ASC")));
-    }
-
-    if (this.params.containsKey(DataLakeManagementUtils.LIMIT)) {
-      queryElements.add(new ItemLimitation((ItemLimitationParams) this.params.get(DataLakeManagementUtils.LIMIT)));
-    }
-
-    if (this.params.containsKey(DataLakeManagementUtils.OFFSET)) {
-      queryElements.add(new Offset((OffsetParams) this.params.get(DataLakeManagementUtils.OFFSET)));
-    }
-
-    return queryElements;
-  }
-
-  private String getDatabaseName() {
-    return env.getTsStorageBucket().getValueOrDefault();
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryBuilder.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryBuilder.java
deleted file mode 100644
index a4a09d70e..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryBuilder.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query;
-
-import org.apache.streampipes.dataexplorer.v4.query.elements.QueryElement;
-
-import org.influxdb.dto.Query;
-
-import java.util.List;
-import java.util.StringJoiner;
-
-public class QueryBuilder {
-
-  private final StringJoiner queryParts;
-  private final String databaseName;
-
-  private QueryBuilder(String databaseName) {
-    this.queryParts = new StringJoiner(" ");
-    this.databaseName = databaseName;
-  }
-
-  public static QueryBuilder create(String databaseName) {
-    return new QueryBuilder(databaseName);
-  }
-
-  public Query build(List<QueryElement<?>> queryElements, Boolean onlyCountResults) {
-    for (QueryElement<?> queryPart : queryElements) {
-      this.queryParts.add(queryPart.getStatement());
-    }
-    if (onlyCountResults) {
-      return toCountResultsQuery();
-    } else {
-      return toQuery();
-    }
-  }
-
-  public Query toQuery() {
-    return new Query(this.queryParts.toString(), this.databaseName);
-  }
-
-  public Query toCountResultsQuery() {
-    String q = "SELECT COUNT(*) FROM (" + this.queryParts.toString() + ")";
-    return new Query(q, this.databaseName);
-  }
-}
-
-
-
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/DeleteFromStatement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/DeleteFromStatement.java
deleted file mode 100644
index fc46398e8..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/DeleteFromStatement.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.DeleteFromStatementParams;
-import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
-
-public class DeleteFromStatement extends QueryElement<DeleteFromStatementParams> {
-  public DeleteFromStatement(DeleteFromStatementParams deleteFromStatementParams) {
-    super(deleteFromStatementParams);
-  }
-
-  @Override
-  protected String buildStatement(DeleteFromStatementParams deleteFromStatementParams) {
-    return QueryTemplatesV4.deleteFrom(deleteFromStatementParams.getIndex());
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTags.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTags.java
deleted file mode 100644
index 537119136..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTags.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.GroupingByTagsParams;
-import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
-
-public class GroupingByTags extends QueryElement<GroupingByTagsParams> {
-
-  public GroupingByTags(GroupingByTagsParams groupingByTagsParams) {
-    super(groupingByTagsParams);
-  }
-
-  @Override
-  protected String buildStatement(GroupingByTagsParams groupingByTagsParams) {
-    String tags = "";
-    for (String tag : groupingByTagsParams.getGroupingTags()) {
-      if (tags.equals("")) {
-        tags = tag;
-      } else {
-        tags = tags + ", " + tag;
-      }
-    }
-
-    return QueryTemplatesV4.groupByTags(tags);
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTime.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTime.java
deleted file mode 100644
index 3ceadb161..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTime.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.GroupingByTimeParams;
-import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
-
-public class GroupingByTime extends QueryElement<GroupingByTimeParams> {
-
-  public GroupingByTime(GroupingByTimeParams groupingByTimeParams) {
-    super(groupingByTimeParams);
-  }
-
-  @Override
-  protected String buildStatement(GroupingByTimeParams groupingByTimeParams) {
-    return QueryTemplatesV4.groupByTime(groupingByTimeParams.getTimeInterval());
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/ItemLimitation.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/ItemLimitation.java
deleted file mode 100644
index 3158552b7..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/ItemLimitation.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.ItemLimitationParams;
-import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
-
-public class ItemLimitation extends QueryElement<ItemLimitationParams> {
-
-  public ItemLimitation(ItemLimitationParams itemLimitationParams) {
-    super(itemLimitationParams);
-  }
-
-  @Override
-  protected String buildStatement(ItemLimitationParams itemLimitationParams) {
-    return QueryTemplatesV4.limitItems(itemLimitationParams.getLimit());
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/Offset.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/Offset.java
deleted file mode 100644
index f931f2f72..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/Offset.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.OffsetParams;
-import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
-
-public class Offset extends QueryElement<OffsetParams> {
-
-  public Offset(OffsetParams offsetParams) {
-    super(offsetParams);
-  }
-
-  @Override
-  protected String buildStatement(OffsetParams offsetParams) {
-    return QueryTemplatesV4.offset(offsetParams.getOffset());
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/OrderingByTime.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/OrderingByTime.java
deleted file mode 100644
index 58b55645b..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/OrderingByTime.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.OrderingByTimeParams;
-import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
-
-public class OrderingByTime extends QueryElement<OrderingByTimeParams> {
-
-  public OrderingByTime(OrderingByTimeParams orderingByTimeParams) {
-    super(orderingByTimeParams);
-  }
-
-  @Override
-  protected String buildStatement(OrderingByTimeParams orderingByTimeParams) {
-    return QueryTemplatesV4.orderByTime(orderingByTimeParams.getOrdering());
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java
deleted file mode 100644
index 09e93ffbb..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.SelectFromStatementParams;
-
-import java.util.StringJoiner;
-
-public class SelectFromStatement extends QueryElement<SelectFromStatementParams> {
-
-  public SelectFromStatement(SelectFromStatementParams selectFromStatementParams) {
-    super(selectFromStatementParams);
-  }
-
-  @Override
-  protected String buildStatement(SelectFromStatementParams params) {
-    if (params.isSelectWildcard()) {
-      return "SELECT * FROM " + escapeIndex(params.getIndex());
-    } else {
-      StringJoiner joiner = new StringJoiner(",");
-      String queryPrefix = "SELECT ";
-      String queryAppendix = " FROM " + escapeIndex(params.getIndex());
-
-      params.getSelectedColumns().forEach(column -> {
-        joiner.add(column.toQueryString());
-      });
-
-      return queryPrefix + joiner + queryAppendix;
-    }
-  }
-
-  private String escapeIndex(String index) {
-    return "\"" + index + "\"";
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/TimeBoundary.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/TimeBoundary.java
deleted file mode 100644
index 3bc23c221..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/TimeBoundary.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.TimeBoundaryParams;
-import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
-
-public class TimeBoundary extends QueryElement<TimeBoundaryParams> {
-
-  public TimeBoundary(TimeBoundaryParams timeBoundaryParams) {
-    super(timeBoundaryParams);
-  }
-
-  @Override
-  protected String buildStatement(TimeBoundaryParams timeBoundaryParams) {
-    if (timeBoundaryParams.getStartDate() == null) {
-      return QueryTemplatesV4.whereTimeRightBound(timeBoundaryParams.getEndDate());
-    } else if (timeBoundaryParams.getEndDate() == null) {
-      return QueryTemplatesV4.whereTimeLeftBound(timeBoundaryParams.getStartDate());
-    } else {
-      return QueryTemplatesV4.whereTimeWithin(timeBoundaryParams.getStartDate(), timeBoundaryParams.getEndDate());
-    }
-  }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/WhereStatement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/WhereStatement.java
deleted file mode 100644
index 88719d4b3..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/WhereStatement.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.WhereStatementParams;
-
-import java.util.StringJoiner;
-
-public class WhereStatement extends QueryElement<WhereStatementParams> {
-
-  public WhereStatement(WhereStatementParams params) {
-    super(params);
-  }
-
-  @Override
-  protected String buildStatement(WhereStatementParams params) {
-    StringJoiner joiner = new StringJoiner(" AND ");
-
-    params.getWhereConditions().forEach(condition -> joiner.add(condition.toString()));
-
-    return "WHERE " + joiner;
-  }
-
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java
deleted file mode 100644
index 5ada24d77..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.v4.template;
-
-public class QueryTemplatesV4 {
-
-  public static String deleteFrom(String index) {
-    return "DELETE FROM \"" + index + "\"";
-  }
-
-  public static String whereTimeWithin(long startDate, long endDate) {
-    return "WHERE time > "
-        + startDate * 1000000
-        + " AND time < "
-        + endDate * 1000000;
-  }
-
-  public static String whereTimeLeftBound(long startDate) {
-    return "WHERE time > "
-        + startDate * 1000000;
-  }
-
-  public static String whereTimeRightBound(long endDate) {
-    return "WHERE time < "
-        + endDate * 1000000;
-  }
-
-  public static String groupByTags(String tags) {
-    return "GROUP BY " + tags;
-  }
-
-  public static String groupByTime(String timeInterval) {
-    return "GROUP BY time(" + timeInterval + ")";
-  }
-
-  public static String orderByTime(String ordering) {
-    return "ORDER BY time " + ordering.toUpperCase();
-  }
-
-  public static String limitItems(int limit) {
-    return "LIMIT " + limit;
-  }
-
-  public static String offset(int offset) {
-    return "OFFSET " + offset;
-  }
-
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/DataLakeManagementUtils.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/DataLakeManagementUtils.java
deleted file mode 100644
index 850b14c62..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/DataLakeManagementUtils.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.utils;
-
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.params.DeleteFromStatementParams;
-import org.apache.streampipes.dataexplorer.v4.params.FillParams;
-import org.apache.streampipes.dataexplorer.v4.params.GroupingByTagsParams;
-import org.apache.streampipes.dataexplorer.v4.params.GroupingByTimeParams;
-import org.apache.streampipes.dataexplorer.v4.params.ItemLimitationParams;
-import org.apache.streampipes.dataexplorer.v4.params.OffsetParams;
-import org.apache.streampipes.dataexplorer.v4.params.OrderingByTimeParams;
-import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
-import org.apache.streampipes.dataexplorer.v4.params.SelectFromStatementParams;
-import org.apache.streampipes.dataexplorer.v4.params.TimeBoundaryParams;
-import org.apache.streampipes.dataexplorer.v4.params.WhereStatementParams;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AGGREGATION_FUNCTION;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_COLUMNS;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_COUNT_ONLY;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_END_DATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_FILTER;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_GROUP_BY;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_LIMIT;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_OFFSET;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_ORDER;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_PAGE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_START_DATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_TIME_INTERVAL;
-
-
-public class DataLakeManagementUtils {
-
-  public static final String BRACKET_OPEN = "\\[";
-  public static final String BRACKET_CLOSE = "\\]";
-
-  public static final String SELECT_FROM = "SELECT";
-  public static final String WHERE = "WHERE";
-  public static final String GROUP_BY_TAGS = "GROUPBY";
-  public static final String GROUP_BY_TIME = "GROUPBYTIME";
-  public static final String ORDER_DESCENDING = "DESC";
-  public static final String LIMIT = "LIMIT";
-  public static final String OFFSET = "OFFSET";
-  public static final String FILL = "FILL";
-  public static final String MAXIMUM_AMOUNT_OF_EVENTS = "MAXIMUM_AMOUNT_OF_EVENTS";
-
-  public static final String DELETE_FROM = "DELETE";
-
-  public static Map<String, QueryParamsV4> getSelectQueryParams(ProvidedQueryParams params) {
-    Map<String, QueryParamsV4> queryParts = new HashMap<>();
-    String measurementId = params.getMeasurementId();
-
-    if (params.has(QP_COUNT_ONLY) && params.getAsBoolean(QP_COUNT_ONLY)) {
-      queryParts.put(SELECT_FROM, SelectFromStatementParams.from(measurementId, params.getAsString(QP_COLUMNS), true));
-    } else {
-      queryParts.put(SELECT_FROM, SelectFromStatementParams.from(measurementId, params.getAsString(QP_COLUMNS),
-          params.getAsString(QP_AGGREGATION_FUNCTION)));
-    }
-
-    String filterConditions = params.getAsString(QP_FILTER);
-
-    if (hasTimeParams(params)) {
-      queryParts.put(WHERE, WhereStatementParams.from(measurementId,
-          params.getAsLong(QP_START_DATE),
-          params.getAsLong(QP_END_DATE),
-          filterConditions));
-    } else if (filterConditions != null) {
-      queryParts.put(WHERE, WhereStatementParams.from(measurementId, filterConditions));
-    }
-
-    if (params.has(QP_TIME_INTERVAL)) {
-      String timeInterval = params.getAsString(QP_TIME_INTERVAL);
-      if (!params.has(QP_GROUP_BY)) {
-        queryParts.put(GROUP_BY_TIME, GroupingByTimeParams.from(measurementId, timeInterval));
-      } else {
-        params.update(QP_GROUP_BY, params.getAsString(QP_GROUP_BY) + ",time(" + timeInterval + ")");
-      }
-
-      queryParts.put(FILL, FillParams.from(measurementId));
-    }
-
-    if (params.has(QP_GROUP_BY)) {
-      queryParts.put(GROUP_BY_TAGS, GroupingByTagsParams.from(measurementId, params.getAsString(QP_GROUP_BY)));
-    }
-
-
-    if (params.has(QP_ORDER)) {
-      String order = params.getAsString(QP_ORDER);
-      if (order.equals(ORDER_DESCENDING)) {
-        queryParts.put(ORDER_DESCENDING, OrderingByTimeParams.from(measurementId, order));
-      }
-    }
-
-    if (params.has(QP_LIMIT)) {
-      queryParts.put(LIMIT, ItemLimitationParams.from(measurementId, params.getAsInt(QP_LIMIT)));
-    }
-
-    if (params.has(QP_OFFSET)) {
-      queryParts.put(OFFSET, OffsetParams.from(measurementId, params.getAsInt(QP_OFFSET)));
-    } else if (params.has(QP_LIMIT) && params.has(QP_PAGE)) {
-      queryParts.put(OFFSET, OffsetParams.from(measurementId,
-          params.getAsInt(QP_PAGE) * params.getAsInt(QP_LIMIT)));
-    }
-
-    return queryParts;
-  }
-
-  public static Map<String, QueryParamsV4> getDeleteQueryParams(String measurementID,
-                                                                Long startDate,
-                                                                Long endDate) {
-    Map<String, QueryParamsV4> queryParts = new HashMap<>();
-    queryParts.put(DELETE_FROM, DeleteFromStatementParams.from(measurementID));
-    if (startDate != null || endDate != null) {
-      queryParts.put(WHERE, TimeBoundaryParams.from(measurementID, startDate, endDate));
-    }
-    return queryParts;
-  }
-
-  private static boolean hasTimeParams(ProvidedQueryParams params) {
-    return params.has(QP_START_DATE)
-        || params.has(QP_END_DATE);
-  }
-
-  public static List<String[]> buildConditions(String queryPart) {
-    String[] conditions = queryPart.split(",");
-    List<String[]> result = new ArrayList<>();
-
-    Arrays.stream(conditions).forEach(condition -> {
-      String[] singleCondition = buildSingleCondition(condition);
-      result.add(singleCondition);
-    });
-    return result;
-  }
-
-  public static String[] buildSingleCondition(String queryPart) {
-    return queryPart
-        .replaceAll(BRACKET_OPEN, "")
-        .replaceAll(BRACKET_CLOSE, "")
-        .split(";");
-  }
-}
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/param/SelectQueryParamsTest.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/param/SelectQueryParamsTest.java
new file mode 100644
index 000000000..0ad559770
--- /dev/null
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/param/SelectQueryParamsTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.param;
+
+import org.apache.streampipes.dataexplorer.influx.DataLakeInfluxQueryBuilder;
+import org.apache.streampipes.dataexplorer.utils.ProvidedQueryParameterBuilder;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class SelectQueryParamsTest {
+
+  @Test
+  public void testWildcardTimeBoundQuery() {
+    var params = ProvidedQueryParameterBuilder.create("abc")
+        .withStartDate(1)
+        .withEndDate(2)
+        .build();
+
+    SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+    String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+    assertEquals("SELECT * FROM \"abc\" WHERE (time < 2000000 AND time > 1000000);", query);
+  }
+
+  @Test
+  public void testSimpleColumnQuery() {
+    var params = ProvidedQueryParameterBuilder.create("abc")
+        .withStartDate(1)
+        .withEndDate(2)
+        .withSimpleColumns(Arrays.asList("p1", "p2"))
+        .build();
+
+    SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+    String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+    assertEquals("SELECT p1,p2 FROM \"abc\" WHERE (time < 2000000 AND time > 1000000);", query);
+  }
+
+  @Test
+  public void testSimpleColumnQueryWithBooleanFilter() {
+    var params = ProvidedQueryParameterBuilder.create("abc")
+        .withStartDate(1)
+        .withEndDate(2)
+        .withSimpleColumns(Arrays.asList("p1", "p2"))
+        .withFilter("[p1;=;true]")
+        .build();
+
+    SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+    String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+    assertEquals("SELECT p1,p2 FROM \"abc\" WHERE (time < 2000000 AND time > 1000000 AND p1 = true);", query);
+  }
+
+  @Test
+  public void testSimpleColumnQueryWithStringFilter() {
+    var params = ProvidedQueryParameterBuilder.create("abc")
+        .withStartDate(1)
+        .withEndDate(2)
+        .withSimpleColumns(Arrays.asList("p1", "p2"))
+        .withFilter("[p1;=;def]")
+        .build();
+
+    SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+    String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+    assertEquals("SELECT p1,p2 FROM \"abc\" WHERE (time < 2000000 AND time > 1000000 AND p1 = 'def');", query);
+  }
+
+  @Test
+  public void testSimpleColumnQueryWithIntFilter() {
+    var params = ProvidedQueryParameterBuilder.create("abc")
+        .withStartDate(1)
+        .withEndDate(2)
+        .withSimpleColumns(Arrays.asList("p1", "p2"))
+        .withFilter("[p1;=;1]")
+        .build();
+
+    SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+    String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+    assertEquals("SELECT p1,p2 FROM \"abc\" WHERE (time < 2000000 AND time > 1000000 AND p1 = 1.0);", query);
+  }
+
+  @Test
+  public void testSimpleColumnQueryWithFloatFilter() {
+    var params = ProvidedQueryParameterBuilder.create("abc")
+        .withStartDate(1)
+        .withEndDate(2)
+        .withSimpleColumns(Arrays.asList("p1", "p2"))
+        .withFilter("[p1;>;1.0]")
+        .build();
+
+    SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+    String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+    assertEquals("SELECT p1,p2 FROM \"abc\" WHERE (time < 2000000 AND time > 1000000 AND p1 > 1.0);", query);
+  }
+
+  @Test
+  public void testSimpleColumnQueryWithTwoFilters() {
+    var params = ProvidedQueryParameterBuilder.create("abc")
+        .withStartDate(1)
+        .withEndDate(2)
+        .withSimpleColumns(Arrays.asList("p1", "p2"))
+        .withFilter("[p1;>;1.0],[p2;<;2]")
+        .build();
+
+    SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+    String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+    assertEquals("SELECT p1,p2 FROM \"abc\" WHERE (time < 2000000 AND time > 1000000 AND p1 > 1.0 AND"
+        + " p2 < 2.0);", query);
+  }
+
+  @Test
+  public void testAggregatedColumn() {
+    var params = ProvidedQueryParameterBuilder.create("abc")
+        .withStartDate(1)
+        .withEndDate(2)
+        .withQueryColumns(List.of("[p1;MEAN;p1_mean]"))
+        .build();
+
+    SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+    String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+    assertEquals("SELECT MEAN(p1) AS p1_mean FROM \"abc\" WHERE (time < 2000000 AND time > 1000000);", query);
+  }
+
+  @Test
+  public void testAggregatedColumns() {
+    var params = ProvidedQueryParameterBuilder.create("abc")
+        .withStartDate(1)
+        .withEndDate(2)
+        .withQueryColumns(Arrays.asList("[p1;MEAN;p1_mean]", "[p2;COUNT;p2_count]"))
+        .build();
+
+    SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+    String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+    assertEquals("SELECT MEAN(p1) AS p1_mean,COUNT(p2) AS p2_count FROM \"abc\" WHERE (time < 2000000 AND"
+        + " time > 1000000);", query);
+  }
+
+  @Test
+  public void testGroupByTag() {
+    var params = ProvidedQueryParameterBuilder.create("abc")
+        .withStartDate(1)
+        .withEndDate(2)
+        .withQueryColumns(Arrays.asList("[p1;MEAN;p1_mean]", "[p2;COUNT;p2_count]"))
+        .withGroupBy(List.of("sensorId"))
+        .build();
+
+    SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+    String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+    assertEquals("SELECT MEAN(p1) AS p1_mean,COUNT(p2) AS p2_count FROM \"abc\" WHERE (time < 2000000 AND"
+        + " time > 1000000) GROUP BY sensorId;", query);
+  }
+
+  @Test
+  public void testGroupByTags() {
+    var params = ProvidedQueryParameterBuilder.create("abc")
+        .withStartDate(1)
+        .withEndDate(2)
+        .withQueryColumns(Arrays.asList("[p1;MEAN;p1_mean]", "[p2;COUNT;p2_count]"))
+        .withGroupBy(Arrays.asList("sensorId", "sensorId2"))
+        .build();
+
+    SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+    String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+    assertEquals("SELECT MEAN(p1) AS p1_mean,COUNT(p2) AS p2_count FROM \"abc\" WHERE (time < 2000000 AND"
+        + " time > 1000000) GROUP BY sensorId,sensorId2;", query);
+  }
+
+}
diff --git a/streampipes-rest/src/test/java/org/apache/streampipes/dataexplorer/v4/params/WhereStatementParamsTest.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/param/WhereStatementParamsTest.java
similarity index 53%
rename from streampipes-rest/src/test/java/org/apache/streampipes/dataexplorer/v4/params/WhereStatementParamsTest.java
rename to streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/param/WhereStatementParamsTest.java
index 694af5369..e24d90b87 100644
--- a/streampipes-rest/src/test/java/org/apache/streampipes/dataexplorer/v4/params/WhereStatementParamsTest.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/param/WhereStatementParamsTest.java
@@ -16,7 +16,10 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param;
+
+import org.apache.streampipes.dataexplorer.param.model.WhereClauseParams;
+import org.apache.streampipes.dataexplorer.querybuilder.FilterCondition;
 
 import org.junit.Test;
 
@@ -25,34 +28,34 @@ import static org.junit.Assert.assertEquals;
 public class WhereStatementParamsTest {
   @Test
   public void filterNumber() {
-    WhereStatementParams result = WhereStatementParams.from("", "[fieldName;=;6]");
-    WhereCondition expected = new WhereCondition("fieldName", "=", "6");
+    WhereClauseParams result = WhereClauseParams.from("[fieldName;=;6]");
+    FilterCondition expected = new FilterCondition("fieldName", "=", 6.0);
 
     assertWhereCondition(result, expected);
   }
 
   @Test
   public void filterBoolean() {
-    WhereStatementParams result = WhereStatementParams.from("", "[fieldName;=;true]");
-    WhereCondition expected = new WhereCondition("fieldName", "=", "true");
+    WhereClauseParams result = WhereClauseParams.from("[fieldName;=;true]");
+    FilterCondition expected = new FilterCondition("fieldName", "=", true);
 
     assertWhereCondition(result, expected);
   }
 
   @Test
   public void filterString() {
-    WhereStatementParams result = WhereStatementParams.from("", "[fieldName;=;a]");
-    WhereCondition expected = new WhereCondition("fieldName", "=", "'a'");
+    WhereClauseParams result = WhereClauseParams.from("[fieldName;=;a]");
+    FilterCondition expected = new FilterCondition("fieldName", "=", "a");
 
     assertWhereCondition(result, expected);
   }
 
-  private void assertWhereCondition(WhereStatementParams result, WhereCondition expected) {
+  private void assertWhereCondition(WhereClauseParams result, FilterCondition expected) {
     assertEquals(1, result.getWhereConditions().size());
-    WhereCondition resultingWhereCondition = result.getWhereConditions().get(0);
-    assertEquals(expected.getField(), resultingWhereCondition.getField());
-    assertEquals(expected.getOperator(), resultingWhereCondition.getOperator());
-    assertEquals(expected.getCondition(), resultingWhereCondition.getCondition());
+    FilterCondition resultingFilterCondition = result.getWhereConditions().get(0);
+    assertEquals(expected.getField(), resultingFilterCondition.getField());
+    assertEquals(expected.getOperator(), resultingFilterCondition.getOperator());
+    assertEquals(expected.getCondition(), resultingFilterCondition.getCondition());
   }
 
 
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredCsvOutputWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredCsvOutputWriter.java
similarity index 85%
rename from streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredCsvOutputWriter.java
rename to streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredCsvOutputWriter.java
index c4679ffdf..5b64b4421 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredCsvOutputWriter.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredCsvOutputWriter.java
@@ -16,10 +16,9 @@
  *
  */
 
-package org.apache.streampipesdataexplorer.v4.query.writer;
+package org.apache.streampipes.dataexplorer.query.writer;
 
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.query.writer.ConfiguredCsvOutputWriter;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
 
 import com.google.common.base.Charsets;
 import org.junit.Test;
@@ -37,7 +36,7 @@ public class TestConfiguredCsvOutputWriter extends TestConfiguredOutputWriter {
   @Test
   public void testCsvOutputWriter() throws IOException {
     var writer = new ConfiguredCsvOutputWriter();
-    writer.configure(new ProvidedQueryParams(null, new HashMap<>()), true);
+    writer.configure(new ProvidedRestQueryParams(null, new HashMap<>()), true);
 
     try (var outputStream = new ByteArrayOutputStream()) {
       writer.beforeFirstItem(outputStream);
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredJsonOutputWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredJsonOutputWriter.java
similarity index 85%
rename from streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredJsonOutputWriter.java
rename to streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredJsonOutputWriter.java
index 9072c6b49..2e0431cf7 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredJsonOutputWriter.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredJsonOutputWriter.java
@@ -16,10 +16,9 @@
  *
  */
 
-package org.apache.streampipesdataexplorer.v4.query.writer;
+package org.apache.streampipes.dataexplorer.query.writer;
 
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.query.writer.ConfiguredJsonOutputWriter;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
 
 import com.google.common.base.Charsets;
 import org.junit.Test;
@@ -38,7 +37,7 @@ public class TestConfiguredJsonOutputWriter extends TestConfiguredOutputWriter {
   @Test
   public void testJsonOutputWriter() throws IOException {
     var writer = new ConfiguredJsonOutputWriter();
-    writer.configure(new ProvidedQueryParams(null, new HashMap<>()), true);
+    writer.configure(new ProvidedRestQueryParams(null, new HashMap<>()), true);
 
     try (var outputStream = new ByteArrayOutputStream()) {
       writer.beforeFirstItem(outputStream);
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredOutputWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredOutputWriter.java
similarity index 95%
rename from streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredOutputWriter.java
rename to streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredOutputWriter.java
index 78927a1c9..8a681f9a8 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredOutputWriter.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredOutputWriter.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipesdataexplorer.v4.query.writer;
+package org.apache.streampipes.dataexplorer.query.writer;
 
 import org.junit.Before;
 
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestCsvItemWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestCsvItemWriter.java
similarity index 91%
rename from streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestCsvItemWriter.java
rename to streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestCsvItemWriter.java
index 37a25a4d2..c05937d28 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestCsvItemWriter.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestCsvItemWriter.java
@@ -16,9 +16,7 @@
  *
  */
 
-package org.apache.streampipesdataexplorer.v4.query.writer.item;
-
-import org.apache.streampipes.dataexplorer.v4.query.writer.item.CsvItemWriter;
+package org.apache.streampipes.dataexplorer.query.writer.item;
 
 import org.junit.Test;
 
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestItemWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestItemWriter.java
similarity index 94%
rename from streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestItemWriter.java
rename to streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestItemWriter.java
index 26092fd88..24b4d38bf 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestItemWriter.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestItemWriter.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipesdataexplorer.v4.query.writer.item;
+package org.apache.streampipes.dataexplorer.query.writer.item;
 
 import org.junit.Before;
 
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestJsonItemWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestJsonItemWriter.java
similarity index 89%
rename from streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestJsonItemWriter.java
rename to streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestJsonItemWriter.java
index c00962835..833062e08 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestJsonItemWriter.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestJsonItemWriter.java
@@ -16,9 +16,7 @@
  *
  */
 
-package org.apache.streampipesdataexplorer.v4.query.writer.item;
-
-import org.apache.streampipes.dataexplorer.v4.query.writer.item.JsonItemWriter;
+package org.apache.streampipes.dataexplorer.query.writer.item;
 
 import com.google.gson.Gson;
 import org.junit.Test;
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilderTest.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilderTest.java
index 82a999463..edce7e7f7 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilderTest.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilderTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.streampipes.dataexplorer.sdk;
 
+import org.apache.streampipes.dataexplorer.influx.DataLakeInfluxQueryBuilder;
+
 import org.junit.Test;
 
 import java.util.List;
@@ -29,7 +31,7 @@ public class DataLakeQueryBuilderTest {
   private static final String MEASUREMENT = "measurement";
   @Test
   public void withSimpleColumnsTest() {
-    var result = DataLakeQueryBuilder
+    var result = DataLakeInfluxQueryBuilder
         .create(MEASUREMENT)
         .withSimpleColumns(List.of("one", "two"))
         .build();
@@ -37,4 +39,4 @@ public class DataLakeQueryBuilderTest {
     var expected = String.format("SELECT one,two FROM \"%s\";", MEASUREMENT);
     assertEquals(expected , result.getCommand());
   }
-}
\ No newline at end of file
+}
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/utils/ProvidedQueryParameterBuilder.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/utils/ProvidedQueryParameterBuilder.java
new file mode 100644
index 000000000..c4cd73ceb
--- /dev/null
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/utils/ProvidedQueryParameterBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.utils;
+
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ProvidedQueryParameterBuilder {
+
+  private final String measurementId;
+  private final Map<String, String> queryParams = new HashMap<>();
+
+  public static ProvidedQueryParameterBuilder create(String measurementId) {
+    return new ProvidedQueryParameterBuilder(measurementId);
+  }
+
+  public ProvidedQueryParameterBuilder(String measurementId) {
+    this.measurementId = measurementId;
+  }
+
+  public ProvidedQueryParameterBuilder withStartDate(long startDate) {
+    this.queryParams.put(SupportedRestQueryParams.QP_START_DATE, String.valueOf(startDate));
+
+    return this;
+  }
+
+  public ProvidedQueryParameterBuilder withEndDate(long endDate) {
+    this.queryParams.put(SupportedRestQueryParams.QP_END_DATE, String.valueOf(endDate));
+
+    return this;
+  }
+
+  public ProvidedQueryParameterBuilder withSimpleColumns(List<String> simpleColumns) {
+    this.queryParams.put(SupportedRestQueryParams.QP_COLUMNS, String.join(",", simpleColumns));
+
+    return this;
+  }
+
+  public ProvidedQueryParameterBuilder withQueryColumns(List<String> rawQueryColumns) {
+    this.queryParams.put(SupportedRestQueryParams.QP_COLUMNS, String.join(",", rawQueryColumns));
+
+    return this;
+  }
+
+  public ProvidedQueryParameterBuilder withGroupBy(List<String> groupBy) {
+    this.queryParams.put(SupportedRestQueryParams.QP_GROUP_BY, String.join(",", groupBy));
+
+    return this;
+  }
+
+  public ProvidedQueryParameterBuilder withFilter(String filter) {
+    this.queryParams.put(SupportedRestQueryParams.QP_FILTER, filter);
+
+    return this;
+  }
+
+  public ProvidedQueryParameterBuilder withPage(int page) {
+    this.queryParams.put(SupportedRestQueryParams.QP_PAGE, String.valueOf(page));
+
+    return this;
+  }
+
+  public ProvidedQueryParameterBuilder withLimit(int limit) {
+    this.queryParams.put(SupportedRestQueryParams.QP_LIMIT, String.valueOf(limit));
+
+    return this;
+  }
+
+  public ProvidedRestQueryParams build() {
+    return new ProvidedRestQueryParams(measurementId, queryParams);
+  }
+}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
index ae3a0c3ea..9b9641366 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
@@ -18,8 +18,6 @@
 
 package org.apache.streampipes.sinks.internal.jvm;
 
-import org.apache.streampipes.dataexplorer.commons.configs.CouchDbConfigurations;
-import org.apache.streampipes.dataexplorer.commons.configs.DataExplorerConfigurations;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
 import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
@@ -53,8 +51,6 @@ public class SinksInternalJvmInit extends ExtensionsModelSubmitter {
             new SpKafkaProtocolFactory(),
             new SpJmsProtocolFactory(),
             new SpMqttProtocolFactory())
-        .addConfigs(DataExplorerConfigurations.getDefaults())
-        .addConfigs(CouchDbConfigurations.getDefaults())
         .build();
 
 
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java
deleted file mode 100644
index 590d30e27..000000000
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.ps;
-
-import org.apache.streampipes.dataexplorer.DataLakeNoUserManagementV3;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
-import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
-
-import jakarta.ws.rs.Consumes;
-import jakarta.ws.rs.POST;
-import jakarta.ws.rs.Path;
-import jakarta.ws.rs.PathParam;
-import jakarta.ws.rs.Produces;
-import jakarta.ws.rs.core.MediaType;
-import jakarta.ws.rs.core.Response;
-
-@Path("/v3/datalake/measure")
-@Deprecated
-public class DataLakeMeasureResourceV3 extends AbstractAuthGuardedRestResource {
-
-  private DataLakeNoUserManagementV3 dataLakeManagement;
-
-  public DataLakeMeasureResourceV3() {
-    this.dataLakeManagement = new DataLakeNoUserManagementV3();
-  }
-
-  @POST
-  @JacksonSerialized
-  @Produces(MediaType.APPLICATION_JSON)
-  @Consumes(MediaType.APPLICATION_JSON)
-  @Path("/{measure}")
-  public Response addDataLake(@PathParam("measure") String measure, EventSchema eventSchema) {
-    if (this.dataLakeManagement.addDataLake(measure, eventSchema)) {
-      return ok();
-    } else {
-      return Response.status(409).build();
-    }
-
-  }
-}
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
index a59ce31ac..4e57132ff 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.ps;
 
-import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
+import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -37,10 +38,10 @@ import jakarta.ws.rs.core.Response;
 @Path("/v4/datalake/measure")
 public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
 
-  private DataLakeManagementV4 dataLakeManagement;
+  private final IDataExplorerSchemaManagement dataLakeMeasureManagement;
 
   public DataLakeMeasureResourceV4() {
-    this.dataLakeManagement = new DataLakeManagementV4();
+    this.dataLakeMeasureManagement = new DataExplorerSchemaManagement();
   }
 
   @POST
@@ -48,7 +49,7 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
   @Produces(MediaType.APPLICATION_JSON)
   @Consumes(MediaType.APPLICATION_JSON)
   public Response addDataLake(DataLakeMeasure dataLakeMeasure) {
-    DataLakeMeasure result = this.dataLakeManagement.addDataLake(dataLakeMeasure);
+    DataLakeMeasure result = this.dataLakeMeasureManagement.createMeasurement(dataLakeMeasure);
     return ok(result);
   }
 
@@ -56,8 +57,8 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
   @JacksonSerialized
   @Produces(MediaType.APPLICATION_JSON)
   @Path("{id}")
-  public Response getDataLakeMeasure(@PathParam("id") String measureId) {
-    return ok(this.dataLakeManagement.getById(measureId));
+  public Response getDataLakeMeasure(@PathParam("id") String elementId) {
+    return ok(this.dataLakeMeasureManagement.getById(elementId));
   }
 
   @PUT
@@ -65,11 +66,11 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
   @Produces(MediaType.APPLICATION_JSON)
   @Consumes(MediaType.APPLICATION_JSON)
   @Path("{id}")
-  public Response updateDataLakeMeasure(@PathParam("id") String measureId,
+  public Response updateDataLakeMeasure(@PathParam("id") String elementId,
                                         DataLakeMeasure measure) {
-    if (measureId.equals(measure.getElementId())) {
+    if (elementId.equals(measure.getElementId())) {
       try {
-        this.dataLakeManagement.updateDataLake(measure);
+        this.dataLakeMeasureManagement.updateMeasurement(measure);
         return ok();
       } catch (IllegalArgumentException e) {
         return badRequest(e.getMessage());
@@ -81,9 +82,9 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
   @DELETE
   @JacksonSerialized
   @Path("{id}")
-  public Response deleteDataLakeMeasure(@PathParam("id") String measureId) {
+  public Response deleteDataLakeMeasure(@PathParam("id") String elementId) {
     try {
-      this.dataLakeManagement.deleteDataLakeMeasure(measureId);
+      this.dataLakeMeasureManagement.deleteMeasurement(elementId);
       return ok();
     } catch (IllegalArgumentException e) {
       return badRequest(e.getMessage());
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index 4ed1a0bd7..b449e2dcf 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -18,9 +18,10 @@
 
 package org.apache.streampipes.ps;
 
-import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
+import org.apache.streampipes.dataexplorer.DataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.query.writer.OutputFormat;
 import org.apache.streampipes.model.StreamPipesErrorMessage;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.datalake.DataSeries;
@@ -57,38 +58,41 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AGGREGATION_FUNCTION;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AUTO_AGGREGATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_COLUMNS;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_COUNT_ONLY;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_CSV_DELIMITER;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_END_DATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_FILTER;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_FORMAT;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_GROUP_BY;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_LIMIT;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_MAXIMUM_AMOUNT_OF_EVENTS;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_MISSING_VALUE_BEHAVIOUR;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_OFFSET;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_ORDER;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_PAGE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_START_DATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_TIME_INTERVAL;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.SUPPORTED_PARAMS;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_AGGREGATION_FUNCTION;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_AUTO_AGGREGATE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_COLUMNS;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_COUNT_ONLY;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_CSV_DELIMITER;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_END_DATE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_FILTER;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_FORMAT;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_GROUP_BY;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_LIMIT;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_MAXIMUM_AMOUNT_OF_EVENTS;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_MISSING_VALUE_BEHAVIOUR;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_OFFSET;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_ORDER;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_PAGE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_START_DATE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_TIME_INTERVAL;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.SUPPORTED_PARAMS;
 
 @Path("v4/datalake")
 public class DataLakeResourceV4 extends AbstractRestResource {
 
   private static final Logger logger = LoggerFactory.getLogger(DataLakeResourceV4.class);
 
-  private DataLakeManagementV4 dataLakeManagement;
+  private DataExplorerQueryManagement dataLakeManagement;
+  private final DataExplorerSchemaManagement dataExplorerSchemaManagement;
 
   public DataLakeResourceV4() {
-    this.dataLakeManagement = new DataLakeManagementV4();
+    this.dataExplorerSchemaManagement = new DataExplorerSchemaManagement();
+    this.dataLakeManagement = new DataExplorerQueryManagement(dataExplorerSchemaManagement);
   }
 
-  public DataLakeResourceV4(DataLakeManagementV4 dataLakeManagement) {
+  public DataLakeResourceV4(DataExplorerQueryManagement dataLakeManagement) {
     this.dataLakeManagement = dataLakeManagement;
+    this.dataExplorerSchemaManagement = new DataExplorerSchemaManagement();
   }
 
 
@@ -127,10 +131,10 @@ public class DataLakeResourceV4 extends AbstractRestResource {
       @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true)
       @PathParam("measurementID") String measurementID) {
 
-    boolean isSuccessDataLake = this.dataLakeManagement.removeMeasurement(measurementID);
+    boolean isSuccessDataLake = this.dataLakeManagement.deleteData(measurementID);
 
     if (isSuccessDataLake) {
-      boolean isSuccessEventProperty = this.dataLakeManagement.removeEventProperty(measurementID);
+      boolean isSuccessEventProperty = this.dataExplorerSchemaManagement.deleteMeasurementByName(measurementID);
       if (isSuccessEventProperty) {
         return ok();
       } else {
@@ -152,7 +156,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
               description = "array of stored measurement series",
               content = @Content(array = @ArraySchema(schema = @Schema(implementation = DataLakeMeasure.class))))})
   public Response getAll() {
-    List<DataLakeMeasure> allMeasurements = this.dataLakeManagement.getAllMeasurements();
+    List<DataLakeMeasure> allMeasurements = this.dataExplorerSchemaManagement.getAllMeasurements();
     return ok(allMeasurements);
   }
 
@@ -231,7 +235,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
     if (!(checkProvidedQueryParams(queryParams))) {
       return badRequest();
     } else {
-      ProvidedQueryParams sanitizedParams = populate(measurementID, queryParams);
+      ProvidedRestQueryParams sanitizedParams = populate(measurementID, queryParams);
       try {
         SpQueryResult result =
             this.dataLakeManagement.getData(sanitizedParams, isIgnoreMissingValues(missingValueBehaviour));
@@ -249,7 +253,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
   public Response getData(List<Map<String, String>> queryParams) {
     var results = queryParams
         .stream()
-        .map(qp -> new ProvidedQueryParams(qp.get("measureName"), qp))
+        .map(qp -> new ProvidedRestQueryParams(qp.get("measureName"), qp))
         .map(params -> this.dataLakeManagement.getData(params, true))
         .collect(Collectors.toList());
 
@@ -317,7 +321,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
     if (!(checkProvidedQueryParams(queryParams))) {
       return badRequest();
     } else {
-      ProvidedQueryParams sanitizedParams = populate(measurementID, queryParams);
+      ProvidedRestQueryParams sanitizedParams = populate(measurementID, queryParams);
       if (format == null) {
         format = "csv";
       }
@@ -341,7 +345,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
       responses = {
           @ApiResponse(responseCode = "200", description = "All measurement series successfully removed")})
   public Response removeAll() {
-    boolean isSuccess = this.dataLakeManagement.removeAllMeasurements();
+    boolean isSuccess = this.dataLakeManagement.deleteAllData();
     return Response.ok(isSuccess).build();
   }
 
@@ -349,11 +353,11 @@ public class DataLakeResourceV4 extends AbstractRestResource {
     return SUPPORTED_PARAMS.containsAll(providedParams.keySet());
   }
 
-  private ProvidedQueryParams populate(String measurementId, MultivaluedMap<String, String> rawParams) {
+  private ProvidedRestQueryParams populate(String measurementId, MultivaluedMap<String, String> rawParams) {
     Map<String, String> queryParamMap = new HashMap<>();
     rawParams.forEach((key, value) -> queryParamMap.put(key, String.join(",", value)));
 
-    return new ProvidedQueryParams(measurementId, queryParamMap);
+    return new ProvidedRestQueryParams(measurementId, queryParamMap);
   }
 
   // Checks if the parameter for missing value behaviour is set
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
index fc7d07e5d..0a816ebcb 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
@@ -19,7 +19,9 @@
 package org.apache.streampipes.rest;
 
 import org.apache.streampipes.connect.management.management.AdapterMasterManagement;
-import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
+import org.apache.streampipes.dataexplorer.DataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
 import org.apache.streampipes.extensions.api.connect.exception.AdapterException;
 import org.apache.streampipes.manager.file.FileManager;
 import org.apache.streampipes.manager.pipeline.PipelineCacheManager;
@@ -93,13 +95,15 @@ public class ResetManagement {
     });
 
     // Remove all data in data lake
-    DataLakeManagementV4 dataLakeManagementV4 = new DataLakeManagementV4();
-    List<DataLakeMeasure> allMeasurements = dataLakeManagementV4.getAllMeasurements();
+    IDataExplorerSchemaManagement dataLakeMeasureManagement = new DataExplorerSchemaManagement();
+    DataExplorerQueryManagement dataExplorerQueryManagement =
+        new DataExplorerQueryManagement(dataLakeMeasureManagement);
+    List<DataLakeMeasure> allMeasurements = dataLakeMeasureManagement.getAllMeasurements();
     allMeasurements.forEach(measurement -> {
-      boolean isSuccessDataLake = dataLakeManagementV4.removeMeasurement(measurement.getMeasureName());
+      boolean isSuccessDataLake = dataExplorerQueryManagement.deleteData(measurement.getMeasureName());
 
       if (isSuccessDataLake) {
-        dataLakeManagementV4.removeEventProperty(measurement.getMeasureName());
+        dataLakeMeasureManagement.deleteMeasurementByName(measurement.getMeasureName());
       }
     });
 
diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java
index 1a4f253b7..033458721 100644
--- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java
+++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java
@@ -19,7 +19,6 @@
 package org.apache.streampipes.service.core;
 
 import org.apache.streampipes.ps.DataLakeImageResource;
-import org.apache.streampipes.ps.DataLakeMeasureResourceV3;
 import org.apache.streampipes.ps.DataLakeMeasureResourceV4;
 import org.apache.streampipes.ps.DataLakeResourceV3;
 import org.apache.streampipes.ps.DataLakeResourceV4;
@@ -120,7 +119,6 @@ public class StreamPipesResourceConfig extends BaseResourceConfig {
         DataLakeWidgetResource.class,
         DataLakeImageResource.class,
         DataLakeResourceV3.class,
-        DataLakeMeasureResourceV3.class,
         DataLakeMeasureResourceV4.class,
         DataStream.class,
         EmailConfigurationResource.class,