You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by fj...@apache.org on 2020/09/25 08:58:05 UTC

[incubator-streampipes-extensions] branch feature/datalake-rest-extension updated: Additions to DataLake Sink

This is an automated email from the ASF dual-hosted git repository.

fjohn pushed a commit to branch feature/datalake-rest-extension
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/feature/datalake-rest-extension by this push:
     new 1620aee  Additions to DataLake Sink
1620aee is described below

commit 1620aee912ed52dcb30f1a5c8f7e5769fd697fdd
Author: Felix John <jo...@axantu.com>
AuthorDate: Fri Sep 25 10:57:49 2020 +0200

    Additions to DataLake Sink
---
 .../sinks/internal/jvm/datalake/DataLake.java      |  2 ++
 .../internal/jvm/datalake/DataLakeController.java  | 30 +++++++++++++++---
 .../jvm/datalake/DataLakeInfluxDbClient.java       | 36 +++++++++++++++++++---
 .../internal/jvm/datalake/DataLakeParameters.java  | 12 +++++++-
 .../strings.en                                     | 19 ++++++++++++
 5 files changed, 90 insertions(+), 9 deletions(-)

diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
index f82506d..25f7ccc 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
@@ -71,6 +71,8 @@ public class DataLake implements EventSink<DataLakeParameters> {
             parameters.getBatchSize(),
             parameters.getFlushDuration(),
             parameters.getDimensionProperties(),
+            parameters.getCustomRpName(),
+            parameters.getCustomRpDuration(),
             LOG
     );
 
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
index 3c44a58..cacdc2a 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
@@ -22,9 +22,11 @@ import org.apache.streampipes.model.DataSinkType;
 import org.apache.streampipes.model.graph.DataSinkDescription;
 import org.apache.streampipes.model.graph.DataSinkInvocation;
 import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.sdk.StaticProperties;
 import org.apache.streampipes.sdk.builder.DataSinkBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.DataSinkParameterExtractor;
+import org.apache.streampipes.sdk.helpers.Alternatives;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
@@ -39,7 +41,12 @@ public class DataLakeController extends StandaloneEventSinkDeclarer<DataLakePara
 
   private static final String DATABASE_MEASUREMENT_KEY = "db_measurement";
   private static final String TIMESTAMP_MAPPING_KEY = "timestamp_mapping";
-
+  private static final String RETENTION_POLICY_KEY = "retention_policy";
+  private static final String DEFAULT_RETENTION_POLICY = "default_retention_policy";
+  private static final String CUSTOM_RETENTION_POLICY = "custom_retention_policy";
+  private static final String CUSTOM_RETENTION_POLICY_GROUP = "custom_retention_policy_group";
+  private static final String CUSTOM_RETENTION_POLICY_NAME= "custom_retention_policy_name";
+  private static final String CUSTOM_RETENTION_POLICY_DURATION = "custom_retention_policy_duration";
 
   @Override
   public DataSinkDescription declareModel() {
@@ -52,6 +59,12 @@ public class DataLakeController extends StandaloneEventSinkDeclarer<DataLakePara
                     Labels.withId(TIMESTAMP_MAPPING_KEY),
                     PropertyScope.NONE).build())
             .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
+            .requiredAlternatives(Labels.withId(RETENTION_POLICY_KEY),
+                                  Alternatives.from(Labels.withId(DEFAULT_RETENTION_POLICY)),
+                                  Alternatives.from(Labels.withId(CUSTOM_RETENTION_POLICY),
+                                      StaticProperties.group(Labels.withId(CUSTOM_RETENTION_POLICY_GROUP),
+                                      StaticProperties.stringFreeTextProperty(Labels.withId(CUSTOM_RETENTION_POLICY_NAME)),
+                                      StaticProperties.stringFreeTextProperty(Labels.withId(CUSTOM_RETENTION_POLICY_DURATION)))))
             .build();
   }
 
