You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/08/01 13:59:24 UTC

[incubator-streampipes] branch STREAMPIPES-563 updated: [STREAMPIPES-563] Move data lake logic into package streampipes-data-explorer-commons

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-563
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/STREAMPIPES-563 by this push:
     new dd37cfaf5 [STREAMPIPES-563] Move data lake logic into package streampipes-data-explorer-commons
dd37cfaf5 is described below

commit dd37cfaf518f065baa4018e343f7b0b30e78dd6c
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Aug 1 15:58:43 2022 +0200

    [STREAMPIPES-563] Move data lake logic into package
    streampipes-data-explorer-commons
---
 .../backend/StreamPipesResourceConfig.java         |   8 +-
 streampipes-data-explorer-commons/pom.xml          |   9 +-
 .../dataexplorer/commons/DataExplorerUtils.java    |  55 ++++++--
 .../dataexplorer/commons/DataExplorerWriter.java   |  20 +++
 .../dataexplorer/commons/TimeSeriesStore.java      |  86 ++++++++++++
 .../commons/couchdb/CouchDbConfigurations.java     |  29 ++---
 .../commons/couchdb/CouchDbEnvKeys.java            |   5 +-
 .../dataexplorer/commons/image/ImageStore.java     |  85 ++++++++++++
 .../commons/image/ImageStoreUtils.java             |  34 ++---
 .../{ => influx}/DataExplorerConfigurations.java   |   2 +-
 .../DataExplorerConnectionSettings.java            |   9 +-
 .../commons/{ => influx}/DataExplorerDefaults.java |   2 +-
 .../commons/{ => influx}/DataExplorerEnvKeys.java  |   2 +-
 .../commons}/influx/InfluxDbReservedKeywords.java  |   2 +-
 .../commons/influx/InfluxNameSanitizer.java        |  30 ++---
 .../dataexplorer/commons/influx/InfluxStore.java   |  72 +++++------
 .../dataexplorer/DataLakeManagementV4.java         |  69 +++++++++-
 .../dataexplorer/DataLakeNoUserManagementV3.java   |   3 +
 .../streampipes-sinks-internal-jvm/pom.xml         |   3 +
 .../sinks/internal/jvm/SinksInternalJvmInit.java   |   8 +-
 .../sinks/internal/jvm/datalake/DataLakeSink.java  | 144 ++++++---------------
 .../internal/jvm/datalake/image/ImageStore.java    |  54 --------
 .../model/datalake/DataLakeMeasure.java            |  26 ++++
 .../streampipes/ps}/DataLakeMeasureResourceV3.java |   3 +-
 .../streampipes/ps/DataLakeMeasureResourceV4.java  |  26 ++--
 .../apache/streampipes/ps/DataLakeResourceV3.java  |   1 +
 26 files changed, 484 insertions(+), 303 deletions(-)

diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
index 0a6efad7b..8972c5ef5 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
@@ -19,10 +19,7 @@
 package org.apache.streampipes.backend;
 
 import io.swagger.v3.jaxrs2.integration.resources.OpenApiResource;
-import org.apache.streampipes.ps.DataLakeImageResource;
-import org.apache.streampipes.ps.DataLakeResourceV3;
-import org.apache.streampipes.ps.DataLakeResourceV4;
-import org.apache.streampipes.ps.PipelineElementTemplateResource;
+import org.apache.streampipes.ps.*;
 import org.apache.streampipes.rest.impl.*;
 import org.apache.streampipes.rest.impl.admin.*;
 import org.apache.streampipes.rest.impl.connect.*;
@@ -30,7 +27,6 @@ import org.apache.streampipes.rest.impl.dashboard.Dashboard;
 import org.apache.streampipes.rest.impl.dashboard.DashboardWidget;
 import org.apache.streampipes.rest.impl.dashboard.VisualizablePipelineResource;
 import org.apache.streampipes.rest.impl.datalake.DataLakeDashboardResource;
-import org.apache.streampipes.rest.impl.datalake.DataLakeMeasureResourceV3;
 import org.apache.streampipes.rest.impl.datalake.DataLakeWidgetResource;
 import org.apache.streampipes.rest.impl.datalake.PersistedDataStreamResource;
 import org.apache.streampipes.rest.impl.nouser.PipelineElementImportNoUser;
