Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/07/29 09:20:04 UTC

[incubator-streampipes] branch STREAMPIPES-563 updated: [STREAMPIPES-563] Started to restructure code of data lake sink

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch STREAMPIPES-563
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/STREAMPIPES-563 by this push:
     new c94769c20 [STREAMPIPES-563] Started to restructure code of data lake sink
c94769c20 is described below

commit c94769c20abed3e2f840b660d7a62f7530c7babe
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Fri Jul 29 11:19:27 2022 +0200

    [STREAMPIPES-563] Started to restructure code of data lake sink
---
 .../sinks/internal/jvm/datalake/DataLakeSink.java  | 38 +++++++++++++---------
 .../{ => influx}/DataLakeInfluxDbClient.java       | 17 ++++------
 .../jvm/datalake/{ => influx}/DataLakeUtils.java   |  2 +-
 .../{ => influx}/InfluxDbReservedKeywords.java     |  2 +-
 4 files changed, 30 insertions(+), 29 deletions(-)

diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
index 410d64ff9..571d027d1 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
@@ -19,6 +19,8 @@ import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
 import org.apache.streampipes.sinks.internal.jvm.datalake.image.ImageStore;
+import org.apache.streampipes.sinks.internal.jvm.datalake.influx.DataLakeInfluxDbClient;
+import org.apache.streampipes.sinks.internal.jvm.datalake.influx.DataLakeUtils;
 import org.apache.streampipes.svcdiscovery.api.SpConfig;
 import org.apache.streampipes.vocabulary.SPSensor;
 import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
@@ -65,15 +67,10 @@ public class DataLakeSink extends StreamPipesDataSink {
 
         this.timestampField = parameters.extractor().mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
         String measureName = parameters.extractor().singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class);
+        // TODO check if still needed
         measureName = DataLakeUtils.prepareString(measureName);
 
-        SpConfig configStore = runtimeContext.getConfigStore().getConfig();
-
-        String couchDbProtocol = configStore.getString(ConfigKeys.COUCHDB_PROTOCOL);
-        String couchDbHost = configStore.getString(ConfigKeys.COUCHDB_HOST);
-        int couchDbPort = configStore.getInteger(ConfigKeys.COUCHDB_PORT);
 
-        this.imageStore = new ImageStore(couchDbProtocol, couchDbHost, couchDbPort);
 
         EventSchema schema = runtimeContext.getInputSchemaInfo().get(0).getEventSchema();
         // Remove the timestamp field from the event schema
@@ -103,46 +100,55 @@ public class DataLakeSink extends StreamPipesDataSink {
                         eventProperty.getDomainProperties().get(0).toString().equals(SPSensor.IMAGE))
                 .collect(Collectors.toList());
 
+
+
+        SpConfig configStore = runtimeContext.getConfigStore().getConfig();
+        String couchDbProtocol = configStore.getString(ConfigKeys.COUCHDB_PROTOCOL);
+        String couchDbHost = configStore.getString(ConfigKeys.COUCHDB_HOST);
+        int couchDbPort = configStore.getInteger(ConfigKeys.COUCHDB_PORT);
+        this.imageStore = new ImageStore(couchDbProtocol, couchDbHost, couchDbPort);
+
         DataExplorerConnectionSettings settings = DataExplorerConnectionSettings.from(
                 configStore,
                 measureName);
 
-        Integer batchSize = 2000;
-        Integer flushDuration = 500;
+
 
         this.influxDbClient = new DataLakeInfluxDbClient(
                 settings,
                 this.timestampField,
-                batchSize,
-                flushDuration,
                 this.eventSchema
         );
     }
 
     @Override
     public void onEvent(Event event) throws SpRuntimeException {
+        // handles image properties by storing the image in couchdb and replacing the values with the document id
         try {
             this.imageProperties.forEach(eventProperty -> {
                 String imageDocId = UUID.randomUUID().toString();
                 String image = event.getFieldByRuntimeName(eventProperty.getRuntimeName()).getAsPrimitive().getAsString();
 
-                this.writeToImageFile(image, imageDocId);
+                byte[] data = Base64.decodeBase64(image);
+                this.imageStore.storeImage(data, imageDocId);
                 event.updateFieldBySelector("s0::" + eventProperty.getRuntimeName(), imageDocId);
             });
+        } catch (SpRuntimeException e) {
+            LOG.error(e.getMessage());
+        }
 
+
+        // store event in time series database
+        try {
             influxDbClient.save(event, this.eventSchema);
         } catch (SpRuntimeException e) {
             LOG.error(e.getMessage());
         }
+
     }
 
     @Override
     public void onDetach() throws SpRuntimeException {
         influxDbClient.stop();
     }
-
-    private void writeToImageFile(String image, String imageDocId) {
-        byte[] data = Base64.decodeBase64(image);
-        this.imageStore.storeImage(data, imageDocId);
-    }
 }
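
For readers skimming the hunks above: stitched together, the onEvent method after this commit roughly reads as follows. This is only an illustrative reassembly of the diff for readability, not additional code from the commit; all names are taken from the changed file.

    @Override
    public void onEvent(Event event) throws SpRuntimeException {
        // handles image properties by storing the image in CouchDB and
        // replacing the field value with the generated document id
        try {
            this.imageProperties.forEach(eventProperty -> {
                String imageDocId = UUID.randomUUID().toString();
                String image = event.getFieldByRuntimeName(eventProperty.getRuntimeName())
                        .getAsPrimitive()
                        .getAsString();

                byte[] data = Base64.decodeBase64(image);
                this.imageStore.storeImage(data, imageDocId);
                event.updateFieldBySelector("s0::" + eventProperty.getRuntimeName(), imageDocId);
            });
        } catch (SpRuntimeException e) {
            LOG.error(e.getMessage());
        }

        // store the event in the time series database
        try {
            influxDbClient.save(event, this.eventSchema);
        } catch (SpRuntimeException e) {
            LOG.error(e.getMessage());
        }
    }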
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeInfluxDbClient.java
similarity index 94%
rename from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
rename to streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeInfluxDbClient.java
index 9c51aeac8..5b8aa2f01 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeInfluxDbClient.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.sinks.internal.jvm.datalake;
+package org.apache.streampipes.sinks.internal.jvm.datalake.influx;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.dataexplorer.commons.DataExplorerConnectionSettings;
@@ -49,25 +49,20 @@ public class DataLakeInfluxDbClient {
 
     private final String measureName;
     private final String timestampField;
-    private final Integer batchSize;
-    private final Integer flushDuration;
-
+    private final Integer batchSize = 2000;
+    private final Integer flushDuration = 500;
     private InfluxDB influxDb = null;
     private final DataExplorerConnectionSettings settings;
     private final EventSchema originalEventSchema;
 
     Map<String, String> targetRuntimeNames = new HashMap<>();
 
-    DataLakeInfluxDbClient(DataExplorerConnectionSettings settings,
+    public DataLakeInfluxDbClient(DataExplorerConnectionSettings settings,
                            String timestampField,
-                           Integer batchSize,
-                           Integer flushDuration,
                            EventSchema originalEventSchema) throws SpRuntimeException {
         this.settings = settings;
         this.originalEventSchema = originalEventSchema;
         this.timestampField = timestampField;
-        this.batchSize = batchSize;
-        this.flushDuration = flushDuration;
         this.measureName = settings.getMeasureName();
 
         prepareSchema();
@@ -145,7 +140,7 @@ public class DataLakeInfluxDbClient {
      * @param event The event which should be saved
      * @throws SpRuntimeException If the column name (key-value of the event map) is not allowed
      */
-    void save(Event event, EventSchema schema) throws SpRuntimeException {
+    public void save(Event event, EventSchema schema) throws SpRuntimeException {
         if (event == null) {
             throw new SpRuntimeException("event is null");
         }
@@ -195,7 +190,7 @@ public class DataLakeInfluxDbClient {
     /**
      * Shuts down the connection to the InfluxDB server
      */
-    void stop() {
+    public void stop() {
         influxDb.flush();
         try {
             Thread.sleep(1000);
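
With the constructor, save and stop now public and batchSize (2000) / flushDuration (500) kept inside the client as fixed defaults, the caller side in DataLakeSink shrinks to roughly the sketch below. This is an illustrative sketch using the variable names from the sink diff above; the surrounding setup in onInvocation is omitted.

    DataExplorerConnectionSettings settings = DataExplorerConnectionSettings.from(
            configStore,
            measureName);

    // batch size and flush duration are no longer passed in; the client
    // falls back to its internal defaults (batchSize 2000, flushDuration 500)
    DataLakeInfluxDbClient influxDbClient = new DataLakeInfluxDbClient(
            settings,
            this.timestampField,
            this.eventSchema);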
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeUtils.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java
similarity index 95%
rename from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeUtils.java
rename to streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java
index 8fa1736b3..723d29875 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeUtils.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.sinks.internal.jvm.datalake;
+package org.apache.streampipes.sinks.internal.jvm.datalake.influx;
 
 public class DataLakeUtils {
 
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbReservedKeywords.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/InfluxDbReservedKeywords.java
similarity index 97%
rename from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbReservedKeywords.java
rename to streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/InfluxDbReservedKeywords.java
index 26e7a149a..66911ef8c 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbReservedKeywords.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/InfluxDbReservedKeywords.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.sinks.internal.jvm.datalake;
+package org.apache.streampipes.sinks.internal.jvm.datalake.influx;
 
 import java.util.Arrays;
 import java.util.List;