You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by fj...@apache.org on 2020/09/24 14:19:37 UTC
[incubator-streampipes] branch datalake-rest-extension updated: Clean Up & Minor Fixes
This is an automated email from the ASF dual-hosted git repository.
fjohn pushed a commit to branch datalake-rest-extension
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/datalake-rest-extension by this push:
new 261133d Clean Up & Minor Fixes
261133d is described below
commit 261133d5152d907bb0e4baffcf47642c7c9f2c68
Author: Felix John <jo...@axantu.com>
AuthorDate: Thu Sep 24 16:19:24 2020 +0200
Clean Up & Minor Fixes
---
.../rest/impl/datalake/DataLakeManagementV3.java | 30 ++++++++++++++--------
.../rest/impl/datalake/DataLakeResourceV3.java | 21 ++++++++++-----
2 files changed, 34 insertions(+), 17 deletions(-)
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
index e192276..caaa401 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
@@ -240,7 +240,7 @@ public class DataLakeManagementV3 {
}
- public void createRetentionPolicy(String policyName, String duration, String shardDuration, int replication) {
+ public boolean createRetentionPolicy(String rp, String duration, String shardDuration, int replication) {
InfluxDB influxDB = getInfluxDBClient();
String durationString = "";
@@ -251,7 +251,7 @@ public class DataLakeManagementV3 {
if (shardDuration != null) { shardDurationString = " SHARD DURATION " + shardDuration; }
Query query = new Query("CREATE RETENTION POLICY "
- + policyName
+ + rp
+ " ON "
+ BackendConfig.INSTANCE.getInfluxDatabaseName()
+ durationString
@@ -261,13 +261,15 @@ public class DataLakeManagementV3 {
QueryResult influx_result = influxDB.query(query);
if (influx_result.hasError() || influx_result.getResults().get(0).getError() != null) {
- System.out.println("Error!");
+ influxDB.close();
+ return false;
}
influxDB.close();
+ return true;
}
- public void alterRetentionPolicy(String policyName, String duration, String shardDuration, int replication) {
+ public boolean alterRetentionPolicy(String rp, String duration, String shardDuration, int replication) {
InfluxDB influxDB = getInfluxDBClient();
String durationString = "";
@@ -278,7 +280,7 @@ public class DataLakeManagementV3 {
if (shardDuration != null) { shardDurationString = " SHARD DURATION " + shardDuration; }
Query query = new Query("ALTER RETENTION POLICY "
- + policyName
+ + rp
+ " ON "
+ BackendConfig.INSTANCE.getInfluxDatabaseName()
+ durationString
@@ -288,24 +290,29 @@ public class DataLakeManagementV3 {
QueryResult influx_result = influxDB.query(query);
if (influx_result.hasError() || influx_result.getResults().get(0).getError() != null) {
- System.out.println("Error!");
+ influxDB.close();
+ return false;
+
}
influxDB.close();
+ return true;
}
- public void deleteRetentionPolicy(String policyName) {
+ public boolean deleteRetentionPolicy(String rp) {
InfluxDB influxDB = getInfluxDBClient();
Query query = new Query("DROP RETENTION POLICY "
- + policyName
+ + rp
+ " ON "
+ BackendConfig.INSTANCE.getInfluxDatabaseName(),
BackendConfig.INSTANCE.getInfluxDatabaseName());
QueryResult influx_result = influxDB.query(query);
if (influx_result.hasError() || influx_result.getResults().get(0).getError() != null) {
- System.out.println("Error!");
+ influxDB.close();
+ return false;
}
influxDB.close();
+ return true;
}
public double getNumOfRecordsOfTable(String index) {
@@ -344,10 +351,11 @@ public class DataLakeManagementV3 {
DataLakeManagementV3 dlmv3 = new DataLakeManagementV3();
InfluxDB influxDB = dlmv3.getInfluxDBClient();
long size = dlmv3.getStorageSizeOfDatabase();
- dlmv3.deleteRetentionPolicy("rp10");
+ System.out.println(size);
+ // dlmv3.deleteRetentionPolicy("rp10");
// DataResult result = dlmv3.getRetentionPoliciesOfDatabase();
// dlmv3.createRetentionPolicy("rp8", "1h", null, 5, Boolean.FALSE);
- // dlmv3.getNumOfRecordsOfTable("john3800");
+ System.out.println(dlmv3.getNumOfRecordsOfTable("felix4", null));
}
public void deleteMeasurement(String index) {
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
index 3cb678c..d7157d7 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
@@ -282,7 +282,11 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
@Produces(MediaType.TEXT_PLAIN)
public Response deleteRetentionPolicy(@PathParam("name") String policyName) {
dataLakeManagement.deleteRetentionPolicy(policyName);
- return Response.ok("Successfully deleted the retention policy.", MediaType.TEXT_PLAIN).build();
+ if (dataLakeManagement.deleteRetentionPolicy(policyName)) {
+ return Response.ok("Successfully deleted the retention policy.", MediaType.TEXT_PLAIN).build();
+ } else {
+ return Response.serverError().build();
+ }
}
@GET
@@ -298,9 +302,11 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
Integer replication = 0;
if (replicationString != null) { replication = Integer.parseInt(replicationString); }
- dataLakeManagement.createRetentionPolicy(policyName, duration, shardDuration, replication);
-
- return Response.ok("Successfully created retention policy.", MediaType.TEXT_PLAIN).build();
+ if (dataLakeManagement.createRetentionPolicy(policyName, duration, shardDuration, replication)) {
+ return Response.ok("Successfully created retention policy.", MediaType.TEXT_PLAIN).build();
+ } else {
+ return Response.serverError().build();
+ }
}
@@ -318,7 +324,10 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
if (replicationString != null) { replication = Integer.parseInt(replicationString); }
- dataLakeManagement.alterRetentionPolicy(policyName, duration, shardDuration, replication);
- return Response.ok("Successfully altered retention policy.", MediaType.TEXT_PLAIN).build();
+ if (dataLakeManagement.alterRetentionPolicy(policyName, duration, shardDuration, replication)) {
+ return Response.ok("Successfully altered retention policy.", MediaType.TEXT_PLAIN).build();
+ } else {
+ return Response.serverError().build();
+ }
}
}