@@ -59,11 +72,19 @@ public class DataLakeController extends StandaloneEventSinkDeclarer<DataLakePara
   public ConfiguredEventSink<DataLakeParameters> onInvocation(DataSinkInvocation graph,
                                                               DataSinkParameterExtractor extractor) {
 
-
     String measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class);
     measureName = DataLake.prepareString(measureName);
     String timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
 
+    String selectedAlternative = extractor.selectedAlternativeInternalId(RETENTION_POLICY_KEY);
+    String custom_rp_name = "";
+    String custom_rp_duration = "";
+
+    if (selectedAlternative.equals(CUSTOM_RETENTION_POLICY)) {
+        custom_rp_name = extractor.singleValueParameter(CUSTOM_RETENTION_POLICY_NAME, String.class);
+        custom_rp_duration = extractor.singleValueParameter(CUSTOM_RETENTION_POLICY_DURATION, String.class);
+    }
+
     String hostname = SinksInternalJvmConfig.INSTANCE.getDataLakeProtocol() + "://" + SinksInternalJvmConfig.INSTANCE.getDataLakeHost();
     Integer port = SinksInternalJvmConfig.INSTANCE.getDataLakePort();
     String dbName = SinksInternalJvmConfig.INSTANCE.getDataLakeDatabaseName();
@@ -84,8 +105,9 @@ public class DataLakeController extends StandaloneEventSinkDeclarer<DataLakePara
             timestampField,
             batch_size,
             flush_duration,
-            dimensionProperties);
-
+            dimensionProperties,
+            custom_rp_name,
+            custom_rp_duration);
 
     return new ConfiguredEventSink<>(params, DataLake::new);
   }
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
index aacbf3e..1746089 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
@@ -18,8 +18,11 @@
 
 package org.apache.streampipes.sinks.internal.jvm.datalake;
 
+import org.apache.streampipes.config.backend.BackendConfig;
+import org.checkerframework.checker.units.qual.C;
 import org.influxdb.BatchOptions;
 import org.influxdb.InfluxDB;
+import org.influxdb.InfluxDBException;
 import org.influxdb.InfluxDBFactory;
 import org.influxdb.dto.Point;
 import org.influxdb.dto.Pong;
