You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by fj...@apache.org on 2020/09/23 05:57:46 UTC
[incubator-streampipes] branch datalake-rest-extension updated:
Update Create/Change RetentionPolicy
This is an automated email from the ASF dual-hosted git repository.
fjohn pushed a commit to branch datalake-rest-extension
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/datalake-rest-extension by this push:
new 0e65dd4 Update Create/Change RetentionPolicy
0e65dd4 is described below
commit 0e65dd49e646355d96d7fd763d9a9d63220215a3
Author: Felix John <jo...@axantu.com>
AuthorDate: Wed Sep 23 07:55:26 2020 +0200
Update Create/Change RetentionPolicy
---
.../rest/impl/datalake/DataLakeManagementV3.java | 66 ++++++++++++++++++----
1 file changed, 56 insertions(+), 10 deletions(-)
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
index f58b638..84a64c1 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
@@ -229,6 +229,7 @@ public class DataLakeManagementV3 {
return new PageResult(dataResult.getTotal(), dataResult.getHeaders(), dataResult.getRows(), page, pageSum);
}
+
public DataResult getRetentionPoliciesOfDatabase() {
InfluxDB influxDB = getInfluxDBClient();
Query query = new Query("SHOW RETENTION POLICIES ON sp");
@@ -239,21 +240,52 @@ public class DataLakeManagementV3 {
}
+ public void createRetentionPolicy(String policyName, String duration, String shardDuration, int replication, boolean defaultPolicy) {
+ InfluxDB influxDB = getInfluxDBClient();
+
+ String defaultPolicyString = "";
+ if (defaultPolicy) { defaultPolicyString = " DEFAULT"; }
+ String durationString = "";
+ if (duration != "") { durationString = " DURATION " + duration; }
+ String replicationString = "";
+ if (replication != 0) { replicationString = " REPLICATION " + replication; }
+ String shardDurationString = "";
+ if (shardDuration != "") { shardDuration = " SHARD DURATION " + shardDuration; }
+
+ Query query = new Query("CREATE RETENTION POLICY "
+ + policyName
+ + " ON sp "
+ + durationString
+ + replicationString
+ + shardDurationString
+ + defaultPolicyString,
+ BackendConfig.INSTANCE.getInfluxDatabaseName());
+
+ QueryResult influx_result = influxDB.query(query);
+ if (influx_result.hasError() || influx_result.getResults().get(0).getError() != null) {
+ System.out.println("Error!");
+ }
+ }
+
+
public void alterRetentionPolicy(String policyName, String duration, String shardDuration, int replication, boolean defaultPolicy) {
InfluxDB influxDB = getInfluxDBClient();
String defaultPolicyString = "";
- if (defaultPolicy)
- defaultPolicyString = " DEFAULT";
+ if (defaultPolicy) { defaultPolicyString = " DEFAULT"; }
+ String durationString = "";
+ if (duration != "") { durationString = " DURATION " + duration; }
+ String replicationString = "";
+ if (replication != 0) { replicationString = " REPLICATION " + replication; }
+ String shardDurationString = "";
+ if (shardDuration != "") { shardDuration = " SHARD DURATION " + shardDuration; }
Query query = new Query("ALTER RETENTION POLICY "
+ policyName
- + " ON sp DURATION "
- + duration
- + " REPLICATION "
- + replication
- + " SHARD DURATION "
- + shardDuration
+ + " ON sp "
+ + durationString
+ + replicationString
+ + shardDurationString
+ defaultPolicyString,
BackendConfig.INSTANCE.getInfluxDatabaseName());
@@ -262,11 +294,25 @@ public class DataLakeManagementV3 {
System.out.println("Error!");
}
}
-
+
+ public void deleteRetentionPolicy(String policyName) {
+ InfluxDB influxDB = getInfluxDBClient();
+ Query query = new Query("DROP RETENTION POLICY "
+ + policyName
+ + " ON sp ",
+ BackendConfig.INSTANCE.getInfluxDatabaseName());
+
+ QueryResult influx_result = influxDB.query(query);
+ if (influx_result.hasError() || influx_result.getResults().get(0).getError() != null) {
+ System.out.println("Error!");
+ }
+ }
+
public static void main(String [] args) {
DataLakeManagementV3 dlmv3 = new DataLakeManagementV3();
DataResult result = dlmv3.getRetentionPoliciesOfDatabase();
- dlmv3.alterRetentionPolicy("rp2", "1h", "1h", 3, Boolean.FALSE);
+ dlmv3.createRetentionPolicy("rp6", "1h", "", 5, Boolean.FALSE);
+ // dlmv3.deleteRetentionPolicy("rp5");
}
public void deleteMeasurement(String index) {