You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by fj...@apache.org on 2020/09/24 14:19:37 UTC

[incubator-streampipes] branch datalake-rest-extension updated: Clean Up & Minor Fixes

This is an automated email from the ASF dual-hosted git repository.

fjohn pushed a commit to branch datalake-rest-extension
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/datalake-rest-extension by this push:
     new 261133d  Clean Up & Minor Fixes
261133d is described below

commit 261133d5152d907bb0e4baffcf47642c7c9f2c68
Author: Felix John <jo...@axantu.com>
AuthorDate: Thu Sep 24 16:19:24 2020 +0200

    Clean Up & Minor Fixes
---
 .../rest/impl/datalake/DataLakeManagementV3.java   | 30 ++++++++++++++--------
 .../rest/impl/datalake/DataLakeResourceV3.java     | 21 ++++++++++-----
 2 files changed, 34 insertions(+), 17 deletions(-)

diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
index e192276..caaa401 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
@@ -240,7 +240,7 @@ public class DataLakeManagementV3 {
   }
 
 
-  public void createRetentionPolicy(String policyName, String duration, String shardDuration, int replication) {
+  public boolean createRetentionPolicy(String rp, String duration, String shardDuration, int replication) {
     InfluxDB influxDB = getInfluxDBClient();
 
     String durationString = "";
@@ -251,7 +251,7 @@ public class DataLakeManagementV3 {
     if (shardDuration != null)  { shardDurationString = " SHARD DURATION " + shardDuration; }
 
     Query query = new Query("CREATE RETENTION POLICY "
-            + policyName
+            + rp
             + " ON "
             + BackendConfig.INSTANCE.getInfluxDatabaseName()
             + durationString
@@ -261,13 +261,15 @@ public class DataLakeManagementV3 {
 
     QueryResult influx_result = influxDB.query(query);
     if (influx_result.hasError() || influx_result.getResults().get(0).getError() != null) {
-      System.out.println("Error!");
+      influxDB.close();
+      return false;
     }
     influxDB.close();
+    return true;
   }
 
 
-  public void alterRetentionPolicy(String policyName, String duration, String shardDuration, int replication) {
+  public boolean alterRetentionPolicy(String rp, String duration, String shardDuration, int replication) {
     InfluxDB influxDB = getInfluxDBClient();
 
     String durationString = "";
@@ -278,7 +280,7 @@ public class DataLakeManagementV3 {
     if (shardDuration != null)  { shardDurationString = " SHARD DURATION " + shardDuration; }
 
     Query query = new Query("ALTER RETENTION POLICY "
-            + policyName
+            + rp
             + " ON "
             + BackendConfig.INSTANCE.getInfluxDatabaseName()
             + durationString
@@ -288,24 +290,29 @@ public class DataLakeManagementV3 {
 
     QueryResult influx_result = influxDB.query(query);
     if (influx_result.hasError() || influx_result.getResults().get(0).getError() != null) {
-      System.out.println("Error!");
+        influxDB.close();
+        return false;
+
     }
     influxDB.close();
+    return true;
   }
 
-  public void deleteRetentionPolicy(String policyName) {
+  public boolean deleteRetentionPolicy(String rp) {
     InfluxDB influxDB = getInfluxDBClient();
     Query query = new Query("DROP RETENTION POLICY "
-            + policyName
+            + rp
             + " ON "
             + BackendConfig.INSTANCE.getInfluxDatabaseName(),
             BackendConfig.INSTANCE.getInfluxDatabaseName());
 
     QueryResult influx_result = influxDB.query(query);
     if (influx_result.hasError() || influx_result.getResults().get(0).getError() != null) {
-      System.out.println("Error!");
+        influxDB.close();
+        return false;
     }
     influxDB.close();
+    return true;
   }
 
   public double getNumOfRecordsOfTable(String index) {
@@ -344,10 +351,11 @@ public class DataLakeManagementV3 {
     DataLakeManagementV3 dlmv3 = new DataLakeManagementV3();
     InfluxDB influxDB = dlmv3.getInfluxDBClient();
     long size = dlmv3.getStorageSizeOfDatabase();
-    dlmv3.deleteRetentionPolicy("rp10");
+    System.out.println(size);
+    // dlmv3.deleteRetentionPolicy("rp10");
     // DataResult result = dlmv3.getRetentionPoliciesOfDatabase();
     // dlmv3.createRetentionPolicy("rp8", "1h", null, 5, Boolean.FALSE);
-    // dlmv3.getNumOfRecordsOfTable("john3800");
+    System.out.println(dlmv3.getNumOfRecordsOfTable("felix4", null));
   }
 
   public void deleteMeasurement(String index) {
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
index 3cb678c..d7157d7 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
@@ -282,7 +282,11 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
   @Produces(MediaType.TEXT_PLAIN)
   public Response deleteRetentionPolicy(@PathParam("name") String policyName) {
     dataLakeManagement.deleteRetentionPolicy(policyName);
-    return Response.ok("Successfully deleted the retention policy.", MediaType.TEXT_PLAIN).build();
+    if (dataLakeManagement.deleteRetentionPolicy(policyName)) {
+      return Response.ok("Successfully deleted the retention policy.", MediaType.TEXT_PLAIN).build();
+    } else {
+      return Response.serverError().build();
+    }
   }
 
   @GET
@@ -298,9 +302,11 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
     Integer replication = 0;
 
     if (replicationString != null) { replication = Integer.parseInt(replicationString); }
-    dataLakeManagement.createRetentionPolicy(policyName, duration, shardDuration, replication);
-
-    return Response.ok("Successfully created retention policy.", MediaType.TEXT_PLAIN).build();
+    if (dataLakeManagement.createRetentionPolicy(policyName, duration, shardDuration, replication)) {
+      return Response.ok("Successfully created retention policy.", MediaType.TEXT_PLAIN).build();
+    } else {
+      return Response.serverError().build();
+    }
   }
 
 
@@ -318,7 +324,10 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
 
     if (replicationString != null) { replication = Integer.parseInt(replicationString); }
 
-    dataLakeManagement.alterRetentionPolicy(policyName, duration, shardDuration, replication);
-    return Response.ok("Successfully altered retention policy.", MediaType.TEXT_PLAIN).build();
+    if (dataLakeManagement.alterRetentionPolicy(policyName, duration, shardDuration, replication)) {
+      return Response.ok("Successfully altered retention policy.", MediaType.TEXT_PLAIN).build();
+    } else {
+      return Response.serverError().build();
+    }
   }
 }