You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by fj...@apache.org on 2020/09/23 13:41:59 UTC

[incubator-streampipes] branch datalake-rest-extension updated: Get StorageSize of Entire Influx & Added close()

This is an automated email from the ASF dual-hosted git repository.

fjohn pushed a commit to branch datalake-rest-extension
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/datalake-rest-extension by this push:
     new 9750239  Get StorageSize of Entire Influx & Added close()
9750239 is described below

commit 9750239e95be5601b9a037cb5e43374f4d7dba2f
Author: Felix John <jo...@axantu.com>
AuthorDate: Wed Sep 23 15:41:44 2020 +0200

    Get StorageSize of Entire Influx & Added close()
---
 .../rest/impl/datalake/DataLakeManagementV3.java   | 19 +++++++++++++++-
 .../rest/impl/datalake/DataLakeResourceV3.java     | 25 ++++++++++++++--------
 2 files changed, 34 insertions(+), 10 deletions(-)

diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
index 461fab7..fad123a 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
@@ -265,6 +265,7 @@ public class DataLakeManagementV3 {
     if (influx_result.hasError() || influx_result.getResults().get(0).getError() != null) {
       System.out.println("Error!");
     }
+    influxDB.close();
   }
 
 
@@ -293,6 +294,7 @@ public class DataLakeManagementV3 {
     if (influx_result.hasError() || influx_result.getResults().get(0).getError() != null) {
       System.out.println("Error!");
     }
+    influxDB.close();
   }
 
   public void deleteRetentionPolicy(String policyName) {
@@ -306,6 +308,7 @@ public class DataLakeManagementV3 {
     if (influx_result.hasError() || influx_result.getResults().get(0).getError() != null) {
       System.out.println("Error!");
     }
+    influxDB.close();
   }
 
   public double getNumOfRecordsOfTable(String index) {
@@ -323,13 +326,27 @@ public class DataLakeManagementV3 {
         numOfRecords = Double.parseDouble(item.toString());
       }
     }
-
+    influxDB.close();
     return numOfRecords;
   }
 
+  public long getStorageSizeOfDatabase() { // sums one value per series returned by SHOW STATS for 'shard'
+    InfluxDB influxDB = getInfluxDBClient();
+    Query query = new Query("SHOW STATS for 'shard'");
+    QueryResult influx_result = influxDB.query(query); // NOTE(review): no hasError() check here, unlike the other query methods in this class
+
+    long storageSize = 0;
+    for (QueryResult.Series series : influx_result.getResults().get(0).getSeries()) { // NOTE(review): presumably getSeries() can be null when nothing is returned - verify
+      storageSize = storageSize + (long) Double.parseDouble(series.getValues().get(0).get(0).toString()); // first row, first column; presumably the shard's disk bytes - TODO confirm; the cast drops any fraction
+    }
+    influxDB.close(); // NOTE(review): not reached if the query or the parsing throws - consider try/finally
+    return storageSize;
+  }
+
   public static void main(String [] args) {
     DataLakeManagementV3 dlmv3 = new DataLakeManagementV3();
     InfluxDB influxDB = dlmv3.getInfluxDBClient();
+    long size = dlmv3.getStorageSizeOfDatabase();
     // DataResult result = dlmv3.getRetentionPoliciesOfDatabase();
     // dlmv3.createRetentionPolicy("rp8", "1h", null, 5, Boolean.FALSE);
     // dlmv3.getNumOfRecordsOfTable("john3800");
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
index a848b49..ccbb8e7 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
@@ -30,7 +30,6 @@ import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
 import javax.ws.rs.*;
 import javax.ws.rs.core.*;
 import java.io.IOException;
-import java.rmi.server.ExportException;
 import java.text.ParseException;
 import java.util.List;
 
@@ -250,16 +249,24 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
   @POST
   @Produces(MediaType.TEXT_PLAIN)
   @Path("/data/{index}/{startdate}/{enddate}/labeling/{column}")
-    public Response labelData(@Context UriInfo info,
-                              @PathParam("index") String index,
-                              @PathParam("startdate") long startdate,
-                              @PathParam("enddate") long enddate,
-                              @PathParam("column") String column) {
+  public Response labelData(@Context UriInfo info,
+                            @PathParam("index") String index,
+                            @PathParam("startdate") long startdate,
+                            @PathParam("enddate") long enddate,
+                            @PathParam("column") String column) {
 
-        String label = info.getQueryParameters().getFirst("label");
-        this.dataLakeManagement.updateLabels(index, column, startdate, enddate, label);
+      String label = info.getQueryParameters().getFirst("label");
+      this.dataLakeManagement.updateLabels(index, column, startdate, enddate, label);
 
-        return Response.ok("Successfully updated database.", MediaType.TEXT_PLAIN).build();
+      return Response.ok("Successfully updated database.", MediaType.TEXT_PLAIN).build();
+  }
+
+  @GET
+  @Produces(MediaType.TEXT_PLAIN)
+  @Path("/database/size")
+  public Response getStorageSizeOfDatabase() { // responds with the value of dataLakeManagement.getStorageSizeOfDatabase() as plain text
+    Long storageSize = dataLakeManagement.getStorageSizeOfDatabase(); // boxed so toString() can be called on it below
+    return Response.ok(storageSize.toString(), MediaType.TEXT_PLAIN).build();
   }
 
   @GET