You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by fj...@apache.org on 2020/09/23 05:57:46 UTC

[incubator-streampipes] branch datalake-rest-extension updated: Update Create/Change RetentionPolicy

This is an automated email from the ASF dual-hosted git repository.

fjohn pushed a commit to branch datalake-rest-extension
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/datalake-rest-extension by this push:
     new 0e65dd4  Update Create/Change RetentionPolicy
0e65dd4 is described below

commit 0e65dd49e646355d96d7fd763d9a9d63220215a3
Author: Felix John <jo...@axantu.com>
AuthorDate: Wed Sep 23 07:55:26 2020 +0200

    Update Create/Change RetentionPolicy
---
 .../rest/impl/datalake/DataLakeManagementV3.java   | 66 ++++++++++++++++++----
 1 file changed, 56 insertions(+), 10 deletions(-)

diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
index f58b638..84a64c1 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
@@ -229,6 +229,7 @@ public class DataLakeManagementV3 {
     return new PageResult(dataResult.getTotal(), dataResult.getHeaders(), dataResult.getRows(), page, pageSum);
   }
 
+
   public DataResult getRetentionPoliciesOfDatabase() {
     InfluxDB influxDB = getInfluxDBClient();
     Query query = new Query("SHOW RETENTION POLICIES ON sp");
@@ -239,21 +240,52 @@ public class DataLakeManagementV3 {
   }
 
 
+  /**
+   * Creates a retention policy on the "sp" database. Clauses whose argument is unset are left out.
+   *
+   * @param policyName    name of the new policy (concatenated into the query unescaped)
+   * @param duration      value for DURATION, e.g. "1h"; null or "" omits the clause
+   * @param shardDuration value for SHARD DURATION; null or "" omits the clause
+   * @param replication   value for REPLICATION; 0 omits the clause
+   * @param defaultPolicy whether to append DEFAULT
+   */
+  public void createRetentionPolicy(String policyName, String duration, String shardDuration, int replication, boolean defaultPolicy) {
+    InfluxDB influxDB = getInfluxDBClient();
+    // Strings are checked by content (isEmpty); != would only compare object references.
+    String durationString = (duration == null || duration.isEmpty()) ? "" : " DURATION " + duration;
+    String replicationString = (replication == 0) ? "" : " REPLICATION " + replication;
+    String shardDurationString = (shardDuration == null || shardDuration.isEmpty()) ? "" : " SHARD DURATION " + shardDuration;
+    String defaultPolicyString = defaultPolicy ? " DEFAULT" : "";
+
+    Query query = new Query("CREATE RETENTION POLICY " + policyName + " ON sp "
+            + durationString + replicationString + shardDurationString + defaultPolicyString,
+            BackendConfig.INSTANCE.getInfluxDatabaseName());
+
+    QueryResult influx_result = influxDB.query(query);
+    if (influx_result.hasError() || influx_result.getResults().get(0).getError() != null) {
+      System.out.println("Error!");
+    }
+  }
+
+
   public void alterRetentionPolicy(String policyName, String duration, String shardDuration, int replication, boolean defaultPolicy) {
     InfluxDB influxDB = getInfluxDBClient();
 
     String defaultPolicyString = "";
-    if (defaultPolicy)
-      defaultPolicyString = " DEFAULT";
+    if (defaultPolicy) { defaultPolicyString = " DEFAULT"; }
+    String durationString = "";
+    if (duration != "")  {  durationString = " DURATION " + duration; }
+    String replicationString = "";
+    if (replication != 0) { replicationString = " REPLICATION " + replication; }
+    String shardDurationString = "";
+    if (shardDuration != "")  { shardDuration = " SHARD DURATION " + shardDuration; }
 
     Query query = new Query("ALTER RETENTION POLICY "
             + policyName
-            + " ON sp DURATION "
-            + duration
-            + " REPLICATION "
-            + replication
-            + " SHARD DURATION "
-            + shardDuration
+            + " ON sp "
+            + durationString
+            + replicationString
+            + shardDurationString
             + defaultPolicyString,
             BackendConfig.INSTANCE.getInfluxDatabaseName());
 
@@ -262,11 +294,25 @@ public class DataLakeManagementV3 {
       System.out.println("Error!");
     }
   }
-  
+
+  public void deleteRetentionPolicy(String policyName) {
+    InfluxDB influxDB = getInfluxDBClient();
+    Query query = new Query("DROP RETENTION POLICY "
+            + policyName
+            + " ON sp ",
+            BackendConfig.INSTANCE.getInfluxDatabaseName());
+
+    QueryResult influx_result = influxDB.query(query);
+    if (influx_result.hasError() || influx_result.getResults().get(0).getError() != null) {
+      System.out.println("Error!");
+    }
+  }
+
   public static void main(String [] args) {
     DataLakeManagementV3 dlmv3 = new DataLakeManagementV3();
     DataResult result = dlmv3.getRetentionPoliciesOfDatabase();
-    dlmv3.alterRetentionPolicy("rp2", "1h", "1h", 3, Boolean.FALSE);
+    dlmv3.createRetentionPolicy("rp6", "1h", "", 5, Boolean.FALSE);
+    // Disabled drop call, kept for manual runs: dlmv3.deleteRetentionPolicy("rp5");
   }
 
   public void deleteMeasurement(String index) {