You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/07/29 09:20:04 UTC
[incubator-streampipes] branch STREAMPIPES-563 updated: [STREAMPIPES-563] Started to restructure code of data lake sink
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch STREAMPIPES-563
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/STREAMPIPES-563 by this push:
new c94769c20 [STREAMPIPES-563] Started to restructure code of data lake sink
c94769c20 is described below
commit c94769c20abed3e2f840b660d7a62f7530c7babe
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Fri Jul 29 11:19:27 2022 +0200
[STREAMPIPES-563] Started to restructure code of data lake sink
---
.../sinks/internal/jvm/datalake/DataLakeSink.java | 38 +++++++++++++---------
.../{ => influx}/DataLakeInfluxDbClient.java | 17 ++++------
.../jvm/datalake/{ => influx}/DataLakeUtils.java | 2 +-
.../{ => influx}/InfluxDbReservedKeywords.java | 2 +-
4 files changed, 30 insertions(+), 29 deletions(-)
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
index 410d64ff9..571d027d1 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
@@ -19,6 +19,8 @@ import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
import org.apache.streampipes.sinks.internal.jvm.datalake.image.ImageStore;
+import org.apache.streampipes.sinks.internal.jvm.datalake.influx.DataLakeInfluxDbClient;
+import org.apache.streampipes.sinks.internal.jvm.datalake.influx.DataLakeUtils;
import org.apache.streampipes.svcdiscovery.api.SpConfig;
import org.apache.streampipes.vocabulary.SPSensor;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
@@ -65,15 +67,10 @@ public class DataLakeSink extends StreamPipesDataSink {
this.timestampField = parameters.extractor().mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
String measureName = parameters.extractor().singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class);
+ // TODO check if still needed
measureName = DataLakeUtils.prepareString(measureName);
- SpConfig configStore = runtimeContext.getConfigStore().getConfig();
-
- String couchDbProtocol = configStore.getString(ConfigKeys.COUCHDB_PROTOCOL);
- String couchDbHost = configStore.getString(ConfigKeys.COUCHDB_HOST);
- int couchDbPort = configStore.getInteger(ConfigKeys.COUCHDB_PORT);
- this.imageStore = new ImageStore(couchDbProtocol, couchDbHost, couchDbPort);
EventSchema schema = runtimeContext.getInputSchemaInfo().get(0).getEventSchema();
// Remove the timestamp field from the event schema
@@ -103,46 +100,55 @@ public class DataLakeSink extends StreamPipesDataSink {
eventProperty.getDomainProperties().get(0).toString().equals(SPSensor.IMAGE))
.collect(Collectors.toList());
+
+
+ SpConfig configStore = runtimeContext.getConfigStore().getConfig();
+ String couchDbProtocol = configStore.getString(ConfigKeys.COUCHDB_PROTOCOL);
+ String couchDbHost = configStore.getString(ConfigKeys.COUCHDB_HOST);
+ int couchDbPort = configStore.getInteger(ConfigKeys.COUCHDB_PORT);
+ this.imageStore = new ImageStore(couchDbProtocol, couchDbHost, couchDbPort);
+
DataExplorerConnectionSettings settings = DataExplorerConnectionSettings.from(
configStore,
measureName);
- Integer batchSize = 2000;
- Integer flushDuration = 500;
+
this.influxDbClient = new DataLakeInfluxDbClient(
settings,
this.timestampField,
- batchSize,
- flushDuration,
this.eventSchema
);
}
@Override
public void onEvent(Event event) throws SpRuntimeException {
+ // handles image properties by storing the image in couchdb and replacing the values with the document id
try {
this.imageProperties.forEach(eventProperty -> {
String imageDocId = UUID.randomUUID().toString();
String image = event.getFieldByRuntimeName(eventProperty.getRuntimeName()).getAsPrimitive().getAsString();
- this.writeToImageFile(image, imageDocId);
+ byte[] data = Base64.decodeBase64(image);
+ this.imageStore.storeImage(data, imageDocId);
event.updateFieldBySelector("s0::" + eventProperty.getRuntimeName(), imageDocId);
});
+ } catch (SpRuntimeException e) {
+ LOG.error(e.getMessage());
+ }
+
+ // store event in time series database
+ try {
influxDbClient.save(event, this.eventSchema);
} catch (SpRuntimeException e) {
LOG.error(e.getMessage());
}
+
}
@Override
public void onDetach() throws SpRuntimeException {
influxDbClient.stop();
}
-
- private void writeToImageFile(String image, String imageDocId) {
- byte[] data = Base64.decodeBase64(image);
- this.imageStore.storeImage(data, imageDocId);
- }
}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeInfluxDbClient.java
similarity index 94%
rename from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
rename to streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeInfluxDbClient.java
index 9c51aeac8..5b8aa2f01 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeInfluxDbClient.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.sinks.internal.jvm.datalake;
+package org.apache.streampipes.sinks.internal.jvm.datalake.influx;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataexplorer.commons.DataExplorerConnectionSettings;
@@ -49,25 +49,20 @@ public class DataLakeInfluxDbClient {
private final String measureName;
private final String timestampField;
- private final Integer batchSize;
- private final Integer flushDuration;
-
+ private final Integer batchSize = 2000;
+ private final Integer flushDuration = 500;
private InfluxDB influxDb = null;
private final DataExplorerConnectionSettings settings;
private final EventSchema originalEventSchema;
Map<String, String> targetRuntimeNames = new HashMap<>();
- DataLakeInfluxDbClient(DataExplorerConnectionSettings settings,
+ public DataLakeInfluxDbClient(DataExplorerConnectionSettings settings,
String timestampField,
- Integer batchSize,
- Integer flushDuration,
EventSchema originalEventSchema) throws SpRuntimeException {
this.settings = settings;
this.originalEventSchema = originalEventSchema;
this.timestampField = timestampField;
- this.batchSize = batchSize;
- this.flushDuration = flushDuration;
this.measureName = settings.getMeasureName();
prepareSchema();
@@ -145,7 +140,7 @@ public class DataLakeInfluxDbClient {
* @param event The event which should be saved
* @throws SpRuntimeException If the column name (key-value of the event map) is not allowed
*/
- void save(Event event, EventSchema schema) throws SpRuntimeException {
+ public void save(Event event, EventSchema schema) throws SpRuntimeException {
if (event == null) {
throw new SpRuntimeException("event is null");
}
@@ -195,7 +190,7 @@ public class DataLakeInfluxDbClient {
/**
* Shuts down the connection to the InfluxDB server
*/
- void stop() {
+ public void stop() {
influxDb.flush();
try {
Thread.sleep(1000);
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeUtils.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java
similarity index 95%
rename from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeUtils.java
rename to streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java
index 8fa1736b3..723d29875 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeUtils.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.sinks.internal.jvm.datalake;
+package org.apache.streampipes.sinks.internal.jvm.datalake.influx;
public class DataLakeUtils {
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbReservedKeywords.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/InfluxDbReservedKeywords.java
similarity index 97%
rename from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbReservedKeywords.java
rename to streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/InfluxDbReservedKeywords.java
index 26e7a149a..66911ef8c 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbReservedKeywords.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/InfluxDbReservedKeywords.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.sinks.internal.jvm.datalake;
+package org.apache.streampipes.sinks.internal.jvm.datalake.influx;
import java.util.Arrays;
import java.util.List;