You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/03/10 20:55:01 UTC

[streampipes] branch 1406-cleanup-data-explorer-query-management updated: Improve data explorer query management (#1406)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch 1406-cleanup-data-explorer-query-management
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/1406-cleanup-data-explorer-query-management by this push:
     new f0cb55986 Improve data explorer query management (#1406)
f0cb55986 is described below

commit f0cb55986c70ab74083862824bb07f9c21fefe92
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Fri Mar 10 21:54:46 2023 +0100

    Improve data explorer query management (#1406)
---
 .../commons/configs/CouchDbConfigurations.java     |  35 ------
 .../configs/DataExplorerConfigurations.java        |  46 -------
 .../commons/configs/DataExplorerEnvKeys.java       |  28 -----
 ...entV4.java => DataExplorerQueryManagement.java} | 137 +++------------------
 ...ntV3.java => DataExplorerSchemaManagement.java} |  93 +++++++++++---
 .../api/IDataExplorerQueryManagement.java          |  47 +++++++
 .../api/IDataExplorerSchemaManagement.java         |  23 +++-
 .../dataexplorer/v4/AutoAggregationHandler.java    |  16 ++-
 .../sinks/internal/jvm/SinksInternalJvmInit.java   |   4 -
 .../streampipes/ps/DataLakeMeasureResourceV3.java  |  57 ---------
 .../streampipes/ps/DataLakeMeasureResourceV4.java  |  23 ++--
 .../apache/streampipes/ps/DataLakeResourceV4.java  |  20 +--
 .../apache/streampipes/rest/ResetManagement.java   |  13 +-
 .../service/core/StreamPipesResourceConfig.java    |   2 -
 14 files changed, 201 insertions(+), 343 deletions(-)

diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java
deleted file mode 100644
index 45678a9c1..000000000
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.commons.configs;
-
-import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class CouchDbConfigurations {
-
-  public static List<ConfigItem> getDefaults() {
-    return Arrays.asList(
-        ConfigItem.from(CouchDbEnvKeys.COUCHDB_HOST, "couchdb", "Hostname for CouchDB to store image blobs"),
-        ConfigItem.from(CouchDbEnvKeys.COUCHDB_PORT, 5984, ""),
-        ConfigItem.from(CouchDbEnvKeys.COUCHDB_PROTOCOL, "http", "")
-    );
-  }
-
-}
\ No newline at end of file
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java
deleted file mode 100644
index feb7135ea..000000000
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.commons.configs;
-
-import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
-
-import java.util.Arrays;
-import java.util.List;
-
-
-public class DataExplorerConfigurations {
-  public static final String DATA_LAKE_DATABASE_NAME = "sp";
-
-  public static List<ConfigItem> getDefaults() {
-
-    return Arrays.asList(
-        ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_HOST, "influxdb",
-            "Hostname for the StreamPipes data lake database"),
-        ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PROTOCOL, "http",
-            "Protocol for the StreamPipes data lake database"),
-        ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PORT, 8086, "Port for the StreamPipes data lake database"),
-        ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_USERNAME, "default",
-            "Username for the StreamPipes data lake database"),
-        ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PASSWORD, "default",
-            "Password for the StreamPipes data lake database"),
-        ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_DATABASE_NAME, DATA_LAKE_DATABASE_NAME,
-            "Database name for the StreamPipes data lake database")
-    );
-  }
-
-}
\ No newline at end of file
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java
deleted file mode 100644
index 6eb840708..000000000
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.commons.configs;
-
-public class DataExplorerEnvKeys {
-  public static final String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
-  public static final String DATA_LAKE_PROTOCOL = "SP_DATA_LAKE_PROTOCOL";
-  public static final String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT";
-  public static final String DATA_LAKE_USERNAME = "SP_DATA_LAKE_USERNAME";
-  public static final String DATA_LAKE_PASSWORD = "SP_DATA_LAKE_PASSWORD";
-  public static final String DATA_LAKE_DATABASE_NAME = "SP_DATA_LAKE_DATABASE_NAME";
-
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java
similarity index 51%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java
index 7f397af7a..c2fd61edd 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java
@@ -20,9 +20,10 @@ package org.apache.streampipes.dataexplorer;
 
 import org.apache.streampipes.commons.environment.Environment;
 import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
 import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
 import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
-import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
 import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
 import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
 import org.apache.streampipes.dataexplorer.v4.query.DataExplorerQueryV4;
@@ -32,45 +33,34 @@ import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
 import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.datalake.SpQueryResult;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventPropertyList;
-import org.apache.streampipes.model.schema.EventPropertyNested;
-import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.storage.api.IDataLakeStorage;
-import org.apache.streampipes.storage.couchdb.utils.Utils;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-
-import com.google.gson.JsonObject;
+
 import org.influxdb.InfluxDB;
 import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
-import org.lightcouch.CouchDbClient;
 
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
-public class DataLakeManagementV4 {
+public class DataExplorerQueryManagement implements IDataExplorerQueryManagement {
 
-  public List<DataLakeMeasure> getAllMeasurements() {
-    return DataExplorerUtils.getInfos();
-  }
+  private final IDataExplorerSchemaManagement dataExplorerSchemaManagement;
 
-  public DataLakeMeasure getById(String measureId) {
-    return getDataLakeStorage().findOne(measureId);
+  public DataExplorerQueryManagement(IDataExplorerSchemaManagement dataExplorerSchemaManagement) {
+    this.dataExplorerSchemaManagement = dataExplorerSchemaManagement;
   }
 
+  @Override
   public SpQueryResult getData(ProvidedQueryParams queryParams,
                                boolean ignoreMissingData) throws IllegalArgumentException {
     return new QueryResultProvider(queryParams, ignoreMissingData).getData();
   }
 
+  @Override
   public void getDataAsStream(ProvidedQueryParams params,
                               OutputFormat format,
                               boolean ignoreMissingValues,
@@ -79,7 +69,8 @@ public class DataLakeManagementV4 {
     new StreamedQueryResultProvider(params, format, ignoreMissingValues).getDataAsStream(outputStream);
   }
 
-  public boolean removeAllMeasurements() {
+  @Override
+  public boolean deleteAllData() {
     List<DataLakeMeasure> allMeasurements = getAllMeasurements();
 
     for (DataLakeMeasure measure : allMeasurements) {
@@ -91,7 +82,8 @@ public class DataLakeManagementV4 {
     return true;
   }
 
-  public boolean removeMeasurement(String measurementID) {
+  @Override
+  public boolean deleteData(String measurementID) {
     List<DataLakeMeasure> allMeasurements = getAllMeasurements();
     for (DataLakeMeasure measure : allMeasurements) {
       if (measure.getMeasureName().equals(measurementID)) {
@@ -103,34 +95,14 @@ public class DataLakeManagementV4 {
     return false;
   }
 
+  @Override
   public SpQueryResult deleteData(String measurementID, Long startDate, Long endDate) {
     Map<String, QueryParamsV4> queryParts =
         DataLakeManagementUtils.getDeleteQueryParams(measurementID, startDate, endDate);
     return new DataExplorerQueryV4(queryParts).executeQuery(true);
   }
 
-  public boolean removeEventProperty(String measurementID) {
-    boolean isSuccess = false;
-    CouchDbClient couchDbClient = Utils.getCouchDbDataLakeClient();
-    List<JsonObject> docs = couchDbClient.view("_all_docs").includeDocs(true).query(JsonObject.class);
-
-    for (JsonObject document : docs) {
-      if (document.get("measureName").toString().replace("\"", "").equals(measurementID)) {
-        couchDbClient.remove(document.get("_id").toString().replace("\"", ""),
-            document.get("_rev").toString().replace("\"", ""));
-        isSuccess = true;
-        break;
-      }
-    }
-
-    try {
-      couchDbClient.close();
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-    return isSuccess;
-  }
-
+  @Override
   public Map<String, Object> getTagValues(String measurementId,
                                           String fields) {
     InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient();
@@ -160,83 +132,8 @@ public class DataLakeManagementV4 {
     return tags;
   }
 
-  public void updateDataLake(DataLakeMeasure measure) throws IllegalArgumentException {
-    var existingMeasure = getDataLakeStorage().findOne(measure.getElementId());
-    if (existingMeasure != null) {
-      measure.setRev(existingMeasure.getRev());
-      getDataLakeStorage().updateDataLakeMeasure(measure);
-    } else {
-      getDataLakeStorage().storeDataLakeMeasure(measure);
-    }
-  }
-
-  public void deleteDataLakeMeasure(String elementId) throws IllegalArgumentException {
-    if (getDataLakeStorage().findOne(elementId) != null) {
-      getDataLakeStorage().deleteDataLakeMeasure(elementId);
-    } else {
-      throw new IllegalArgumentException("Could not find measure with this ID");
-    }
-  }
-
-  public DataLakeMeasure addDataLake(DataLakeMeasure measure) {
-    List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
-    Optional<DataLakeMeasure> optional =
-        dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName()))
-            .findFirst();
-
-    if (optional.isPresent()) {
-      DataLakeMeasure oldEntry = optional.get();
-      if (!compareEventProperties(oldEntry.getEventSchema().getEventProperties(),
-          measure.getEventSchema().getEventProperties())) {
-        return oldEntry;
-      }
-    } else {
-      measure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
-      getDataLakeStorage().storeDataLakeMeasure(measure);
-      return measure;
-    }
-
-    return measure;
-  }
-
-  private boolean compareEventProperties(List<EventProperty> prop1, List<EventProperty> prop2) {
-    if (prop1.size() != prop2.size()) {
-      return false;
-    }
-
-    return prop1.stream().allMatch(prop -> {
-
-      for (EventProperty property : prop2) {
-        if (prop.getRuntimeName().equals(property.getRuntimeName())) {
-
-          //primitive
-          if (prop instanceof EventPropertyPrimitive && property instanceof EventPropertyPrimitive) {
-            if (((EventPropertyPrimitive) prop)
-                .getRuntimeType()
-                .equals(((EventPropertyPrimitive) property).getRuntimeType())) {
-              return true;
-            }
-
-            //list
-          } else if (prop instanceof EventPropertyList && property instanceof EventPropertyList) {
-            return compareEventProperties(Collections.singletonList(((EventPropertyList) prop).getEventProperty()),
-                Collections.singletonList(((EventPropertyList) property).getEventProperty()));
-
-            //nested
-          } else if (prop instanceof EventPropertyNested && property instanceof EventPropertyNested) {
-            return compareEventProperties(((EventPropertyNested) prop).getEventProperties(),
-                ((EventPropertyNested) property).getEventProperties());
-          }
-        }
-      }
-      return false;
-
-    });
-  }
-
-
-  private IDataLakeStorage getDataLakeStorage() {
-    return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
+  private List<DataLakeMeasure> getAllMeasurements() {
+    return this.dataExplorerSchemaManagement.getAllMeasurements();
   }
 
   private Environment getEnvironment() {
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java
similarity index 54%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java
index 6fd104ecd..f28cf6cbd 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java
@@ -18,41 +18,104 @@
 
 package org.apache.streampipes.dataexplorer;
 
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventPropertyList;
 import org.apache.streampipes.model.schema.EventPropertyNested;
 import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.storage.api.IDataLakeStorage;
+import org.apache.streampipes.storage.couchdb.utils.Utils;
 import org.apache.streampipes.storage.management.StorageDispatcher;
 
+import com.google.gson.JsonObject;
+import org.lightcouch.CouchDbClient;
+
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
 
+public class DataExplorerSchemaManagement implements IDataExplorerSchemaManagement {
 
-@Deprecated
-public class DataLakeNoUserManagementV3 {
+  @Override
+  public List<DataLakeMeasure> getAllMeasurements() {
+    return DataExplorerUtils.getInfos();
+  }
 
+  @Override
+  public DataLakeMeasure getById(String elementId) {
+    return getDataLakeStorage().findOne(elementId);
+  }
 
-  @Deprecated
-  public boolean addDataLake(String measure, EventSchema eventSchema) {
+  @Override
+  public DataLakeMeasure createMeasurement(DataLakeMeasure measure) {
     List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
     Optional<DataLakeMeasure> optional =
-        dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure)).findFirst();
+        dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName()))
+            .findFirst();
 
     if (optional.isPresent()) {
-      if (!compareEventProperties(optional.get().getEventSchema().getEventProperties(),
-          eventSchema.getEventProperties())) {
-        return false;
+      DataLakeMeasure oldEntry = optional.get();
+      if (!compareEventProperties(oldEntry.getEventSchema().getEventProperties(),
+          measure.getEventSchema().getEventProperties())) {
+        return oldEntry;
+      }
+    } else {
+      measure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
+      getDataLakeStorage().storeDataLakeMeasure(measure);
+      return measure;
+    }
+
+    return measure;
+  }
+
+  @Override
+  public void deleteMeasurement(String elementId) {
+    if (getDataLakeStorage().findOne(elementId) != null) {
+      getDataLakeStorage().deleteDataLakeMeasure(elementId);
+    } else {
+      throw new IllegalArgumentException("Could not find measure with this ID");
+    }
+  }
+
+  @Override
+  public boolean deleteMeasurementByName(String measureName) {
+    boolean isSuccess = false;
+    CouchDbClient couchDbClient = Utils.getCouchDbDataLakeClient();
+    List<JsonObject> docs = couchDbClient.view("_all_docs").includeDocs(true).query(JsonObject.class);
+
+    for (JsonObject document : docs) {
+      if (document.get("measureName").toString().replace("\"", "").equals(measureName)) {
+        couchDbClient.remove(document.get("_id").toString().replace("\"", ""),
+            document.get("_rev").toString().replace("\"", ""));
+        isSuccess = true;
+        break;
       }
+    }
+
+    try {
+      couchDbClient.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+    return isSuccess;
+  }
+
+  @Override
+  public void updateMeasurement(DataLakeMeasure measure) {
+    var existingMeasure = getDataLakeStorage().findOne(measure.getElementId());
+    if (existingMeasure != null) {
+      measure.setRev(existingMeasure.getRev());
+      getDataLakeStorage().updateDataLakeMeasure(measure);
     } else {
-      DataLakeMeasure dataLakeMeasure = new DataLakeMeasure(measure, eventSchema);
-      dataLakeMeasure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
-      getDataLakeStorage().storeDataLakeMeasure(dataLakeMeasure);
+      getDataLakeStorage().storeDataLakeMeasure(measure);
     }
-    return true;
+  }
+
+  private IDataLakeStorage getDataLakeStorage() {
+    return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
   }
 
   private boolean compareEventProperties(List<EventProperty> prop1, List<EventProperty> prop2) {
@@ -89,8 +152,4 @@ public class DataLakeNoUserManagementV3 {
 
     });
   }
-
-  private IDataLakeStorage getDataLakeStorage() {
-    return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
-  }
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerQueryManagement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerQueryManagement.java
new file mode 100644
index 000000000..0b895cefa
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerQueryManagement.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.api;
+
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+public interface IDataExplorerQueryManagement {
+
+  SpQueryResult getData(ProvidedQueryParams queryParams,
+                        boolean ignoreMissingData) throws IllegalArgumentException;
+
+  void getDataAsStream(ProvidedQueryParams params,
+                       OutputFormat format,
+                       boolean ignoreMissingValues,
+                       OutputStream outputStream) throws IOException;
+
+  boolean deleteData(String measurementID);
+
+  SpQueryResult deleteData(String measurementID, Long startDate, Long endDate);
+
+  boolean deleteAllData();
+
+  Map<String, Object> getTagValues(String measurementId,
+                                   String fields);
+}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java
similarity index 62%
rename from streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java
index 23867c634..d8dc29648 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java
@@ -16,10 +16,23 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.commons.configs;
+package org.apache.streampipes.dataexplorer.api;
 
-public class CouchDbEnvKeys {
-  public static final String COUCHDB_HOST = "SP_COUCHDB_HOST";
-  public static final String COUCHDB_PORT = "SP_COUCHDB_PORT";
-  public static final String COUCHDB_PROTOCOL = "SP_COUCHDB_PROTOCOL";
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+
+import java.util.List;
+
+public interface IDataExplorerSchemaManagement {
+
+  List<DataLakeMeasure> getAllMeasurements();
+
+  DataLakeMeasure getById(String elementId);
+
+  DataLakeMeasure createMeasurement(DataLakeMeasure measure);
+
+  void deleteMeasurement(String elementId);
+
+  boolean deleteMeasurementByName(String measureName);
+
+  void updateMeasurement(DataLakeMeasure measure);
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java
index a63c98077..5085820c0 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java
@@ -17,7 +17,9 @@
  */
 package org.apache.streampipes.dataexplorer.v4;
 
-import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
+import org.apache.streampipes.dataexplorer.DataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
 import org.apache.streampipes.dataexplorer.sdk.DataLakeQueryOrdering;
 import org.apache.streampipes.dataexplorer.v4.params.SelectColumn;
 import org.apache.streampipes.model.datalake.SpQueryResult;
@@ -50,12 +52,16 @@ public class AutoAggregationHandler {
   private final SimpleDateFormat dateFormat1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
   private final SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
 
-  private final DataLakeManagementV4 dataLakeManagement;
+  private final IDataExplorerQueryManagement dataLakeQueryManagement;
   private final ProvidedQueryParams queryParams;
 
   public AutoAggregationHandler(ProvidedQueryParams params) {
     this.queryParams = params;
-    this.dataLakeManagement = new DataLakeManagementV4();
+    this.dataLakeQueryManagement = getDataLakeQueryManagement();
+  }
+
+  private IDataExplorerQueryManagement getDataLakeQueryManagement() {
+    return new DataExplorerQueryManagement(new DataExplorerSchemaManagement());
   }
 
   public ProvidedQueryParams makeAutoAggregationQueryParams() throws IllegalArgumentException {
@@ -97,13 +103,13 @@ public class AutoAggregationHandler {
     countParams.update(QP_COUNT_ONLY, true);
     countParams.update(QP_COLUMNS, fieldName);
 
-    SpQueryResult result = new DataLakeManagementV4().getData(countParams, true);
+    SpQueryResult result = dataLakeQueryManagement.getData(countParams, true);
 
     return result.getTotal() > 0 ? ((Double) result.getAllDataSeries().get(0).getRows().get(0).get(1)).intValue() : 0;
   }
 
   private SpQueryResult fireQuery(ProvidedQueryParams params) {
-    return dataLakeManagement.getData(params, true);
+    return dataLakeQueryManagement.getData(params, true);
   }
 
   private int getAggregationValue(SpQueryResult newest, SpQueryResult oldest) throws ParseException {
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
index ae3a0c3ea..9b9641366 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
@@ -18,8 +18,6 @@
 
 package org.apache.streampipes.sinks.internal.jvm;
 
-import org.apache.streampipes.dataexplorer.commons.configs.CouchDbConfigurations;
-import org.apache.streampipes.dataexplorer.commons.configs.DataExplorerConfigurations;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
 import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
@@ -53,8 +51,6 @@ public class SinksInternalJvmInit extends ExtensionsModelSubmitter {
             new SpKafkaProtocolFactory(),
             new SpJmsProtocolFactory(),
             new SpMqttProtocolFactory())
-        .addConfigs(DataExplorerConfigurations.getDefaults())
-        .addConfigs(CouchDbConfigurations.getDefaults())
         .build();
 
 
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java
deleted file mode 100644
index 590d30e27..000000000
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.ps;
-
-import org.apache.streampipes.dataexplorer.DataLakeNoUserManagementV3;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
-import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
-
-import jakarta.ws.rs.Consumes;
-import jakarta.ws.rs.POST;
-import jakarta.ws.rs.Path;
-import jakarta.ws.rs.PathParam;
-import jakarta.ws.rs.Produces;
-import jakarta.ws.rs.core.MediaType;
-import jakarta.ws.rs.core.Response;
-
-@Path("/v3/datalake/measure")
-@Deprecated
-public class DataLakeMeasureResourceV3 extends AbstractAuthGuardedRestResource {
-
-  private DataLakeNoUserManagementV3 dataLakeManagement;
-
-  public DataLakeMeasureResourceV3() {
-    this.dataLakeManagement = new DataLakeNoUserManagementV3();
-  }
-
-  @POST
-  @JacksonSerialized
-  @Produces(MediaType.APPLICATION_JSON)
-  @Consumes(MediaType.APPLICATION_JSON)
-  @Path("/{measure}")
-  public Response addDataLake(@PathParam("measure") String measure, EventSchema eventSchema) {
-    if (this.dataLakeManagement.addDataLake(measure, eventSchema)) {
-      return ok();
-    } else {
-      return Response.status(409).build();
-    }
-
-  }
-}
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
index a59ce31ac..4e57132ff 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.ps;
 
-import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
+import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -37,10 +38,10 @@ import jakarta.ws.rs.core.Response;
 @Path("/v4/datalake/measure")
 public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
 
-  private DataLakeManagementV4 dataLakeManagement;
+  private final IDataExplorerSchemaManagement dataLakeMeasureManagement;
 
   public DataLakeMeasureResourceV4() {
-    this.dataLakeManagement = new DataLakeManagementV4();
+    this.dataLakeMeasureManagement = new DataExplorerSchemaManagement();
   }
 
   @POST
@@ -48,7 +49,7 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
   @Produces(MediaType.APPLICATION_JSON)
   @Consumes(MediaType.APPLICATION_JSON)
   public Response addDataLake(DataLakeMeasure dataLakeMeasure) {
-    DataLakeMeasure result = this.dataLakeManagement.addDataLake(dataLakeMeasure);
+    DataLakeMeasure result = this.dataLakeMeasureManagement.createMeasurement(dataLakeMeasure);
     return ok(result);
   }
 
@@ -56,8 +57,8 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
   @JacksonSerialized
   @Produces(MediaType.APPLICATION_JSON)
   @Path("{id}")
-  public Response getDataLakeMeasure(@PathParam("id") String measureId) {
-    return ok(this.dataLakeManagement.getById(measureId));
+  public Response getDataLakeMeasure(@PathParam("id") String elementId) {
+    return ok(this.dataLakeMeasureManagement.getById(elementId));
   }
 
   @PUT
@@ -65,11 +66,11 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
   @Produces(MediaType.APPLICATION_JSON)
   @Consumes(MediaType.APPLICATION_JSON)
   @Path("{id}")
-  public Response updateDataLakeMeasure(@PathParam("id") String measureId,
+  public Response updateDataLakeMeasure(@PathParam("id") String elementId,
                                         DataLakeMeasure measure) {
-    if (measureId.equals(measure.getElementId())) {
+    if (elementId.equals(measure.getElementId())) {
       try {
-        this.dataLakeManagement.updateDataLake(measure);
+        this.dataLakeMeasureManagement.updateMeasurement(measure);
         return ok();
       } catch (IllegalArgumentException e) {
         return badRequest(e.getMessage());
@@ -81,9 +82,9 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
   @DELETE
   @JacksonSerialized
   @Path("{id}")
-  public Response deleteDataLakeMeasure(@PathParam("id") String measureId) {
+  public Response deleteDataLakeMeasure(@PathParam("id") String elementId) {
     try {
-      this.dataLakeManagement.deleteDataLakeMeasure(measureId);
+      this.dataLakeMeasureManagement.deleteMeasurement(elementId);
       return ok();
     } catch (IllegalArgumentException e) {
       return badRequest(e.getMessage());
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index 4ed1a0bd7..ef0971d89 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -18,7 +18,8 @@
 
 package org.apache.streampipes.ps;
 
-import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
+import org.apache.streampipes.dataexplorer.DataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
 import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
 import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
 import org.apache.streampipes.model.StreamPipesErrorMessage;
@@ -81,14 +82,17 @@ public class DataLakeResourceV4 extends AbstractRestResource {
 
   private static final Logger logger = LoggerFactory.getLogger(DataLakeResourceV4.class);
 
-  private DataLakeManagementV4 dataLakeManagement;
+  private DataExplorerQueryManagement dataLakeManagement;
+  private final DataExplorerSchemaManagement dataExplorerSchemaManagement;
 
   public DataLakeResourceV4() {
-    this.dataLakeManagement = new DataLakeManagementV4();
+    this.dataExplorerSchemaManagement = new DataExplorerSchemaManagement();
+    this.dataLakeManagement = new DataExplorerQueryManagement(dataExplorerSchemaManagement);
   }
 
-  public DataLakeResourceV4(DataLakeManagementV4 dataLakeManagement) {
+  public DataLakeResourceV4(DataExplorerQueryManagement dataLakeManagement) {
     this.dataLakeManagement = dataLakeManagement;
+    this.dataExplorerSchemaManagement = new DataExplorerSchemaManagement();
   }
 
 
@@ -127,10 +131,10 @@ public class DataLakeResourceV4 extends AbstractRestResource {
       @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true)
       @PathParam("measurementID") String measurementID) {
 
-    boolean isSuccessDataLake = this.dataLakeManagement.removeMeasurement(measurementID);
+    boolean isSuccessDataLake = this.dataLakeManagement.deleteData(measurementID);
 
     if (isSuccessDataLake) {
-      boolean isSuccessEventProperty = this.dataLakeManagement.removeEventProperty(measurementID);
+      boolean isSuccessEventProperty = this.dataExplorerSchemaManagement.deleteMeasurementByName(measurementID);
       if (isSuccessEventProperty) {
         return ok();
       } else {
@@ -152,7 +156,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
               description = "array of stored measurement series",
               content = @Content(array = @ArraySchema(schema = @Schema(implementation = DataLakeMeasure.class))))})
   public Response getAll() {
-    List<DataLakeMeasure> allMeasurements = this.dataLakeManagement.getAllMeasurements();
+    List<DataLakeMeasure> allMeasurements = this.dataExplorerSchemaManagement.getAllMeasurements();
     return ok(allMeasurements);
   }
 
@@ -341,7 +345,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
       responses = {
           @ApiResponse(responseCode = "200", description = "All measurement series successfully removed")})
   public Response removeAll() {
-    boolean isSuccess = this.dataLakeManagement.removeAllMeasurements();
+    boolean isSuccess = this.dataLakeManagement.deleteAllData();
     return Response.ok(isSuccess).build();
   }
 
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
index fc7d07e5d..bc8d01417 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
@@ -19,7 +19,9 @@
 package org.apache.streampipes.rest;
 
 import org.apache.streampipes.connect.management.management.AdapterMasterManagement;
-import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
+import org.apache.streampipes.dataexplorer.DataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
 import org.apache.streampipes.extensions.api.connect.exception.AdapterException;
 import org.apache.streampipes.manager.file.FileManager;
 import org.apache.streampipes.manager.pipeline.PipelineCacheManager;
@@ -93,13 +95,14 @@ public class ResetManagement {
     });
 
     // Remove all data in data lake
-    DataLakeManagementV4 dataLakeManagementV4 = new DataLakeManagementV4();
-    List<DataLakeMeasure> allMeasurements = dataLakeManagementV4.getAllMeasurements();
+    IDataExplorerSchemaManagement dataLakeMeasureManagement = new DataExplorerSchemaManagement();
+    DataExplorerQueryManagement dataExplorerQueryManagement = new DataExplorerQueryManagement(dataLakeMeasureManagement);
+    List<DataLakeMeasure> allMeasurements = dataLakeMeasureManagement.getAllMeasurements();
     allMeasurements.forEach(measurement -> {
-      boolean isSuccessDataLake = dataLakeManagementV4.removeMeasurement(measurement.getMeasureName());
+      boolean isSuccessDataLake = dataExplorerQueryManagement.deleteData(measurement.getMeasureName());
 
       if (isSuccessDataLake) {
-        dataLakeManagementV4.removeEventProperty(measurement.getMeasureName());
+        dataLakeMeasureManagement.deleteMeasurementByName(measurement.getMeasureName());
       }
     });
 
diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java
index 1a4f253b7..033458721 100644
--- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java
+++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java
@@ -19,7 +19,6 @@
 package org.apache.streampipes.service.core;
 
 import org.apache.streampipes.ps.DataLakeImageResource;
-import org.apache.streampipes.ps.DataLakeMeasureResourceV3;
 import org.apache.streampipes.ps.DataLakeMeasureResourceV4;
 import org.apache.streampipes.ps.DataLakeResourceV3;
 import org.apache.streampipes.ps.DataLakeResourceV4;
@@ -120,7 +119,6 @@ public class StreamPipesResourceConfig extends BaseResourceConfig {
         DataLakeWidgetResource.class,
         DataLakeImageResource.class,
         DataLakeResourceV3.class,
-        DataLakeMeasureResourceV3.class,
         DataLakeMeasureResourceV4.class,
         DataStream.class,
         EmailConfigurationResource.class,