@@ -47,6 +50,8 @@ public class DataLakeInfluxDbClient {
     private Integer batchSize;
     private Integer flushDuration;
     private List<String> tagFields;
+    private String customRpName;
+    private String customRpDuration;
 
 
     private Logger logger;
@@ -63,7 +68,10 @@ public class DataLakeInfluxDbClient {
                          Integer batchSize,
                          Integer flushDuration,
                          List<String> tagsFields,
+                         String customRpName,
+                         String customRpDuration,
                          Logger logger) throws SpRuntimeException {
+
 		this.influxDbHost = influxDbHost;
 		this.influxDbPort = influxDbPort;
 		this.databaseName = databaseName;
@@ -73,6 +81,8 @@ public class DataLakeInfluxDbClient {
 		this.timestampField = timestampField;
 		this.batchSize = batchSize;
 		this.flushDuration = flushDuration;
+		this.customRpName = customRpName;
+		this.customRpDuration = customRpDuration;
 		this.logger = logger;
 		this.tagFields = tagsFields;
 
@@ -108,7 +118,7 @@ public class DataLakeInfluxDbClient {
    * be found
    */
 	private void connect() throws SpRuntimeException {
-	  // Connecting to the server
+    // Connecting to the server
     // "http://" must be in front
     String urlAndPort = influxDbHost + ":" + influxDbPort;
     influxDb = InfluxDBFactory.connect(urlAndPort, user, password);
@@ -125,9 +135,24 @@ public class DataLakeInfluxDbClient {
       createDatabase(databaseName);
     }
 
-    // setting up the database
+    // Setting up the database
     influxDb.setDatabase(databaseName);
     influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS);
+
+    // Optional: Set Retention Policy
+    if (customRpName != "" && customRpDuration != "") {
+
+        QueryResult result = influxDb.query(new Query("CREATE RETENTION POLICY "
+                                            + customRpName + " ON "
+                                            + BackendConfig.INSTANCE.getInfluxDatabaseName()
+                                            + " Duration " + customRpDuration
+                                            + " REPLICATION 1",
+                                            BackendConfig.INSTANCE.getInfluxDatabaseName()));
+
+        if (result.hasError() || result.getResults().get(0).getError() != null) {
+            throw new SpRuntimeException("Could not create the retention policy '" + customRpName + "': "  + result.getResults().get(0).getError());
+        }
+    }
 	}
 
   /**
@@ -190,8 +215,11 @@ public class DataLakeInfluxDbClient {
               }
           }
       }
-
-      influxDb.write(p.build());
+      if (customRpName != null && !customRpName.isEmpty() && customRpDuration != null && !customRpDuration.isEmpty()) { // compare content, not references
+          influxDb.write(BackendConfig.INSTANCE.getInfluxDatabaseName(), customRpName, p.build());
+      } else {
+          influxDb.write(p.build());
+      }
 	}
 
   /**
diff --git a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeParameters.java b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeParameters.java
index 4c6158d..0c33a93 100644
--- a/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeParameters.java
+++ b/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeParameters.java
@@ -38,6 +38,8 @@ public class DataLakeParameters extends EventSinkBindingParams {
   private Integer batchSize;
   private Integer flushDuration;
   private List<String> dimensionProperties;
+  private String customRpName;
+  private String customRpDuration;
 
   public DataLakeParameters(DataSinkInvocation graph,
                             String influxDbHost,
@@ -49,7 +51,9 @@ public class DataLakeParameters extends EventSinkBindingParams {
                             String timestampField,
                             Integer batchSize,
                             Integer flushDuration,
-                            List<String> dimensionProperties) {
+                            List<String> dimensionProperties,
+                            String customRpName,
+                            String customRpDuration) {
     super(graph);
 
     this.influxDbHost = influxDbHost;
@@ -62,6 +66,8 @@ public class DataLakeParameters extends EventSinkBindingParams {
     this.batchSize = batchSize;
     this.flushDuration = flushDuration;
     this.dimensionProperties = dimensionProperties;
+    this.customRpName = customRpName;
+    this.customRpDuration = customRpDuration;
   }
 
   public String getInfluxDbHost() {
@@ -103,4 +109,8 @@ public class DataLakeParameters extends EventSinkBindingParams {
   public List<String> getDimensionProperties() {
     return dimensionProperties;
   }
+
+  public String getCustomRpName() { return customRpName; } // custom retention policy name as passed to the constructor; the controller passes "" unless the custom alternative is selected
+
+  public String getCustomRpDuration() { return customRpDuration; } // custom retention policy duration as passed to the constructor; the controller passes "" unless the custom alternative is selected
 }
diff --git a/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en b/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en
index 396d799..552f97f 100644
--- a/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en
+++ b/streampipes-sinks-internal-jvm/src/main/resources/org.apache.streampipes.sinks.internal.jvm.datalake/strings.en
@@ -6,3 +6,22 @@ db_measurement.description=The name of the identifier under which the data is to
 
 timestamp_mapping.title=Timestamp Field
 timestamp_mapping.description=The value which contains a timestamp
+
+retention_policy.title=Retention Policy
+retention_policy.description=Specify the retention policy for the data
+
+default_retention_policy.title=Default Retention Policy
+default_retention_policy.description=
+
+custom_retention_policy.title=Custom Retention Policy
+custom_retention_policy.description=
+
+custom_retention_policy_group.title=Custom RP Group
+custom_retention_policy_group.description=
+
+custom_retention_policy_name.title=Retention Policy Name
+custom_retention_policy_name.description=
+
+custom_retention_policy_duration.title=Retention Policy Duration
+custom_retention_policy_duration.description=Specify the amount of time to retain the data (Example: 1h, 3d, 10w, ...)
+