You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by fj...@apache.org on 2020/09/23 09:26:25 UTC
[incubator-streampipes] branch datalake-rest-extension updated: New
Endpoint: Number of Records in Measurement
This is an automated email from the ASF dual-hosted git repository.
fjohn pushed a commit to branch datalake-rest-extension
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/datalake-rest-extension by this push:
new 4094171 New Endpoint: Number of Records in Measurement
4094171 is described below
commit 409417112f0e19d689e74f20e78ae798dd6a83c1
Author: Felix John <jo...@axantu.com>
AuthorDate: Wed Sep 23 11:26:11 2020 +0200
New Endpoint: Number of Records in Measurement
---
.../rest/impl/datalake/DataLakeManagementV3.java | 46 +++++++++++-----------
.../rest/impl/datalake/DataLakeResourceV3.java | 11 ++++--
2 files changed, 32 insertions(+), 25 deletions(-)
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
index e1004aa..461fab7 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
@@ -288,7 +288,7 @@ public class DataLakeManagementV3 {
+ shardDurationString
+ defaultPolicyString,
BackendConfig.INSTANCE.getInfluxDatabaseName());
-
+
QueryResult influx_result = influxDB.query(query);
if (influx_result.hasError() || influx_result.getResults().get(0).getError() != null) {
System.out.println("Error!");
@@ -308,11 +308,31 @@ public class DataLakeManagementV3 {
}
}
+ public double getNumOfRecordsOfTable(String index) {
+ InfluxDB influxDB = getInfluxDBClient();
+ double numOfRecords = 0;
+
+ QueryResult.Result result = influxDB.query(new Query("SELECT count(*) FROM " + index,
+ BackendConfig.INSTANCE.getInfluxDatabaseName())).getResults().get(0);
+ if (result.getSeries() == null) {
+ return numOfRecords;
+ }
+
+ for (Object item : result.getSeries().get(0).getValues().get(0)) {
+ if (item instanceof Double && numOfRecords < Double.parseDouble(item.toString())) {
+ numOfRecords = Double.parseDouble(item.toString());
+ }
+ }
+
+ return numOfRecords;
+ }
+
public static void main(String [] args) {
DataLakeManagementV3 dlmv3 = new DataLakeManagementV3();
- DataResult result = dlmv3.getRetentionPoliciesOfDatabase();
- dlmv3.createRetentionPolicy("rp8", "1h", null, 5, Boolean.FALSE);
- // dlmv3.deleteRetentionPolicy("rp5");
+ InfluxDB influxDB = dlmv3.getInfluxDBClient();
+ // DataResult result = dlmv3.getRetentionPoliciesOfDatabase();
+ // dlmv3.createRetentionPolicy("rp8", "1h", null, 5, Boolean.FALSE);
+ // dlmv3.getNumOfRecordsOfTable("john3800");
}
public void deleteMeasurement(String index) {
@@ -603,24 +623,6 @@ public class DataLakeManagementV3 {
return date.getTime();
}
- private double getNumOfRecordsOfTable(String index, InfluxDB influxDB) {
- double numOfRecords = 0;
-
- QueryResult.Result result = influxDB.query(new Query("SELECT count(*) FROM " + index,
- BackendConfig.INSTANCE.getInfluxDatabaseName())).getResults().get(0);
- if (result.getSeries() == null) {
- return numOfRecords;
- }
-
- for (Object item : result.getSeries().get(0).getValues().get(0)) {
- if (item instanceof Double && numOfRecords < Double.parseDouble(item.toString())) {
- numOfRecords = Double.parseDouble(item.toString());
- }
- }
-
- return numOfRecords;
- }
-
private double getNumOfRecordsOfTable(String index, InfluxDB influxDB, long startDate, long endDate) {
double numOfRecords = 0;
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
index 6a92cd6..a848b49 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
@@ -220,6 +220,14 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
}
@GET
+ @Produces(MediaType.TEXT_PLAIN)
+ @Path("/data/{index}/count")
+ public Response getNumOfRecordsOfTable(@PathParam("index") String index) {
+ Double numOfRecords = dataLakeManagement.getNumOfRecordsOfTable(index);
+ return Response.ok(numOfRecords, MediaType.TEXT_PLAIN).build();
+ }
+
+ @GET
@Path("/data/image/{route}/file")
@Produces("image/png")
public Response getImage(@PathParam("route") String fileRoute) throws IOException {
@@ -312,7 +320,4 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
dataLakeManagement.alterRetentionPolicy(policyName, duration, shardDuration, replication, defaultPolicy);
return Response.ok("Successfully altered retention policy.", MediaType.TEXT_PLAIN).build();
}
-
-
-
}