You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by fj...@apache.org on 2020/09/25 08:58:05 UTC
[incubator-streampipes-extensions] branch
feature/datalake-rest-extension updated: Additions to DataLake Sink
This is an automated email from the ASF dual-hosted git repository.
fjohn pushed a commit to branch feature/datalake-rest-extension
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/feature/datalake-rest-extension by this push:
new 1620aee Additions to DataLake Sink
1620aee is described below
commit 1620aee912ed52dcb30f1a5c8f7e5769fd697fdd
Author: Felix John <jo...@axantu.com>
AuthorDate: Fri Sep 25 10:57:49 2020 +0200
Additions to DataLake Sink
---
.../sinks/internal/jvm/datalake/DataLake.java | 2 ++
.../internal/jvm/datalake/DataLakeController.java | 30 +++++++++++++++---
.../jvm/datalake/DataLakeInfluxDbClient.java | 36 +++++++++++++++++++---
.../internal/jvm/datalake/DataLakeParameters.java | 12 +++++++-
.../strings.en | 19 ++++++++++++
5 files changed, 90 insertions(+), 9 deletions(-)
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
index f82506d..25f7ccc 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
@@ -71,6 +71,8 @@ public class DataLake implements EventSink<DataLakeParameters> {
parameters.getBatchSize(),
parameters.getFlushDuration(),
parameters.getDimensionProperties(),
+ parameters.getCustomRpName(),
+ parameters.getCustomRpDuration(),
LOG
);
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
index 3c44a58..cacdc2a 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
@@ -22,9 +22,11 @@ import org.apache.streampipes.model.DataSinkType;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.sdk.helpers.Alternatives;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
@@ -39,7 +41,12 @@ public class DataLakeController extends StandaloneEventSinkDeclarer<DataLakePara
private static final String DATABASE_MEASUREMENT_KEY = "db_measurement";
private static final String TIMESTAMP_MAPPING_KEY = "timestamp_mapping";
-
+ private static final String RETENTION_POLICY_KEY = "retention_policy";
+ private static final String DEFAULT_RETENTION_POLICY = "default_retention_policy";
+ private static final String CUSTOM_RETENTION_POLICY = "custom_retention_policy";
+ private static final String CUSTOM_RETENTION_POLICY_GROUP = "custom_retention_policy_group";
+ private static final String CUSTOM_RETENTION_POLICY_NAME= "custom_retention_policy_name";
+ private static final String CUSTOM_RETENTION_POLICY_DURATION = "custom_retention_policy_duration";
@Override
public DataSinkDescription declareModel() {
@@ -52,6 +59,12 @@ public class DataLakeController extends StandaloneEventSinkDeclarer<DataLakePara
Labels.withId(TIMESTAMP_MAPPING_KEY),
PropertyScope.NONE).build())
.requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
+ .requiredAlternatives(Labels.withId(RETENTION_POLICY_KEY),
+ Alternatives.from(Labels.withId(DEFAULT_RETENTION_POLICY)),
+ Alternatives.from(Labels.withId(CUSTOM_RETENTION_POLICY),
+ StaticProperties.group(Labels.withId(CUSTOM_RETENTION_POLICY_GROUP),
+ StaticProperties.stringFreeTextProperty(Labels.withId(CUSTOM_RETENTION_POLICY_NAME)),
+ StaticProperties.stringFreeTextProperty(Labels.withId(CUSTOM_RETENTION_POLICY_DURATION)))))
.build();
}
@@ -59,11 +72,19 @@ public class DataLakeController extends StandaloneEventSinkDeclarer<DataLakePara
public ConfiguredEventSink<DataLakeParameters> onInvocation(DataSinkInvocation graph,
DataSinkParameterExtractor extractor) {
-
String measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class);
measureName = DataLake.prepareString(measureName);
String timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
+ String selectedAlternative = extractor.selectedAlternativeInternalId(RETENTION_POLICY_KEY);
+ String custom_rp_name = "";
+ String custom_rp_duration = "";
+
+ if (selectedAlternative.equals(CUSTOM_RETENTION_POLICY)) {
+ custom_rp_name = extractor.singleValueParameter(CUSTOM_RETENTION_POLICY_NAME, String.class);
+ custom_rp_duration = extractor.singleValueParameter(CUSTOM_RETENTION_POLICY_DURATION, String.class);
+ }
+
String hostname = SinksInternalJvmConfig.INSTANCE.getDataLakeProtocol() + "://" + SinksInternalJvmConfig.INSTANCE.getDataLakeHost();
Integer port = SinksInternalJvmConfig.INSTANCE.getDataLakePort();
String dbName = SinksInternalJvmConfig.INSTANCE.getDataLakeDatabaseName();
@@ -84,8 +105,9 @@ public class DataLakeController extends StandaloneEventSinkDeclarer<DataLakePara
timestampField,
batch_size,
flush_duration,
- dimensionProperties);
-
+ dimensionProperties,
+ custom_rp_name,
+ custom_rp_duration);
return new ConfiguredEventSink<>(params, DataLake::new);
}
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
index aacbf3e..1746089 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
@@ -18,8 +18,11 @@
package org.apache.streampipes.sinks.internal.jvm.datalake;
+import org.apache.streampipes.config.backend.BackendConfig;
+import org.checkerframework.checker.units.qual.C;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBException;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
@@ -47,6 +50,8 @@ public class DataLakeInfluxDbClient {
private Integer batchSize;
private Integer flushDuration;
private List<String> tagFields;
+ private String customRpName;
+ private String customRpDuration;
private Logger logger;
@@ -63,7 +68,10 @@ public class DataLakeInfluxDbClient {
Integer batchSize,
Integer flushDuration,
List<String> tagsFields,
+ String customRpName,
+ String customRpDuration,
Logger logger) throws SpRuntimeException {
+
this.influxDbHost = influxDbHost;
this.influxDbPort = influxDbPort;
this.databaseName = databaseName;
@@ -73,6 +81,8 @@ public class DataLakeInfluxDbClient {
this.timestampField = timestampField;
this.batchSize = batchSize;
this.flushDuration = flushDuration;
+ this.customRpName = customRpName;
+ this.customRpDuration = customRpDuration;
this.logger = logger;
this.tagFields = tagsFields;
@@ -108,7 +118,7 @@ public class DataLakeInfluxDbClient {
* be found
*/
private void connect() throws SpRuntimeException {
- // Connecting to the server
+ // Connecting to the server
// "http://" must be in front
String urlAndPort = influxDbHost + ":" + influxDbPort;
influxDb = InfluxDBFactory.connect(urlAndPort, user, password);
@@ -125,9 +135,24 @@ public class DataLakeInfluxDbClient {
createDatabase(databaseName);
}
- // setting up the database
+ // Setting up the database
influxDb.setDatabase(databaseName);
influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS);
+
+ // Optional: Set Retention Policy
+ if (customRpName != "" && customRpDuration != "") {
+
+ QueryResult result = influxDb.query(new Query("CREATE RETENTION POLICY "
+ + customRpName + " ON "
+ + BackendConfig.INSTANCE.getInfluxDatabaseName()
+ + " Duration " + customRpDuration
+ + " REPLICATION 1",
+ BackendConfig.INSTANCE.getInfluxDatabaseName()));
+
+ if (result.hasError() || result.getResults().get(0).getError() != null) {
+ throw new SpRuntimeException("Could not create the retention policy '" + customRpName + "': " + result.getResults().get(0).getError());
+ }
+ }
}
/**
@@ -190,8 +215,11 @@ public class DataLakeInfluxDbClient {
}
}
}
-
- influxDb.write(p.build());
+ if (customRpName != "" && customRpDuration != "") {
+ influxDb.write(BackendConfig.INSTANCE.getInfluxDatabaseName(), customRpName, p.build());
+ } else {
+ influxDb.write(p.build());
+ }
}
/**
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeParameters.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeParameters.java
index 4c6158d..0c33a93 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeParameters.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeParameters.java
@@ -38,6 +38,8 @@ public class DataLakeParameters extends EventSinkBindingParams {
private Integer batchSize;
private Integer flushDuration;
private List<String> dimensionProperties;
+ private String customRpName;
+ private String customRpDuration;
public DataLakeParameters(DataSinkInvocation graph,
String influxDbHost,
@@ -49,7 +51,9 @@ public class DataLakeParameters extends EventSinkBindingParams {
String timestampField,
Integer batchSize,
Integer flushDuration,
- List<String> dimensionProperties) {
+ List<String> dimensionProperties,
+ String customRpName,
+ String customRpDuration) {
super(graph);
this.influxDbHost = influxDbHost;
@@ -62,6 +66,8 @@ public class DataLakeParameters extends EventSinkBindingParams {
this.batchSize = batchSize;
this.flushDuration = flushDuration;
this.dimensionProperties = dimensionProperties;
+ this.customRpName = customRpName;
+ this.customRpDuration = customRpDuration;
}
public String getInfluxDbHost() {
@@ -103,4 +109,8 @@ public class DataLakeParameters extends EventSinkBindingParams {
public List<String> getDimensionProperties() {
return dimensionProperties;
}
+
+ public String getCustomRpName() { return customRpName; }
+
+ public String getCustomRpDuration() { return customRpDuration; }
}
diff --git a/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en b/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en
index 396d799..552f97f 100644
--- a/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en
+++ b/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en
@@ -6,3 +6,22 @@ db_measurement.description=The name of the identifier under which the data is to
timestamp_mapping.title=Timestamp Field
timestamp_mapping.description=The value which contains a timestamp
+
+retention_policy.title=Retention Policy
+retention_policy.description=Specify the retention policy for the data
+
+default_retention_policy.title=Default Retention Policy
+default_retention_policy.description=Store the data using the default retention policy of the database
+
+custom_retention_policy.title=Custom Retention Policy
+custom_retention_policy.description=
+
+custom_retention_policy_group.title=Custom RP Group
+custom_retention_policy_group.description=Name and duration of the custom retention policy
+
+custom_retention_policy_name.title=Retention Policy Name
+custom_retention_policy_name.description=
+
+custom_retention_policy_duration.title=Retention Policy Duration
+custom_retention_policy_duration.description=Specify the amount of time to retain the data (Example: 1h, 3d, 10w, ...)
+