You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by fj...@apache.org on 2020/09/23 08:50:44 UTC
[incubator-streampipes] branch datalake-rest-extension updated:
Added REST Endpoints
This is an automated email from the ASF dual-hosted git repository.
fjohn pushed a commit to branch datalake-rest-extension
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/datalake-rest-extension by this push:
new d9bfb9c Added REST Endpoints
d9bfb9c is described below
commit d9bfb9c10fcb5bb751fcfcaf33a4406fb81e0109
Author: Felix John <jo...@axantu.com>
AuthorDate: Wed Sep 23 10:50:25 2020 +0200
Added REST Endpoints
---
.../rest/impl/datalake/DataLakeManagementV3.java | 12 ++---
.../rest/impl/datalake/DataLakeResourceV3.java | 63 ++++++++++++++++++++++
2 files changed, 69 insertions(+), 6 deletions(-)
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
index 84a64c1..e1004aa 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
@@ -246,11 +246,11 @@ public class DataLakeManagementV3 {
String defaultPolicyString = "";
if (defaultPolicy) { defaultPolicyString = " DEFAULT"; }
String durationString = "";
- if (duration != "") { durationString = " DURATION " + duration; }
+ if (duration != null) { durationString = " DURATION " + duration; }
String replicationString = "";
if (replication != 0) { replicationString = " REPLICATION " + replication; }
String shardDurationString = "";
- if (shardDuration != "") { shardDuration = " SHARD DURATION " + shardDuration; }
+ if (shardDuration != null) { shardDuration = " SHARD DURATION " + shardDuration; }
Query query = new Query("CREATE RETENTION POLICY "
+ policyName
@@ -274,11 +274,11 @@ public class DataLakeManagementV3 {
String defaultPolicyString = "";
if (defaultPolicy) { defaultPolicyString = " DEFAULT"; }
String durationString = "";
- if (duration != "") { durationString = " DURATION " + duration; }
+ if (duration != null) { durationString = " DURATION " + duration; }
String replicationString = "";
if (replication != 0) { replicationString = " REPLICATION " + replication; }
String shardDurationString = "";
- if (shardDuration != "") { shardDuration = " SHARD DURATION " + shardDuration; }
+ if (shardDuration != null) { shardDuration = " SHARD DURATION " + shardDuration; }
Query query = new Query("ALTER RETENTION POLICY "
+ policyName
@@ -288,7 +288,7 @@ public class DataLakeManagementV3 {
+ shardDurationString
+ defaultPolicyString,
BackendConfig.INSTANCE.getInfluxDatabaseName());
-
+
QueryResult influx_result = influxDB.query(query);
if (influx_result.hasError() || influx_result.getResults().get(0).getError() != null) {
System.out.println("Error!");
@@ -311,7 +311,7 @@ public class DataLakeManagementV3 {
public static void main(String [] args) {
DataLakeManagementV3 dlmv3 = new DataLakeManagementV3();
DataResult result = dlmv3.getRetentionPoliciesOfDatabase();
- dlmv3.createRetentionPolicy("rp6", "1h", "", 5, Boolean.FALSE);
+ dlmv3.createRetentionPolicy("rp8", "1h", null, 5, Boolean.FALSE);
// dlmv3.deleteRetentionPolicy("rp5");
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
index 1ae82ec..6a92cd6 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
@@ -30,6 +30,7 @@ import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
import javax.ws.rs.*;
import javax.ws.rs.core.*;
import java.io.IOException;
+import java.rmi.server.ExportException;
import java.text.ParseException;
import java.util.List;
@@ -252,4 +253,66 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
return Response.ok("Successfully updated database.", MediaType.TEXT_PLAIN).build();
}
+
+ /**
+  * Lists retention policies.
+  *
+  * <p>Delegates to {@code dataLakeManagement.getRetentionPoliciesOfDatabase()} and
+  * wraps whatever it returns as the entity of a 200 OK response, declared as JSON.
+  *
+  * @return 200 OK response carrying the lookup result
+  */
+ @GET
+ @Path("/policy/info")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getRetentionPolicies() {
+     return Response.ok(dataLakeManagement.getRetentionPoliciesOfDatabase()).build();
+ }
+
+ /**
+  * Deletes the retention policy named by the {@code name} path segment.
+  *
+  * <p>The result of {@code dataLakeManagement.deleteRetentionPolicy} is not inspected
+  * here, so a plain-text confirmation is returned whenever that call completes
+  * without throwing.
+  *
+  * @param policyName name of the retention policy, taken from the request path
+  * @return 200 OK response with a plain-text confirmation message
+  */
+ @GET
+ @Path("/policy/{name}/delete")
+ @Produces(MediaType.TEXT_PLAIN)
+ public Response deleteRetentionPolicy(@PathParam("name") String policyName) {
+     dataLakeManagement.deleteRetentionPolicy(policyName);
+
+     final String confirmation = "Successfully deleted the retention policy.";
+     return Response.ok(confirmation, MediaType.TEXT_PLAIN).build();
+ }
+
+ /**
+  * Creates a retention policy named by the {@code name} path segment.
+  *
+  * <p>All settings are optional query parameters:
+  * <ul>
+  *   <li>{@code duration} - passed through unchanged; {@code null} when absent</li>
+  *   <li>{@code shardDuration} - passed through unchanged; {@code null} when absent</li>
+  *   <li>{@code replication} - integer; {@code 0} is passed when absent</li>
+  *   <li>{@code defaultPolicy} - flag; its presence enables it unless the value is
+  *       literally {@code false} (case-insensitive)</li>
+  * </ul>
+  *
+  * @param policyName name of the retention policy, taken from the request path
+  * @param info       request URI information used to read the query parameters
+  * @return 200 OK with a plain-text confirmation, or 400 Bad Request when
+  *         {@code replication} is not a valid integer
+  */
+ @GET
+ @Path("/policy/{name}/create")
+ @Produces(MediaType.TEXT_PLAIN)
+ public Response createRetentionPolicy(@PathParam("name") String policyName,
+                                       @Context UriInfo info) {
+
+     MultivaluedMap<String, String> params = info.getQueryParameters();
+     String duration = params.getFirst("duration");
+     String replicationString = params.getFirst("replication");
+     String shardDuration = params.getFirst("shardDuration");
+     String defaultPolicyString = params.getFirst("defaultPolicy");
+
+     int replication = 0;
+     if (replicationString != null) {
+         try {
+             replication = Integer.parseInt(replicationString);
+         } catch (NumberFormatException e) {
+             // Malformed client input is a client error; do not let it escape as a server error.
+             return Response.status(Response.Status.BAD_REQUEST)
+                     .type(MediaType.TEXT_PLAIN)
+                     .entity("Query parameter 'replication' must be an integer.")
+                     .build();
+         }
+     }
+
+     // Presence of the parameter still enables the flag (keeps "?defaultPolicy" working),
+     // but an explicit "false" is no longer misread as true.
+     boolean defaultPolicy = defaultPolicyString != null
+             && !"false".equalsIgnoreCase(defaultPolicyString.trim());
+
+     dataLakeManagement.createRetentionPolicy(policyName, duration, shardDuration, replication, defaultPolicy);
+     return Response.ok("Successfully created retention policy.", MediaType.TEXT_PLAIN).build();
+ }
+
+
+ /**
+  * Alters the retention policy named by the {@code name} path segment.
+  *
+  * <p>All settings are optional query parameters:
+  * <ul>
+  *   <li>{@code duration} - passed through unchanged; {@code null} when absent</li>
+  *   <li>{@code shardDuration} - passed through unchanged; {@code null} when absent</li>
+  *   <li>{@code replication} - integer; {@code 0} is passed when absent</li>
+  *   <li>{@code defaultPolicy} - flag; its presence enables it unless the value is
+  *       literally {@code false} (case-insensitive)</li>
+  * </ul>
+  *
+  * @param policyName name of the retention policy, taken from the request path
+  * @param info       request URI information used to read the query parameters
+  * @return 200 OK with a plain-text confirmation, or 400 Bad Request when
+  *         {@code replication} is not a valid integer
+  */
+ @GET
+ @Path("/policy/{name}/alter")
+ @Produces(MediaType.TEXT_PLAIN)
+ public Response alterRetentionPolicy(@PathParam("name") String policyName,
+                                      @Context UriInfo info) {
+
+     MultivaluedMap<String, String> params = info.getQueryParameters();
+     String duration = params.getFirst("duration");
+     String replicationString = params.getFirst("replication");
+     String shardDuration = params.getFirst("shardDuration");
+     String defaultPolicyString = params.getFirst("defaultPolicy");
+
+     int replication = 0;
+     if (replicationString != null) {
+         try {
+             replication = Integer.parseInt(replicationString);
+         } catch (NumberFormatException e) {
+             // Malformed client input is a client error; do not let it escape as a server error.
+             return Response.status(Response.Status.BAD_REQUEST)
+                     .type(MediaType.TEXT_PLAIN)
+                     .entity("Query parameter 'replication' must be an integer.")
+                     .build();
+         }
+     }
+
+     // Presence of the parameter still enables the flag (keeps "?defaultPolicy" working),
+     // but an explicit "false" is no longer misread as true.
+     boolean defaultPolicy = defaultPolicyString != null
+             && !"false".equalsIgnoreCase(defaultPolicyString.trim());
+
+     dataLakeManagement.alterRetentionPolicy(policyName, duration, shardDuration, replication, defaultPolicy);
+     return Response.ok("Successfully altered retention policy.", MediaType.TEXT_PLAIN).build();
+ }
+
+
+
}