You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by fj...@apache.org on 2020/09/23 08:50:44 UTC

[incubator-streampipes] branch datalake-rest-extension updated: Added REST Endpoints

This is an automated email from the ASF dual-hosted git repository.

fjohn pushed a commit to branch datalake-rest-extension
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/datalake-rest-extension by this push:
     new d9bfb9c  Added REST Endpoints
d9bfb9c is described below

commit d9bfb9c10fcb5bb751fcfcaf33a4406fb81e0109
Author: Felix John <jo...@axantu.com>
AuthorDate: Wed Sep 23 10:50:25 2020 +0200

    Added REST Endpoints
---
 .../rest/impl/datalake/DataLakeManagementV3.java   | 12 ++---
 .../rest/impl/datalake/DataLakeResourceV3.java     | 63 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 6 deletions(-)

diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
index 84a64c1..e1004aa 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
@@ -246,11 +246,11 @@ public class DataLakeManagementV3 {
     String defaultPolicyString = "";
     if (defaultPolicy) { defaultPolicyString = " DEFAULT"; }
     String durationString = "";
-    if (duration != "")  {  durationString = " DURATION " + duration; }
+    if (duration != null)  {  durationString = " DURATION " + duration; }
     String replicationString = "";
     if (replication != 0) { replicationString = " REPLICATION " + replication; }
     String shardDurationString = "";
-    if (shardDuration != "")  { shardDuration = " SHARD DURATION " + shardDuration; }
+    if (shardDuration != null)  { shardDuration = " SHARD DURATION " + shardDuration; }
 
     Query query = new Query("CREATE RETENTION POLICY "
             + policyName
@@ -274,11 +274,11 @@ public class DataLakeManagementV3 {
     String defaultPolicyString = "";
     if (defaultPolicy) { defaultPolicyString = " DEFAULT"; }
     String durationString = "";
-    if (duration != "")  {  durationString = " DURATION " + duration; }
+    if (duration != null)  {  durationString = " DURATION " + duration; }
     String replicationString = "";
     if (replication != 0) { replicationString = " REPLICATION " + replication; }
     String shardDurationString = "";
-    if (shardDuration != "")  { shardDuration = " SHARD DURATION " + shardDuration; }
+    if (shardDuration != null)  { shardDuration = " SHARD DURATION " + shardDuration; }
 
     Query query = new Query("ALTER RETENTION POLICY "
             + policyName
@@ -288,7 +288,7 @@ public class DataLakeManagementV3 {
             + shardDurationString
             + defaultPolicyString,
             BackendConfig.INSTANCE.getInfluxDatabaseName());
-
+    
     QueryResult influx_result = influxDB.query(query);
     if (influx_result.hasError() || influx_result.getResults().get(0).getError() != null) {
       System.out.println("Error!");
@@ -311,7 +311,7 @@ public class DataLakeManagementV3 {
   public static void main(String [] args) {
     DataLakeManagementV3 dlmv3 = new DataLakeManagementV3();
     DataResult result = dlmv3.getRetentionPoliciesOfDatabase();
-    dlmv3.createRetentionPolicy("rp6", "1h", "", 5, Boolean.FALSE);
+    dlmv3.createRetentionPolicy("rp8", "1h", null, 5, Boolean.FALSE);
     // dlmv3.deleteRetentionPolicy("rp5");
   }
 
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
index 1ae82ec..6a92cd6 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
@@ -30,6 +30,7 @@ import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
 import javax.ws.rs.*;
 import javax.ws.rs.core.*;
 import java.io.IOException;
+import java.rmi.server.ExportException;
 import java.text.ParseException;
 import java.util.List;
 
@@ -252,4 +253,66 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
 
         return Response.ok("Successfully updated database.", MediaType.TEXT_PLAIN).build();
   }
+
+  /**
+   * Lists the retention policies of the data lake database.
+   *
+   * @return 200 response whose entity is the {@code DataResult} returned by
+   *         {@code dataLakeManagement.getRetentionPoliciesOfDatabase()}, serialized as JSON
+   */
+  @GET
+  @Path("/policy/info")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getRetentionPolicies() {
+    return Response
+            .ok(dataLakeManagement.getRetentionPoliciesOfDatabase())
+            .build();
+  }
+
+  @GET
+  @Path("/policy/{name}/delete")
+  @Produces(MediaType.TEXT_PLAIN)
+  public Response deleteRetentionPolicy(@PathParam("name") String policyName) {
+    dataLakeManagement.deleteRetentionPolicy(policyName);
+    return Response.ok("Successfully deleted the retention policy.", MediaType.TEXT_PLAIN).build();
+  }
+
+  /**
+   * Creates a retention policy with the given name.
+   *
+   * <p>Optional query parameters:
+   * <ul>
+   *   <li>{@code duration} - passed through unchanged, {@code null} when absent</li>
+   *   <li>{@code shardDuration} - passed through unchanged, {@code null} when absent</li>
+   *   <li>{@code replication} - integer, {@code 0} when absent</li>
+   *   <li>{@code defaultPolicy} - {@code false} when absent or explicitly {@code "false"};
+   *       a bare flag or any other value counts as {@code true}</li>
+   * </ul>
+   *
+   * <p>NOTE(review): this state-changing operation is exposed via GET; confirm whether clients
+   * rely on that before switching to a more appropriate HTTP method.
+   *
+   * @param policyName name of the retention policy, taken from the {@code name} path segment
+   * @param info request URI information used to read the optional query parameters
+   * @return 200 with a plain-text confirmation, or 400 if {@code replication} is not an integer
+   */
+  @GET
+  @Path("/policy/{name}/create")
+  @Produces(MediaType.TEXT_PLAIN)
+  public Response createRetentionPolicy(@PathParam("name") String policyName,
+                                        @Context UriInfo info) {
+
+    MultivaluedMap<String, String> queryParams = info.getQueryParameters();
+    String duration = queryParams.getFirst("duration");
+    String replicationString = queryParams.getFirst("replication");
+    String shardDuration = queryParams.getFirst("shardDuration");
+    String defaultPolicyString = queryParams.getFirst("defaultPolicy");
+
+    int replication = 0;
+    if (replicationString != null) {
+      try {
+        replication = Integer.parseInt(replicationString.trim());
+      } catch (NumberFormatException e) {
+        // Malformed client input is a client error, not an internal server error.
+        return Response.status(Response.Status.BAD_REQUEST)
+                .entity("Query parameter 'replication' must be an integer, but was: " + replicationString)
+                .type(MediaType.TEXT_PLAIN)
+                .build();
+      }
+    }
+
+    // Presence alone used to enable the flag, so "defaultPolicy=false" was read as true.
+    // A bare "?defaultPolicy" still enables it to keep existing callers working.
+    boolean defaultPolicy = defaultPolicyString != null
+            && !"false".equalsIgnoreCase(defaultPolicyString.trim());
+
+    dataLakeManagement.createRetentionPolicy(policyName, duration, shardDuration, replication, defaultPolicy);
+    return Response.ok("Successfully created retention policy.", MediaType.TEXT_PLAIN).build();
+  }
+
+
+  /**
+   * Alters the retention policy with the given name.
+   *
+   * <p>Optional query parameters:
+   * <ul>
+   *   <li>{@code duration} - passed through unchanged, {@code null} when absent</li>
+   *   <li>{@code shardDuration} - passed through unchanged, {@code null} when absent</li>
+   *   <li>{@code replication} - integer, {@code 0} when absent</li>
+   *   <li>{@code defaultPolicy} - {@code false} when absent or explicitly {@code "false"};
+   *       a bare flag or any other value counts as {@code true}</li>
+   * </ul>
+   *
+   * <p>NOTE(review): this state-changing operation is exposed via GET; confirm whether clients
+   * rely on that before switching to a more appropriate HTTP method.
+   *
+   * @param policyName name of the retention policy, taken from the {@code name} path segment
+   * @param info request URI information used to read the optional query parameters
+   * @return 200 with a plain-text confirmation, or 400 if {@code replication} is not an integer
+   */
+  @GET
+  @Path("/policy/{name}/alter")
+  @Produces(MediaType.TEXT_PLAIN)
+  public Response alterRetentionPolicy(@PathParam("name") String policyName,
+                                       @Context UriInfo info) {
+
+    MultivaluedMap<String, String> queryParams = info.getQueryParameters();
+    String duration = queryParams.getFirst("duration");
+    String replicationString = queryParams.getFirst("replication");
+    String shardDuration = queryParams.getFirst("shardDuration");
+    String defaultPolicyString = queryParams.getFirst("defaultPolicy");
+
+    int replication = 0;
+    if (replicationString != null) {
+      try {
+        replication = Integer.parseInt(replicationString.trim());
+      } catch (NumberFormatException e) {
+        // Malformed client input is a client error, not an internal server error.
+        return Response.status(Response.Status.BAD_REQUEST)
+                .entity("Query parameter 'replication' must be an integer, but was: " + replicationString)
+                .type(MediaType.TEXT_PLAIN)
+                .build();
+      }
+    }
+
+    // Presence alone used to enable the flag, so "defaultPolicy=false" was read as true.
+    // A bare "?defaultPolicy" still enables it to keep existing callers working.
+    boolean defaultPolicy = defaultPolicyString != null
+            && !"false".equalsIgnoreCase(defaultPolicyString.trim());
+
+    dataLakeManagement.alterRetentionPolicy(policyName, duration, shardDuration, replication, defaultPolicy);
+    return Response.ok("Successfully altered retention policy.", MediaType.TEXT_PLAIN).build();
+  }
+
+
+
 }