@@ -65,6 +61,7 @@ public class StreamPipesResourceConfig extends ResourceConfig {
         register(DataLakeImageResource.class);
         register(DataLakeResourceV3.class);
         register(DataLakeMeasureResourceV3.class);
+        register(DataLakeMeasureResourceV4.class);
         register(DataStream.class);
         register(EmailConfigurationResource.class);
         register(EmailResource.class);
@@ -104,7 +101,6 @@ public class StreamPipesResourceConfig extends ResourceConfig {
         register(DataLakeDashboardResource.class);
         register(DataLakeWidgetResource.class);
         register(DataLakeResourceV3.class);
-        register(DataLakeMeasureResourceV3.class);
         register(PipelineElementFile.class);
         register(DashboardWidget.class);
         register(Dashboard.class);
diff --git a/streampipes-data-explorer-commons/pom.xml b/streampipes-data-explorer-commons/pom.xml
index 2d14cd7e1..5da55d089 100644
--- a/streampipes-data-explorer-commons/pom.xml
+++ b/streampipes-data-explorer-commons/pom.xml
@@ -35,15 +35,16 @@
 
 
         <!-- Others -->
+        <dependency>
+            <groupId>org.lightcouch</groupId>
+            <artifactId>lightcouch</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.influxdb</groupId>
             <artifactId>influxdb-java</artifactId>
         </dependency>
     </dependencies>
 
-    <properties>
-        <maven.compiler.source>11</maven.compiler.source>
-        <maven.compiler.target>11</maven.compiler.target>
-    </properties>
+
 
 </project>
\ No newline at end of file
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
index b739883be..8749b3cc0 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
@@ -19,22 +19,61 @@ package org.apache.streampipes.dataexplorer.commons;
 
 import org.apache.streampipes.client.StreamPipesClient;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.dataexplorer.commons.influx.InfluxNameSanitizer;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.schema.EventProperty;
+
+import java.util.List;
+import java.util.stream.Collectors;
 
 public class DataExplorerUtils {
     /**
-     * Adds a new measurement to the StreamPipes data lake
+     * Sanitizes the event schema and stores the DataLakeMeasurement to the couchDB
+     *
+     * @param client
      * @param measure
-     * @param eventSchema
      * @throws SpRuntimeException
      */
-    public static void registerAtDataLake(String measure,
-                                          EventSchema eventSchema,
-                                          StreamPipesClient client) throws SpRuntimeException {
+    public static DataLakeMeasure sanitizeAndRegisterAtDataLake(StreamPipesClient client,
+                                                     DataLakeMeasure measure) throws SpRuntimeException {
+        measure = sanitizeDataLakeMeasure(measure);
+        registerAtDataLake(client, measure);
+
+        return measure;
+    }
+
+    private static void registerAtDataLake(StreamPipesClient client,
+                                                     DataLakeMeasure measure) throws SpRuntimeException {
         client
-                .customRequest()
-                .sendPost("api/v3/datalake/measure/" + measure, eventSchema);
+          .customRequest()
+          .sendPost("api/v4/datalake/measure/", measure);
     }
 
 
+    private static DataLakeMeasure sanitizeDataLakeMeasure(DataLakeMeasure measure) throws SpRuntimeException {
+        // Note: the passed measure is modified in place and the same instance is returned
+        // Drops the event property matching the measure's timestamp field from the event schema
+        measure = removeTimestampsFromEventSchema(measure);
+
+        // Replaces all spaces in the measure name with _
+        measure.setMeasureName(InfluxNameSanitizer.prepareString(measure.getMeasureName()));
+
+        // Replaces all spaces with _ and appends _ to runtime names that match an InfluxDB reserved keyword
+        measure.getEventSchema()
+                .getEventProperties()
+                .forEach(ep -> ep.setRuntimeName(InfluxNameSanitizer.sanitizePropertyRuntimeName(ep.getRuntimeName())));
+
+        return measure;
+    }
+
+    private static DataLakeMeasure removeTimestampsFromEventSchema(DataLakeMeasure measure) {
+        List<EventProperty> eventPropertiesWithoutTimestamp = measure.getEventSchema().getEventProperties()
+          .stream()
+          .filter(eventProperty -> !measure.getTimestampField().endsWith(eventProperty.getRuntimeName()))
+          .collect(Collectors.toList());
+        measure.getEventSchema().setEventProperties(eventPropertiesWithoutTimestamp);
+
+        return measure;
+    }
+
 }
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerWriter.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerWriter.java
index 3f3a1edc6..840035068 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerWriter.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerWriter.java
@@ -1,6 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
 
 package org.apache.streampipes.dataexplorer.commons;
 
+import org.apache.streampipes.dataexplorer.commons.influx.DataExplorerConnectionSettings;
+import org.apache.streampipes.dataexplorer.commons.influx.DataExplorerDefaults;
 import org.influxdb.InfluxDB;
 import org.influxdb.InfluxDBFactory;
 import org.influxdb.dto.Point;
@@ -8,6 +27,7 @@ import org.influxdb.dto.Point;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+@Deprecated
 public class DataExplorerWriter {
     private InfluxDB influxDB;
 
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java
new file mode 100644
index 000000000..a81b6e327
--- /dev/null
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.commons;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataexplorer.commons.image.ImageStore;
+import org.apache.streampipes.dataexplorer.commons.influx.InfluxStore;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class TimeSeriesStore {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TimeSeriesStore.class);
+
+    private ImageStore imageStore;
+    private InfluxStore influxStore;
+
+
+    public TimeSeriesStore(SpConfig config,
+                           StreamPipesClient client,
+                           DataLakeMeasure measure,
+                           boolean enableImageStore) {
+        // Sanitize and register the measure first; both stores are built from the sanitized measure
+        measure = DataExplorerUtils.sanitizeAndRegisterAtDataLake(client, measure);
+
+        if (enableImageStore) {
+            // TODO check if event properties are replaced correctly
+            this.imageStore = new ImageStore(measure, config);
+        }
+        // The influx store is always created; imageStore stays null when enableImageStore is false
+        this.influxStore = new InfluxStore(measure, config);
+
+    }
+
+    public boolean onEvent(Event event) throws SpRuntimeException {
+        // Store all images in image store and replace image with internal id
+        if (imageStore != null) {
+            this.imageStore.onEvent(event);
+        }
+
+        // Store event in time series database
+        this.influxStore.onEvent(event);
+
+        return true;
+    }
+
+
+    public boolean alterRetentionTime(DataLakeMeasure dataLakeMeasure) {
+        return true;
+    }
+
+    public void close() throws SpRuntimeException  {
+        if (imageStore != null) { // imageStore is only set when the image store was enabled
+            try {
+                this.imageStore.close();
+            } catch (IOException e) {
+                LOG.error("Could not close couchDB connection");
+                throw new SpRuntimeException(e);
+            }
+        }
+        // The influx store is created unconditionally in the constructor, so it is always closed
+        this.influxStore.close();
+    }
+}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbConfigurations.java
similarity index 55%
copy from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java
copy to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbConfigurations.java
index 723d29875..8dc7d7370 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbConfigurations.java
@@ -15,25 +15,22 @@
  * limitations under the License.
  *
  */
+package org.apache.streampipes.dataexplorer.commons.couchdb;
 
-package org.apache.streampipes.sinks.internal.jvm.datalake.influx;
+import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
 
-public class DataLakeUtils {
+import java.util.Arrays;
+import java.util.List;
 
-  public static String prepareString(String s) {
-    return s.toLowerCase().replaceAll(" ", "_");
-  }
 
-  private static String renameReservedKeywords(String runtimeName) {
-    if (InfluxDbReservedKeywords.keywordList.stream().anyMatch(k -> k.equalsIgnoreCase(runtimeName))) {
-      return runtimeName + "_";
-    } else {
-      return runtimeName;
+public class CouchDbConfigurations {
+
+    public static List<ConfigItem> getDefaults() {
+        return Arrays.asList(
+                ConfigItem.from(CouchDbEnvKeys.COUCHDB_HOST, "couchdb", "Hostname for CouchDB to store image blobs"),
+                ConfigItem.from(CouchDbEnvKeys.COUCHDB_PORT, 5984, ""),
+                ConfigItem.from(CouchDbEnvKeys.COUCHDB_PROTOCOL, "http", "")
+        );
     }
-  }
 
-  public static String sanitizePropertyRuntimeName(String runtimeName) {
-    String sanitizedRuntimeName = prepareString(runtimeName);
-    return renameReservedKeywords(sanitizedRuntimeName);
-  }
-}
+}
\ No newline at end of file
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbEnvKeys.java
similarity index 91%
rename from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbEnvKeys.java
index cf64457e4..4dd6b4fb8 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbEnvKeys.java
@@ -16,10 +16,9 @@
  *
  */
 
-package org.apache.streampipes.sinks.internal.jvm.config;
-
-public class ConfigKeys {
+package org.apache.streampipes.dataexplorer.commons.couchdb;
 
+public class CouchDbEnvKeys {
     public final static String COUCHDB_HOST = "SP_COUCHDB_HOST";
     public final static String COUCHDB_PORT = "SP_COUCHDB_PORT";
     public final static String COUCHDB_PROTOCOL = "SP_COUCHDB_PROTOCOL";
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStore.java
new file mode 100644
index 000000000..1a3e2356f
--- /dev/null
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStore.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.commons.image;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataexplorer.commons.couchdb.CouchDbEnvKeys;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
+import org.lightcouch.CouchDbClient;
+import org.lightcouch.CouchDbProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+public class ImageStore {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ImageStore.class);
+  private static final String DB_NAME = "images";
+
+  private List<EventProperty> imageProperties;
+  private CouchDbClient couchDbClient;
+
+  public ImageStore(DataLakeMeasure measure, SpConfig config) {
+    this.couchDbClient = new CouchDbClient(from(config));
+    this.imageProperties = ImageStoreUtils.getImageProperties(measure);
+  }
+
+  public void onEvent(Event event) throws SpRuntimeException{
+    this.imageProperties.forEach(eventProperty -> {
+      String imageDocId = UUID.randomUUID().toString();
+      String image = event.getFieldByRuntimeName(eventProperty.getRuntimeName()).getAsPrimitive().getAsString();
+
+      byte[] data = Base64.decodeBase64(image);
+      storeImage(data, imageDocId);
+      event.updateFieldBySelector("s0::" + eventProperty.getRuntimeName(), imageDocId);
+    });
+  }
+
+  public void storeImage(byte[] imageBytes,
+                         String imageDocId) {
+    this.couchDbClient.saveAttachment(
+      new ByteArrayInputStream(imageBytes),
+      imageDocId,
+      "image/jpeg",
+      imageDocId,
+      null);
+
+  }
+
+  public void close() throws IOException {
+    this.couchDbClient.close();
+  }
+
+  private static CouchDbProperties from(SpConfig config) {
+    String couchDbProtocol = config.getString(CouchDbEnvKeys.COUCHDB_PROTOCOL);
+    String couchDbHost = config.getString(CouchDbEnvKeys.COUCHDB_HOST);
+    int couchDbPort = config.getInteger(CouchDbEnvKeys.COUCHDB_PORT);
+
+    return new CouchDbProperties(DB_NAME, true, couchDbProtocol,
+      couchDbHost, couchDbPort, null, null);
+  }
+}
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV3.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStoreUtils.java
similarity index 55%
copy from streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV3.java
copy to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStoreUtils.java
index 6463d9428..ba3ab6c0c 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV3.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStoreUtils.java
@@ -16,32 +16,22 @@
  *
  */
 
-package org.apache.streampipes.ps;
+package org.apache.streampipes.dataexplorer.commons.image;
 
-import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
-import org.apache.streampipes.rest.core.base.impl.AbstractRestResource;
-import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.vocabulary.SPSensor;
 
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
 import java.util.List;
+import java.util.stream.Collectors;
 
-@Path("/v3/datalake")
-public class DataLakeResourceV3 extends AbstractRestResource {
-  public DataLakeResourceV3() {
-  }
-
-  @GET
-  @JacksonSerialized
-  @Produces(MediaType.APPLICATION_JSON)
-  @Path("/info")
-  public Response getAllInfos() {
-    List<DataLakeMeasure> result = DataExplorerUtils.getInfos();
-    return ok(result);
-  }
+public class ImageStoreUtils {
 
+    public static List<EventProperty> getImageProperties(DataLakeMeasure measure) {
+        return  measure.getEventSchema().getEventProperties().stream()
+                .filter(eventProperty -> eventProperty.getDomainProperties() != null &&
+            eventProperty.getDomainProperties().size() > 0 &&
+            eventProperty.getDomainProperties().get(0).toString().equals(SPSensor.IMAGE))
+            .collect(Collectors.toList());
+    }
 }
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerConfigurations.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConfigurations.java
similarity index 97%
rename from streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerConfigurations.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConfigurations.java
index 595d75f84..1baf5aa72 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerConfigurations.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConfigurations.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.dataexplorer.commons;
+package org.apache.streampipes.dataexplorer.commons.influx;
 
 import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
 
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerConnectionSettings.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConnectionSettings.java
similarity index 92%
rename from streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerConnectionSettings.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConnectionSettings.java
index 08c6fc312..63b943dec 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerConnectionSettings.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConnectionSettings.java
@@ -16,8 +16,9 @@
  *
  */
 
-package org.apache.streampipes.dataexplorer.commons;
+package org.apache.streampipes.dataexplorer.commons.influx;
 
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.svcdiscovery.api.SpConfig;
 
 public class DataExplorerConnectionSettings {
@@ -30,16 +31,16 @@ public class DataExplorerConnectionSettings {
   private final String password;
 
   public static DataExplorerConnectionSettings from(SpConfig configStore) {
-    return  from(configStore, "");
+    return  from(configStore, null);
   }
 
-  public static DataExplorerConnectionSettings from(SpConfig configStore, String measureName) {
+  public static DataExplorerConnectionSettings from(SpConfig configStore, DataLakeMeasure measure) {
 
     return new DataExplorerConnectionSettings(
             configStore.getString(DataExplorerEnvKeys.DATA_LAKE_PROTOCOL) + "://" + configStore.getString(DataExplorerEnvKeys.DATA_LAKE_HOST),
             configStore.getInteger(DataExplorerEnvKeys.DATA_LAKE_PORT),
             configStore.getString(DataExplorerEnvKeys.DATA_LAKE_DATABASE_NAME),
-            measureName,
+            measure != null ? measure.getMeasureName() : "", // from(SpConfig) passes null: keep the former "" name
             configStore.getString(DataExplorerEnvKeys.DATA_LAKE_USERNAME),
             configStore.getString(DataExplorerEnvKeys.DATA_LAKE_PASSWORD));
   }
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerDefaults.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerDefaults.java
similarity index 95%
rename from streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerDefaults.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerDefaults.java
index bc6cc263b..c31c02c38 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerDefaults.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerDefaults.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.dataexplorer.commons;
+package org.apache.streampipes.dataexplorer.commons.influx;
 
 public class DataExplorerDefaults {
     public final static String DATA_LAKE_HOST = "influxdb";
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerEnvKeys.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerEnvKeys.java
similarity index 95%
rename from streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerEnvKeys.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerEnvKeys.java
index c93b5f9d8..84ad4557e 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerEnvKeys.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerEnvKeys.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.dataexplorer.commons;
+package org.apache.streampipes.dataexplorer.commons.influx;
 
 public class DataExplorerEnvKeys {
     public final static String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/InfluxDbReservedKeywords.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxDbReservedKeywords.java
similarity index 97%
rename from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/InfluxDbReservedKeywords.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxDbReservedKeywords.java
index 66911ef8c..754fc7c94 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/InfluxDbReservedKeywords.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxDbReservedKeywords.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.sinks.internal.jvm.datalake.influx;
+package org.apache.streampipes.dataexplorer.commons.influx;
 
 import java.util.Arrays;
 import java.util.List;
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxNameSanitizer.java
similarity index 54%
rename from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxNameSanitizer.java
index 723d29875..df7860a75 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxNameSanitizer.java
@@ -16,24 +16,24 @@
  *
  */
 
-package org.apache.streampipes.sinks.internal.jvm.datalake.influx;
+package org.apache.streampipes.dataexplorer.commons.influx;
 
-public class DataLakeUtils {
+public class InfluxNameSanitizer {
 
-  public static String prepareString(String s) {
-    return s.toLowerCase().replaceAll(" ", "_");
-  }
+    public static String prepareString(String s) {
+        return s.replaceAll(" ", "_");
+    }
 
-  private static String renameReservedKeywords(String runtimeName) {
-    if (InfluxDbReservedKeywords.keywordList.stream().anyMatch(k -> k.equalsIgnoreCase(runtimeName))) {
-      return runtimeName + "_";
-    } else {
-      return runtimeName;
+    private static String renameReservedKeywords(String runtimeName) {
+        if (InfluxDbReservedKeywords.keywordList.stream().anyMatch(k -> k.equalsIgnoreCase(runtimeName))) {
+            return runtimeName + "_";
+        } else {
+            return runtimeName;
+        }
     }
-  }
 
-  public static String sanitizePropertyRuntimeName(String runtimeName) {
-    String sanitizedRuntimeName = prepareString(runtimeName);
-    return renameReservedKeywords(sanitizedRuntimeName);
-  }
+    public static String sanitizePropertyRuntimeName(String runtimeName) {
+        String sanitizedRuntimeName = prepareString(runtimeName);
+        return renameReservedKeywords(sanitizedRuntimeName);
+    }
 }
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeInfluxDbClient.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
similarity index 76%
rename from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeInfluxDbClient.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
index 5b8aa2f01..8b42d940a 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeInfluxDbClient.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
@@ -16,18 +16,19 @@
  *
  */
 
-package org.apache.streampipes.sinks.internal.jvm.datalake.influx;
+package org.apache.streampipes.dataexplorer.commons.influx;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataexplorer.commons.DataExplorerConnectionSettings;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.field.PrimitiveField;
 import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.vocabulary.XSD;
 import org.influxdb.InfluxDB;
 import org.influxdb.InfluxDBFactory;
+
 import org.influxdb.dto.Point;
 import org.influxdb.dto.Pong;
 import org.influxdb.dto.Query;
@@ -40,48 +41,45 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-/**
- * Code is the same as InfluxDB (org.apache.streampipes.sinks.databases.jvm.influxdb) sink. Changes applied here should also be applied in the InfluxDB sink
- */
-public class DataLakeInfluxDbClient {
+public class InfluxStore {
 
-    private static final Logger LOG = LoggerFactory.getLogger(DataLakeInfluxDbClient.class);
+    private static final Logger LOG = LoggerFactory.getLogger(InfluxStore.class);
 
-    private final String measureName;
-    private final String timestampField;
     private final Integer batchSize = 2000;
     private final Integer flushDuration = 500;
     private InfluxDB influxDb = null;
-    private final DataExplorerConnectionSettings settings;
-    private final EventSchema originalEventSchema;
+    DataLakeMeasure measure;
+//    private final DataExplorerConnectionSettings settings;
 
     Map<String, String> targetRuntimeNames = new HashMap<>();
 
-    public DataLakeInfluxDbClient(DataExplorerConnectionSettings settings,
-                           String timestampField,
-                           EventSchema originalEventSchema) throws SpRuntimeException {
-        this.settings = settings;
-        this.originalEventSchema = originalEventSchema;
-        this.timestampField = timestampField;
-        this.measureName = settings.getMeasureName();
+    public InfluxStore(DataLakeMeasure measure,
+                       SpConfig configStore) throws SpRuntimeException {
 
-        prepareSchema();
-        connect();
-    }
+        this.measure = measure;
+        DataExplorerConnectionSettings settings = DataExplorerConnectionSettings.from(configStore, measure);
 
-    private void prepareSchema() {
-        originalEventSchema
+        // TODO check if this works
+        measure.getEventSchema()
                 .getEventProperties()
-                .forEach(ep -> targetRuntimeNames.put(ep.getRuntimeName(), DataLakeUtils.sanitizePropertyRuntimeName(ep.getRuntimeName())));
+                .forEach(ep -> targetRuntimeNames.put(ep.getRuntimeName(), InfluxNameSanitizer.sanitizePropertyRuntimeName(ep.getRuntimeName())));
+
+        connect(settings);
     }
 
+    /**
+     *
+     * @param measure
+     */
+
+
     /**
      * Connects to the InfluxDB Server, sets the database and initializes the batch-behaviour
      *
      * @throws SpRuntimeException If not connection can be established or if the database could not
      * be found
      */
-    private void connect() throws SpRuntimeException {
+    private void connect(DataExplorerConnectionSettings settings) throws SpRuntimeException {
         // Connecting to the server
         // "http://" must be in front
         String urlAndPort = settings.getInfluxDbHost() + ":" + settings.getInfluxDbPort();
@@ -105,13 +103,6 @@ public class DataLakeInfluxDbClient {
         influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS);
     }
 
-    /**
-     * Checks whether the given database exists. Needs a working connection to an InfluxDB Server
-     * ({@link DataLakeInfluxDbClient#influxDb} needs to be initialized)
-     *
-     * @param dbName The name of the database, the method should look for
-     * @return True if the database exists, false otherwise
-     */
     private boolean databaseExists(String dbName) {
         QueryResult queryResult = influxDb.query(new Query("SHOW DATABASES", ""));
         for(List<Object> a : queryResult.getResults().get(0).getSeries().get(0).getValues()) {
@@ -140,19 +131,20 @@ public class DataLakeInfluxDbClient {
      * @param event The event which should be saved
      * @throws SpRuntimeException If the column name (key-value of the event map) is not allowed
      */
-    public void save(Event event, EventSchema schema) throws SpRuntimeException {
+    public void onEvent(Event event) throws SpRuntimeException {
         if (event == null) {
             throw new SpRuntimeException("event is null");
         }
 
-        Long timestampValue = event.getFieldBySelector(timestampField).getAsPrimitive().getAsLong();
-        Point.Builder p = Point.measurement(measureName).time(timestampValue, TimeUnit.MILLISECONDS);
+        Long timestampValue = event.getFieldBySelector(measure.getTimestampField()).getAsPrimitive().getAsLong();
+        Point.Builder p = Point.measurement(measure.getMeasureName()).time((long) timestampValue, TimeUnit.MILLISECONDS);
 
-        for (EventProperty ep : schema.getEventProperties()) {
+        for (EventProperty ep : measure.getEventSchema().getEventProperties()) {
             if (ep instanceof EventPropertyPrimitive) {
                 String runtimeName = ep.getRuntimeName();
 
-                if (!timestampField.endsWith(runtimeName)) {
+                // timestamp should not be added as a field
+                if (!measure.getTimestampField().endsWith(runtimeName)) {
                     String preparedRuntimeName = targetRuntimeNames.get(runtimeName);
                     PrimitiveField eventPropertyPrimitiveField = event.getFieldByRuntimeName(runtimeName).getAsPrimitive();
 
@@ -190,12 +182,12 @@ public class DataLakeInfluxDbClient {
     /**
      * Shuts down the connection to the InfluxDB server
      */
-    public void stop() {
+    public void close() throws SpRuntimeException {
         influxDb.flush();
         try {
             Thread.sleep(1000);
         } catch (InterruptedException e) {
-            e.printStackTrace();
+            throw new SpRuntimeException(e);
         }
         influxDb.close();
     }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 1ccb4742d..523cbe8aa 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -36,12 +36,16 @@ import org.apache.streampipes.model.datalake.DataLakeConfiguration;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.datalake.DataLakeRetentionPolicy;
 import org.apache.streampipes.model.datalake.SpQueryResult;
+import org.apache.streampipes.model.schema.*;
+import org.apache.streampipes.storage.api.IDataLakeStorage;
 import org.apache.streampipes.storage.couchdb.utils.Utils;
+import org.apache.streampipes.storage.management.StorageDispatcher;
 import org.influxdb.InfluxDB;
 import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
 import org.lightcouch.CouchDbClient;
 
+import javax.xml.crypto.Data;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.time.Instant;
@@ -53,10 +57,7 @@ import java.time.format.DateTimeFormatterBuilder;
 import java.time.temporal.ChronoField;
 import java.time.temporal.ChronoUnit;
 import java.time.temporal.TemporalAccessor;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
 
 import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.*;
@@ -327,4 +328,64 @@ public class DataLakeManagementV4 {
 
         return tags;
     }
+
+
+    // TODO validate method
+    public DataLakeMeasure addDataLake(DataLakeMeasure measure) {
+        List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
+        Optional<DataLakeMeasure> optional = dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName())).findFirst();
+
+        if (optional.isPresent()) {
+            DataLakeMeasure oldEntry = optional.get();
+            if (!compareEventProperties(oldEntry.getEventSchema().getEventProperties(), measure.getEventSchema().getEventProperties())) {
+                return oldEntry;
+            }
+        } else {
+            measure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
+            getDataLakeStorage().storeDataLakeMeasure(measure);
+            return measure;
+        }
+
+        return measure;
+    }
+
+    private boolean compareEventProperties(List<EventProperty> prop1, List<EventProperty> prop2) {
+        if (prop1.size() != prop2.size()) {
+            return false;
+        }
+
+        return prop1.stream().allMatch(prop -> {
+
+            for (EventProperty property : prop2) {
+                if (prop.getRuntimeName().equals(property.getRuntimeName())) {
+
+                    //primitive
+                    if (prop instanceof EventPropertyPrimitive && property instanceof EventPropertyPrimitive) {
+                        if (((EventPropertyPrimitive) prop)
+                          .getRuntimeType()
+                          .equals(((EventPropertyPrimitive) property).getRuntimeType())) {
+                            return true;
+                        }
+
+                        //list
+                    } else if (prop instanceof EventPropertyList && property instanceof EventPropertyList) {
+                        return compareEventProperties(Collections.singletonList(((EventPropertyList) prop).getEventProperty()),
+                          Collections.singletonList(((EventPropertyList) property).getEventProperty()));
+
+                        //nested
+                    } else if (prop instanceof EventPropertyNested && property instanceof EventPropertyNested) {
+                        return compareEventProperties(((EventPropertyNested) prop).getEventProperties(),
+                          ((EventPropertyNested) property).getEventProperties());
+                    }
+                }
+            }
+            return false;
+
+        });
+    }
+
+
+    private IDataLakeStorage getDataLakeStorage() {
+        return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
+    }
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
index 2ae5c3dea..5b411a0cf 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
@@ -28,8 +28,11 @@ import java.util.List;
 import java.util.Optional;
 
 
+@Deprecated
 public class DataLakeNoUserManagementV3 {
 
+
+  @Deprecated
   public boolean addDataLake(String measure, EventSchema eventSchema) {
     List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
     Optional<DataLakeMeasure> optional = dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure)).findFirst();
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/pom.xml b/streampipes-extensions/streampipes-sinks-internal-jvm/pom.xml
index 62babfd51..3e7159e3a 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/pom.xml
@@ -56,6 +56,7 @@
         </dependency>
 
         <!-- External dependencies -->
+        <!-- TODO remove after refactoring -->
         <dependency>
             <groupId>org.lightcouch</groupId>
             <artifactId>lightcouch</artifactId>
@@ -64,6 +65,8 @@
             <groupId>org.influxdb</groupId>
             <artifactId>influxdb-java</artifactId>
         </dependency>
+        <!-- TODO remove after refactoring -->
+
         <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
index 7aae0127f..42e0dc80a 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
@@ -21,7 +21,8 @@ package org.apache.streampipes.sinks.internal.jvm;
 import org.apache.streampipes.container.model.SpServiceDefinition;
 import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
 import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
-import org.apache.streampipes.dataexplorer.commons.DataExplorerConfigurations;
+import org.apache.streampipes.dataexplorer.commons.couchdb.CouchDbConfigurations;
+import org.apache.streampipes.dataexplorer.commons.influx.DataExplorerConfigurations;
 import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
 import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
 import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
@@ -29,7 +30,6 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
 import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeSink;
 import org.apache.streampipes.sinks.internal.jvm.notification.NotificationProducer;
 
@@ -53,10 +53,8 @@ public class SinksInternalJvmInit extends StandaloneModelSubmitter {
                     new SpKafkaProtocolFactory(),
                     new SpJmsProtocolFactory(),
                     new SpMqttProtocolFactory())
-            .addConfig(ConfigKeys.COUCHDB_HOST, "couchdb", "Hostname for CouchDB to store image blobs")
-            .addConfig(ConfigKeys.COUCHDB_PORT, 5984, "")
-            .addConfig(ConfigKeys.COUCHDB_PROTOCOL, "http", "")
             .addConfigs(DataExplorerConfigurations.getDefaults())
+            .addConfigs(CouchDbConfigurations.getDefaults())
             .build();
 
 
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
index 571d027d1..c268573ae 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
@@ -1,14 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
 package org.apache.streampipes.sinks.internal.jvm.datalake;
 
-import org.apache.commons.codec.binary.Base64;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataexplorer.commons.DataExplorerConnectionSettings;
-import org.apache.streampipes.dataexplorer.commons.DataExplorerUtils;
+import org.apache.streampipes.dataexplorer.commons.TimeSeriesStore;
 import org.apache.streampipes.logging.api.Logger;
 import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.schema.EventProperty;
 import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
@@ -17,138 +33,60 @@ import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
-import org.apache.streampipes.sinks.internal.jvm.datalake.image.ImageStore;
-import org.apache.streampipes.sinks.internal.jvm.datalake.influx.DataLakeInfluxDbClient;
-import org.apache.streampipes.sinks.internal.jvm.datalake.influx.DataLakeUtils;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.vocabulary.SPSensor;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
 import org.apache.streampipes.wrapper.standalone.SinkParams;
 import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
 
-import java.util.List;
-import java.util.UUID;
-import java.util.stream.Collectors;
 
 public class DataLakeSink extends StreamPipesDataSink {
-
-    private DataLakeInfluxDbClient influxDbClient;
+    private static Logger LOG;
 
     private static final String DATABASE_MEASUREMENT_KEY = "db_measurement";
     private static final String TIMESTAMP_MAPPING_KEY = "timestamp_mapping";
 
-    private static Logger LOG;
-
-    private String timestampField;
-    private List<EventProperty> imageProperties;
+    private TimeSeriesStore timeSeriesStore;
 
-    private EventSchema eventSchema;
-    private ImageStore imageStore;
 
     @Override
     public DataSinkDescription declareModel() {
         return DataSinkBuilder.create("org.apache.streampipes.sinks.internal.jvm.datalake")
-                .withLocales(Locales.EN)
-                .withAssets(Assets.DOCUMENTATION, Assets.ICON)
-                .category(DataSinkType.INTERNAL)
-                .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
-                        EpRequirements.timestampReq(),
-                        Labels.withId(TIMESTAMP_MAPPING_KEY),
-                        PropertyScope.NONE).build())
-                .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
-                .build();
+          .withLocales(Locales.EN)
+          .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+          .category(DataSinkType.INTERNAL)
+          .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
+            EpRequirements.timestampReq(),
+            Labels.withId(TIMESTAMP_MAPPING_KEY),
+            PropertyScope.NONE).build())
+          .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
+          .build();
     }
 
     @Override
     public void onInvocation(SinkParams parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
 
+
         LOG = parameters.getGraph().getLogger(DataLakeSink.class);
 
-        this.timestampField = parameters.extractor().mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
+        String timestampField = parameters.extractor().mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
         String measureName = parameters.extractor().singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class);
-        // TODO check if still needed
-        measureName = DataLakeUtils.prepareString(measureName);
-
-
-
-        EventSchema schema = runtimeContext.getInputSchemaInfo().get(0).getEventSchema();
-        // Remove the timestamp field from the event schema
-        List<EventProperty> eventPropertiesWithoutTimestamp = schema.getEventProperties()
-                .stream()
-                .filter(eventProperty -> !this.timestampField.endsWith(eventProperty.getRuntimeName()))
-                .collect(Collectors.toList());
-        schema.setEventProperties(eventPropertiesWithoutTimestamp);
-
-        // deep copy of event schema. Event property runtime name is changed to lower case for the schema registration
-        this.eventSchema = new EventSchema(schema);
-
-        schema.getEventProperties().forEach(eventProperty ->
-                eventProperty.setRuntimeName(DataLakeUtils.sanitizePropertyRuntimeName(eventProperty.getRuntimeName())));
-        DataExplorerUtils.registerAtDataLake(measureName, schema, runtimeContext.getStreamPipesClient());
+        EventSchema eventSchema = runtimeContext.getInputSchemaInfo().get(0).getEventSchema();
 
+        DataLakeMeasure measure = new DataLakeMeasure(measureName, timestampField, eventSchema);
 
-        // Get schema from couchdb
+        this.timeSeriesStore = new TimeSeriesStore(runtimeContext.getConfigStore().getConfig(),
+          runtimeContext.getStreamPipesClient(),
+          measure,
+          true);
 
-        // if schema version null -> rename event schema
-
-        //
-
-        imageProperties = schema.getEventProperties().stream()
-                .filter(eventProperty -> eventProperty.getDomainProperties() != null &&
-                        eventProperty.getDomainProperties().size() > 0 &&
-                        eventProperty.getDomainProperties().get(0).toString().equals(SPSensor.IMAGE))
-                .collect(Collectors.toList());
-
-
-
-        SpConfig configStore = runtimeContext.getConfigStore().getConfig();
-        String couchDbProtocol = configStore.getString(ConfigKeys.COUCHDB_PROTOCOL);
-        String couchDbHost = configStore.getString(ConfigKeys.COUCHDB_HOST);
-        int couchDbPort = configStore.getInteger(ConfigKeys.COUCHDB_PORT);
-        this.imageStore = new ImageStore(couchDbProtocol, couchDbHost, couchDbPort);
-
-        DataExplorerConnectionSettings settings = DataExplorerConnectionSettings.from(
-                configStore,
-                measureName);
-
-
-
-        this.influxDbClient = new DataLakeInfluxDbClient(
-                settings,
-                this.timestampField,
-                this.eventSchema
-        );
     }
 
     @Override
     public void onEvent(Event event) throws SpRuntimeException {
-        // handles image properties by storing the image in couchdb and replacing the values with the document id
-        try {
-            this.imageProperties.forEach(eventProperty -> {
-                String imageDocId = UUID.randomUUID().toString();
-                String image = event.getFieldByRuntimeName(eventProperty.getRuntimeName()).getAsPrimitive().getAsString();
-
-                byte[] data = Base64.decodeBase64(image);
-                this.imageStore.storeImage(data, imageDocId);
-                event.updateFieldBySelector("s0::" + eventProperty.getRuntimeName(), imageDocId);
-            });
-        } catch (SpRuntimeException e) {
-            LOG.error(e.getMessage());
-        }
-
-
-        // store event in time series database
-        try {
-            influxDbClient.save(event, this.eventSchema);
-        } catch (SpRuntimeException e) {
-            LOG.error(e.getMessage());
-        }
-
+        this.timeSeriesStore.onEvent(event);
     }
 
     @Override
     public void onDetach() throws SpRuntimeException {
-        influxDbClient.stop();
+        this.timeSeriesStore.close();
     }
 }
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/image/ImageStore.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/image/ImageStore.java
deleted file mode 100644
index 8bec59972..000000000
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/image/ImageStore.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.internal.jvm.datalake.image;
-
-import org.lightcouch.CouchDbClient;
-import org.lightcouch.CouchDbProperties;
-
-import java.io.ByteArrayInputStream;
-
-public class ImageStore {
-
-  private static final String DB_NAME = "images";
-
-  private CouchDbClient couchDbClient;
-
-  public ImageStore(String couchDbProtocol,
-                    String couchDbHost,
-                    int couchDbPort) {
-    this.couchDbClient = new CouchDbClient(props(couchDbProtocol, couchDbHost, couchDbPort));
-  }
-
-  public void storeImage(byte[] imageBytes,
-                         String imageDocId) {
-    this.couchDbClient.saveAttachment(
-            new ByteArrayInputStream(imageBytes),
-            imageDocId,
-            "image/jpeg",
-            imageDocId,
-            null);
-  }
-
-  private static CouchDbProperties props(String couchDbProtocol,
-                                         String couchDbHost,
-                                         int couchDbPort) {
-    return new CouchDbProperties(DB_NAME, true, couchDbProtocol,
-            couchDbHost, couchDbPort, null, null);
-  }
-}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java
index 7f1df47a9..f1bdc8f82 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java
@@ -25,12 +25,17 @@ import org.apache.streampipes.model.shared.annotation.TsModel;
 @TsModel
 public class DataLakeMeasure extends UnnamedStreamPipesEntity {
 
+    public final static String CURRENT_SCHEMA_VERSION = "1.1";
     private String measureName;
+
+    private String timestampField;
     private EventSchema eventSchema;
     private String pipelineId;
     private String pipelineName;
     private boolean pipelineIsRunning;
 
+    private String schemaVersion;
+
     public DataLakeMeasure() {
         super();
     }
@@ -47,6 +52,12 @@ public class DataLakeMeasure extends UnnamedStreamPipesEntity {
         this.eventSchema = eventSchema;
     }
 
+    public DataLakeMeasure(String measureName, String timestampField, EventSchema eventSchema) {
+        this.measureName = measureName;
+        this.eventSchema = eventSchema;
+        this.timestampField = timestampField;
+    }
+
     public String getMeasureName() {
         return measureName;
     }
@@ -87,4 +98,19 @@ public class DataLakeMeasure extends UnnamedStreamPipesEntity {
         this.pipelineIsRunning = pipelineIsRunning;
     }
 
+    public String getSchemaVersion() {
+        return schemaVersion;
+    }
+
+    public void setSchemaVersion(String schemaVersion) {
+        this.schemaVersion = schemaVersion;
+    }
+
+    public String getTimestampField() {
+        return timestampField;
+    }
+
+    public void setTimestampField(String timestampField) {
+        this.timestampField = timestampField;
+    }
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeMeasureResourceV3.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java
similarity index 97%
copy from streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeMeasureResourceV3.java
copy to streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java
index 57e3b935c..877b88451 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeMeasureResourceV3.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.rest.impl.datalake;
+package org.apache.streampipes.ps;
 
 import org.apache.streampipes.dataexplorer.DataLakeNoUserManagementV3;
 import org.apache.streampipes.model.schema.EventSchema;
@@ -28,6 +28,7 @@ import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
 @Path("/v3/datalake/measure")
+@Deprecated
 public class DataLakeMeasureResourceV3 extends AbstractAuthGuardedRestResource {
 
     private DataLakeNoUserManagementV3 dataLakeManagement;
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeMeasureResourceV3.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
similarity index 68%
rename from streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeMeasureResourceV3.java
rename to streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
index 57e3b935c..4319cf84c 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeMeasureResourceV3.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
@@ -16,9 +16,11 @@
  *
  */
 
-package org.apache.streampipes.rest.impl.datalake;
+package org.apache.streampipes.ps;
 
+import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
 import org.apache.streampipes.dataexplorer.DataLakeNoUserManagementV3;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
 import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -27,26 +29,22 @@ import javax.ws.rs.*;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 
-@Path("/v3/datalake/measure")
-public class DataLakeMeasureResourceV3 extends AbstractAuthGuardedRestResource {
+@Path("/v4/datalake/measure")
+public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
 
-    private DataLakeNoUserManagementV3 dataLakeManagement;
+    private DataLakeManagementV4 dataLakeManagement;
 
-    public DataLakeMeasureResourceV3() {
-        this.dataLakeManagement = new DataLakeNoUserManagementV3();
+    public DataLakeMeasureResourceV4() {
+        this.dataLakeManagement = new DataLakeManagementV4();
     }
 
     @POST
     @JacksonSerialized
     @Produces(MediaType.APPLICATION_JSON)
     @Consumes(MediaType.APPLICATION_JSON)
-    @Path("/{measure}")
-    public Response addDataLake(@PathParam("measure") String measure, EventSchema eventSchema) {
-        if (this.dataLakeManagement.addDataLake(measure, eventSchema)) {
-            return ok();
-        } else {
-            return Response.status(409).build();
-        }
-
+    @Path("/")
+    public Response addDataLake(DataLakeMeasure dataLakeMeasure) {
+        DataLakeMeasure result = this.dataLakeManagement.addDataLake(dataLakeMeasure);
+        return ok(result);
     }
 }
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV3.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV3.java
index 6463d9428..00fd8c9b7 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV3.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV3.java
@@ -31,6 +31,7 @@ import javax.ws.rs.core.Response;
 import java.util.List;
 
 @Path("/v3/datalake")
+@Deprecated
 public class DataLakeResourceV3 extends AbstractRestResource {
   public DataLakeResourceV3() {
   }