You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by fj...@apache.org on 2020/09/25 06:37:06 UTC
[incubator-streampipes] branch datalake-rest-extension updated:
Change to InfluxDB Query with Retention Policy
This is an automated email from the ASF dual-hosted git repository.
fjohn pushed a commit to branch datalake-rest-extension
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/datalake-rest-extension by this push:
new 64c1655 Change to InfluxDB Query with Retention Policy
64c1655 is described below
commit 64c16554bdbe5e66926f6f0155233e7be52eb269
Author: Felix John <jo...@axantu.com>
AuthorDate: Fri Sep 25 08:36:55 2020 +0200
Change to InfluxDB Query with Retention Policy
---
.../streampipes/config/backend/BackendConfig.java | 5 +
.../config/backend/BackendConfigKeys.java | 1 +
.../rest/impl/datalake/DataLakeManagementV3.java | 214 +++++++++++++--------
.../rest/impl/datalake/DataLakeResourceV3.java | 53 +++--
4 files changed, 171 insertions(+), 102 deletions(-)
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
index b108ccc..f944650 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
@@ -63,6 +63,7 @@ public enum BackendConfig {
config.register(BackendConfigKeys.INFLUX_HOST, "influxdb", "The host of the influx data base");
config.register(BackendConfigKeys.INFLUX_PORT, 8086, "The hist of the influx data base");
config.register(BackendConfigKeys.INFLUX_DATA_BASE, "sp", "The influx data base name");
+ config.register(BackendConfigKeys.INFLUX_DEFAULT_RETENTION_POLICY, "autogen", "The influx default retention policy name");
config.registerObject(BackendConfigKeys.MESSAGING_SETTINGS, MessagingSettings.fromDefault(),
"Default Messaging Settings");
@@ -224,6 +225,10 @@ public enum BackendConfig {
return config.getString(BackendConfigKeys.INFLUX_DATA_BASE);
}
+ public String getDefaultRetentionPolicyName() {
+ return config.getString(BackendConfigKeys.INFLUX_DEFAULT_RETENTION_POLICY);
+ }
+
public String getEncryptionKey() {
return config.getString(BackendConfigKeys.ENCRYPTION_KEY);
}
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
index aaa4755..c66e9df 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
@@ -41,6 +41,7 @@ public class BackendConfigKeys {
public static final String INFLUX_PORT = "SP_INFLUX_PORT";
public static final String INFLUX_HOST = "SP_INFLUX_HOST";
public static final String INFLUX_DATA_BASE = "SP_INFLUX_DATA_BASE";
+ public static final String INFLUX_DEFAULT_RETENTION_POLICY = "SP_INFLUX_RETENTION_POLICY";
public static final String MESSAGING_SETTINGS = "SP_MESSAGING_SETTINGS";
public static final String ENCRYPTION_KEY = "SP_ENCRYPTION_KEY";
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
index caaa401..4f6b860 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
@@ -47,6 +47,7 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+import javax.validation.constraints.Null;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
@@ -64,9 +65,14 @@ public class DataLakeManagementV3 {
return indices;
}
- public DataResult getEvents(String index, long startDate, long endDate, String aggregationUnit, int aggregationValue) {
+ public DataResult getEvents(String index, long startDate, long endDate, String aggregationUnit, int aggregationValue, @Nullable String rp) {
InfluxDB influxDB = getInfluxDBClient();
- Query query = new Query("SELECT mean(*) FROM " + index + " WHERE time > " + startDate * 1000000 + " AND time < " + endDate * 1000000
+
+ if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+ Query query = new Query("SELECT mean(*) FROM "
+ + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+ + " WHERE time > " + startDate * 1000000 + " AND time < " + endDate * 1000000
+ " GROUP BY time(" + aggregationValue + aggregationUnit + ") fill(none) ORDER BY time",
BackendConfig.INSTANCE.getInfluxDatabaseName());
QueryResult result = influxDB.query(query);
@@ -79,9 +85,14 @@ public class DataLakeManagementV3 {
public GroupedDataResult getEvents(String index, long startDate, long endDate, String aggregationUnit, int aggregationValue,
- String groupingTag) {
+ String groupingTag, @Nullable String rp) {
InfluxDB influxDB = getInfluxDBClient();
- Query query = new Query("SELECT mean(*), count(*) FROM " + index + " WHERE time > " + startDate * 1000000 + " AND time < " + endDate * 1000000
+
+ if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+ Query query = new Query("SELECT mean(*), count(*) FROM "
+ + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+ + " WHERE time > " + startDate * 1000000 + " AND time < " + endDate * 1000000
+ " GROUP BY " + groupingTag + ",time(" + aggregationValue + aggregationUnit + ") fill(none) ORDER BY time",
BackendConfig.INSTANCE.getInfluxDatabaseName());
QueryResult result = influxDB.query(query);
@@ -92,9 +103,13 @@ public class DataLakeManagementV3 {
return groupedDataResult;
}
- public DataResult getEvents(String index, long startDate, long endDate) {
+ public DataResult getEvents(String index, long startDate, long endDate, @Nullable String rp) {
InfluxDB influxDB = getInfluxDBClient();
- Query query = new Query("SELECT * FROM " + index
+
+ if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+ Query query = new Query("SELECT * FROM "
+ + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+ " WHERE time > " + startDate * 1000000 + " AND time < " + endDate * 1000000
+ " ORDER BY time",
BackendConfig.INSTANCE.getInfluxDatabaseName());
@@ -105,9 +120,13 @@ public class DataLakeManagementV3 {
return dataResult;
}
- public GroupedDataResult getEvents(String index, long startDate, long endDate, String groupingTag) {
+ public GroupedDataResult getEvents(String index, long startDate, long endDate, String groupingTag, @Nullable String rp) {
InfluxDB influxDB = getInfluxDBClient();
- Query query = new Query("SELECT * FROM " + index
+
+ if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+ Query query = new Query("SELECT * FROM "
+ + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+ " WHERE time > " + startDate * 1000000 + " AND time < " + endDate * 1000000
+ " GROUP BY " + groupingTag
+ " ORDER BY time",
@@ -120,10 +139,10 @@ public class DataLakeManagementV3 {
return groupedDataResult;
}
- public DataResult getEventsAutoAggregation(String index, long startDate, long endDate)
+ public DataResult getEventsAutoAggregation(String index, long startDate, long endDate, @Nullable String rp)
throws ParseException {
InfluxDB influxDB = getInfluxDBClient();
- double numberOfRecords = getNumOfRecordsOfTable(index, influxDB, startDate, endDate);
+ double numberOfRecords = getNumOfRecordsOfTable(index, influxDB, startDate, endDate, rp);
influxDB.close();
if (numberOfRecords == 0) {
@@ -131,18 +150,19 @@ public class DataLakeManagementV3 {
return new DataResult();
} else if (numberOfRecords <= NUM_OF_AUTO_AGGREGATION_VALUES) {
influxDB.close();
- return getEvents(index, startDate, endDate);
+ return getEvents(index, startDate, endDate, rp);
} else {
- int aggregatinValue = getAggregationValue(index, influxDB);
+ int aggregatinValue = getAggregationValue(index, influxDB, rp);
influxDB.close();
- return getEvents(index, startDate, endDate, "ms", aggregatinValue);
+ return getEvents(index, startDate, endDate, "ms", aggregatinValue, rp);
}
}
- public GroupedDataResult getEventsAutoAggregation(String index, long startDate, long endDate, String groupingTag)
+ public GroupedDataResult getEventsAutoAggregation(String index, long startDate, long endDate, String groupingTag, @Nullable String rp)
throws ParseException {
InfluxDB influxDB = getInfluxDBClient();
- double numberOfRecords = getNumOfRecordsOfTable(index, influxDB, startDate, endDate);
+
+ double numberOfRecords = getNumOfRecordsOfTable(index, influxDB, startDate, endDate, rp);
influxDB.close();
if (numberOfRecords == 0) {
@@ -150,23 +170,27 @@ public class DataLakeManagementV3 {
return new GroupedDataResult(0, new HashMap<>());
} else if (numberOfRecords <= NUM_OF_AUTO_AGGREGATION_VALUES) {
influxDB.close();
- return getEvents(index, startDate, endDate, groupingTag);
+ return getEvents(index, startDate, endDate, groupingTag, rp);
} else {
- int aggregatinValue = getAggregationValue(index, influxDB);
+ int aggregatinValue = getAggregationValue(index, influxDB, rp);
influxDB.close();
- return getEvents(index, startDate, endDate, "ms", aggregatinValue, groupingTag);
+ return getEvents(index, startDate, endDate, "ms", aggregatinValue, groupingTag, rp);
}
}
public DataResult getEventsFromNow(String index, String timeunit, int value,
- String aggregationUnit, int aggregationValue)
+ String aggregationUnit, int aggregationValue, @Nullable String rp)
throws ParseException {
InfluxDB influxDB = getInfluxDBClient();
- Query query = new Query("SELECT mean(*) FROM " + index + " WHERE time > now() -" + value + timeunit
- + " GROUP BY time(" + aggregationValue + aggregationUnit + ") fill(none) ORDER BY time",
- BackendConfig.INSTANCE.getInfluxDatabaseName());
+ if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+ Query query = new Query("SELECT mean(*) FROM "
+ + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+ + " WHERE time > now() -" + value + timeunit
+ + " GROUP BY time(" + aggregationValue + aggregationUnit + ") fill(none) ORDER BY time",
+ BackendConfig.INSTANCE.getInfluxDatabaseName());
QueryResult result = influxDB.query(query);
DataResult dataResult = convertResult(result);
@@ -175,14 +199,14 @@ public class DataLakeManagementV3 {
}
- public DataResult getEventsFromNow(String index, String timeunit, int value) {
+ public DataResult getEventsFromNow(String index, String timeunit, int value, @Nullable String rp) {
InfluxDB influxDB = getInfluxDBClient();
+
+ if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
Query query = new Query("SELECT * FROM "
- + index
- + " WHERE time > now() -"
- + value
- + timeunit
- + " ORDER BY time",
+ + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+ + " WHERE time > now() -" + value + timeunit + " ORDER BY time",
BackendConfig.INSTANCE.getInfluxDatabaseName());
QueryResult result = influxDB.query(query);
@@ -192,32 +216,33 @@ public class DataLakeManagementV3 {
return dataResult;
}
- public DataResult getEventsFromNowAutoAggregation(String index, String timeunit, int value)
+ public DataResult getEventsFromNowAutoAggregation(String index, String timeunit, int value, @Nullable String rp)
throws ParseException {
InfluxDB influxDB = getInfluxDBClient();
- double numberOfRecords = getNumOfRecordsOfTableFromNow(index, influxDB, timeunit, value);
+ double numberOfRecords = getNumOfRecordsOfTableFromNow(index, influxDB, timeunit, value, rp);
if (numberOfRecords == 0) {
influxDB.close();
return new DataResult();
} else if (numberOfRecords <= NUM_OF_AUTO_AGGREGATION_VALUES) {
influxDB.close();
- return getEventsFromNow(index, timeunit, value);
+ return getEventsFromNow(index, timeunit, value, rp);
} else {
- int aggregationValue = getAggregationValue(index, influxDB);
+ int aggregationValue = getAggregationValue(index, influxDB, rp);
influxDB.close();
- return getEventsFromNow(index, timeunit, value, "ms", aggregationValue);
+ return getEventsFromNow(index, timeunit, value, "ms", aggregationValue, rp);
}
}
- public PageResult getEvents(String index, int itemsPerPage, int page) {
+ public PageResult getEvents(String index, int itemsPerPage, int page, @Nullable String rp) {
InfluxDB influxDB = getInfluxDBClient();
+
+ if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+
Query query = new Query("SELECT * FROM "
- + index
- + " ORDER BY time LIMIT "
- + itemsPerPage
- + " OFFSET "
- + page * itemsPerPage,
+ + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+ + " ORDER BY time LIMIT " + itemsPerPage + " OFFSET " + page * itemsPerPage,
BackendConfig.INSTANCE.getInfluxDatabaseName());
QueryResult result = influxDB.query(query);
@@ -301,9 +326,7 @@ public class DataLakeManagementV3 {
public boolean deleteRetentionPolicy(String rp) {
InfluxDB influxDB = getInfluxDBClient();
Query query = new Query("DROP RETENTION POLICY "
- + rp
- + " ON "
- + BackendConfig.INSTANCE.getInfluxDatabaseName(),
+ + rp + " ON " + BackendConfig.INSTANCE.getInfluxDatabaseName(),
BackendConfig.INSTANCE.getInfluxDatabaseName());
QueryResult influx_result = influxDB.query(query);
@@ -315,12 +338,23 @@ public class DataLakeManagementV3 {
return true;
}
- public double getNumOfRecordsOfTable(String index) {
+ public double getNumOfRecordsOfTable(String index, @Nullable String rp) {
InfluxDB influxDB = getInfluxDBClient();
double numOfRecords = 0;
- QueryResult.Result result = influxDB.query(new Query("SELECT count(*) FROM " + index,
- BackendConfig.INSTANCE.getInfluxDatabaseName())).getResults().get(0);
+ if (rp == null) {
+ rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+ }
+
+ Query query = new Query("SELECT count(*) FROM "
+ + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index,
+ BackendConfig.INSTANCE.getInfluxDatabaseName());
+
+ QueryResult.Result result = influxDB.query
+
+
+ (query).getResults().get(0);
+
if (result.getSeries() == null) {
return numOfRecords;
}
@@ -389,22 +423,22 @@ public class DataLakeManagementV3 {
// TODO: Raise exception if both not successful
}
- public PageResult getEvents(String index, int itemsPerPage) throws IOException {
+ public PageResult getEvents(String index, int itemsPerPage, @Nullable String rp) throws IOException {
int page = getMaxPage(index, itemsPerPage);
if (page > 0) {
page = page -1;
}
- return getEvents(index, itemsPerPage, page);
+ return getEvents(index, itemsPerPage, page, rp);
}
- public StreamingOutput getAllEvents(String index, String outputFormat) {
- return getAllEvents(index, outputFormat, null, null);
+ public StreamingOutput getAllEvents(String index, String outputFormat, String rp) {
+ return getAllEvents(index, outputFormat, null, null, rp);
}
public StreamingOutput getAllEvents(String index, String outputFormat, @Nullable Long startDate,
- @Nullable Long endDate) {
+ @Nullable Long endDate, @Nullable String rp) {
return new StreamingOutput() {
@Override
public void write(OutputStream outputStream) throws IOException, WebApplicationException {
@@ -421,7 +455,7 @@ public class DataLakeManagementV3 {
outputStream.write(toBytes("["));
do {
- Query query = getRawDataQueryWithPage(i, itemsPerRequest, index, startDate, endDate);
+ Query query = getRawDataQueryWithPage(i, itemsPerRequest, index, startDate, endDate, rp);
QueryResult result = influxDB.query(query, TimeUnit.MILLISECONDS);
dataResult = new DataResult();
if ((result.getResults().get(0).getSeries() != null)) {
@@ -466,7 +500,7 @@ public class DataLakeManagementV3 {
boolean isFirstDataObject = true;
do {
- Query query = getRawDataQueryWithPage(i, itemsPerRequest, index, startDate, endDate);
+ Query query = getRawDataQueryWithPage(i, itemsPerRequest, index, startDate, endDate, rp);
QueryResult result = influxDB.query(query, TimeUnit.MILLISECONDS);
dataResult = new DataResult();
if ((result.getResults().get(0).getSeries() != null)) {
@@ -539,27 +573,23 @@ public class DataLakeManagementV3 {
}
private Query getRawDataQueryWithPage(int page, int itemsPerRequest, String index,
- @Nullable Long startDate, @Nullable Long endDate) {
+ @Nullable Long startDate, @Nullable Long endDate, @Nullable String rp) {
Query query;
+
+ if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
if (startDate != null && endDate != null) {
query = new Query("SELECT * FROM "
- + index
- + " WHERE time > "
- + startDate * 1000000
- + " AND time < "
- + endDate * 1000000
- + " ORDER BY time LIMIT "
- + itemsPerRequest
- + " OFFSET "
- + page * itemsPerRequest,
+ + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+ + " WHERE time > " + startDate * 1000000 + " AND time < " + endDate * 1000000
+ + " ORDER BY time LIMIT " + itemsPerRequest
+ + " OFFSET " + page * itemsPerRequest,
BackendConfig.INSTANCE.getInfluxDatabaseName());
} else {
query = new Query("SELECT * FROM "
- + index
- + " ORDER BY time LIMIT "
- + itemsPerRequest
- + " OFFSET "
- + page * itemsPerRequest,
+ + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+ + " ORDER BY time LIMIT " + itemsPerRequest
+ + " OFFSET " + page * itemsPerRequest,
BackendConfig.INSTANCE.getInfluxDatabaseName());
}
return query;
@@ -616,22 +646,30 @@ public class DataLakeManagementV3 {
}
- private int getAggregationValue(String index, InfluxDB influxDB) throws ParseException {
- long timerange = getDateFromNewestRecordReOfTable(index, influxDB) - getDateFromOldestRecordReOfTable(index, influxDB);
+ private int getAggregationValue(String index, InfluxDB influxDB, @Nullable String rp) throws ParseException {
+ long timerange = getDateFromNewestRecordReOfTable(index, influxDB, rp) - getDateFromOldestRecordReOfTable(index, influxDB, rp);
double v = timerange / NUM_OF_AUTO_AGGREGATION_VALUES;
return Double.valueOf(v).intValue();
}
- private long getDateFromNewestRecordReOfTable(String index, InfluxDB influxDB) throws ParseException {
- Query query = new Query("SELECT * FROM " + index + " ORDER BY desc LIMIT 1 ",
+ private long getDateFromNewestRecordReOfTable(String index, InfluxDB influxDB, @Nullable String rp) throws ParseException {
+
+ if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+ Query query = new Query("SELECT * FROM "
+ + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+ + " ORDER BY desc LIMIT 1 ",
BackendConfig.INSTANCE.getInfluxDatabaseName());
return getDateFromRecordOfTable(query, influxDB);
}
- private long getDateFromOldestRecordReOfTable(String index, InfluxDB influxDB) throws ParseException {
- Query query = new Query("SELECT * FROM " + index + " ORDER BY asc LIMIT 1 ",
+ private long getDateFromOldestRecordReOfTable(String index, InfluxDB influxDB, @Nullable String rp) throws ParseException {
+ if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+ Query query = new Query("SELECT * FROM "
+ + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+ + " ORDER BY asc LIMIT 1 ",
BackendConfig.INSTANCE.getInfluxDatabaseName());
return getDateFromRecordOfTable(query, influxDB);
@@ -646,12 +684,16 @@ public class DataLakeManagementV3 {
return date.getTime();
}
- private double getNumOfRecordsOfTable(String index, InfluxDB influxDB, long startDate, long endDate) {
+ private double getNumOfRecordsOfTable(String index, InfluxDB influxDB, long startDate, long endDate, @Nullable String rp) {
double numOfRecords = 0;
- QueryResult.Result result = influxDB.query(new Query("SELECT count(*) FROM " + index +
+ if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+ QueryResult.Result result = influxDB.query(new Query("SELECT count(*) FROM "
+ + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index +
" WHERE time > " + startDate * 1000000 + " AND time < " + endDate * 1000000,
BackendConfig.INSTANCE.getInfluxDatabaseName())).getResults().get(0);
+
if (result.getSeries() == null) {
return numOfRecords;
}
@@ -665,9 +707,13 @@ public class DataLakeManagementV3 {
return numOfRecords;
}
- private double getNumOfRecordsOfTableFromNow(String index, InfluxDB influxDB, String timeunit, int value) {
+ private double getNumOfRecordsOfTableFromNow(String index, InfluxDB influxDB, String timeunit, int value, @Nullable String rp) {
double numOfRecords = 0;
- QueryResult.Result result = influxDB.query(new Query("SELECT count(*) FROM " + index +
+
+ if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+ QueryResult.Result result = influxDB.query(new Query("SELECT count(*) FROM "
+ + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index +
" WHERE time > now() -" + value + timeunit,
BackendConfig.INSTANCE.getInfluxDatabaseName())).getResults().get(0);
if (result.getSeries() == null) {
@@ -749,9 +795,9 @@ public class DataLakeManagementV3 {
return route;
}
- public void updateLabels(String index, String labelColumn, long startdate, long enddate, String label) {
- DataResult queryResult = getEvents(index, startdate, enddate);
- Map<String, String> headerWithTypes = getHeadersWithTypes(index);
+ public void updateLabels(String index, String labelColumn, long startdate, long enddate, String label, @Nullable String rp) {
+ DataResult queryResult = getEvents(index, startdate, enddate, rp);
+ Map<String, String> headerWithTypes = getHeadersWithTypes(index, rp);
List<String> headers = queryResult.getHeaders();
InfluxDB influxDB = getInfluxDBClient();
@@ -779,9 +825,13 @@ public class DataLakeManagementV3 {
influxDB.close();
}
- private Map<String, String> getHeadersWithTypes(String index) {
+ private Map<String, String> getHeadersWithTypes(String index, @Nullable String rp) {
InfluxDB influxDB = getInfluxDBClient();
- Query query = new Query("SHOW FIELD KEYS FROM " + index,
+
+ if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+ Query query = new Query("SHOW FIELD KEYS FROM "
+ + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index,
BackendConfig.INSTANCE.getInfluxDatabaseName());
QueryResult result = influxDB.query(query);
influxDB.close();
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
index d7157d7..9734cf5 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
@@ -55,13 +55,14 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
PageResult result;
String page = info.getQueryParameters().getFirst("page");
+ String pr = info.getQueryParameters().getFirst("retentionPolicy");
try {
if (page != null) {
- result = this.dataLakeManagement.getEvents(index, itemsPerPage, Integer.parseInt(page));
+ result = this.dataLakeManagement.getEvents(index, itemsPerPage, Integer.parseInt(page), pr);
} else {
- result = this.dataLakeManagement.getEvents(index, itemsPerPage);
+ result = this.dataLakeManagement.getEvents(index, itemsPerPage, pr);
}
return Response.ok(result).build();
} catch (IOException e) {
@@ -86,8 +87,11 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Path("/data/{index}")
public Response getAllData(@PathParam("index") String index,
- @QueryParam("format") String format) {
- StreamingOutput streamingOutput = dataLakeManagement.getAllEvents(index, format);
+ @QueryParam("format") String format,
+ @Context UriInfo info) {
+
+ String pr = info.getQueryParameters().getFirst("retentionPolicy");
+ StreamingOutput streamingOutput = dataLakeManagement.getAllEvents(index, format, pr);
return Response.ok(streamingOutput, MediaType.APPLICATION_OCTET_STREAM).
header("Content-Disposition", "attachment; filename=\"datalake." + format + "\"")
@@ -98,8 +102,11 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Path("/data/{index}/download")
public Response downloadData(@PathParam("index") String index,
- @QueryParam("format") String format) {
- StreamingOutput streamingOutput = dataLakeManagement.getAllEvents(index, format);
+ @QueryParam("format") String format,
+ @Context UriInfo info) {
+
+ String pr = info.getQueryParameters().getFirst("retentionPolicy");
+ StreamingOutput streamingOutput = dataLakeManagement.getAllEvents(index, format, pr);
return Response.ok(streamingOutput, MediaType.APPLICATION_OCTET_STREAM).
header("Content-Disposition", "attachment; filename=\"datalake." + format + "\"")
@@ -108,10 +115,8 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
@GET
@Path("/data/{index}/delete")
- // @Produces(MediaType.APPLICATION_JSON)
public void deleteMeasurement(@PathParam("index") String index) {
dataLakeManagement.deleteMeasurement(index);
- // return ok("John");
}
@GET
@@ -125,14 +130,15 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
String aggregationUnit = info.getQueryParameters().getFirst("aggregationUnit");
String aggregationValue = info.getQueryParameters().getFirst("aggregationValue");
+ String pr = info.getQueryParameters().getFirst("retentionPolicy");
DataResult result;
try {
if (aggregationUnit != null && aggregationValue != null) {
result = dataLakeManagement.getEventsFromNow(index, unit, value, aggregationUnit,
- Integer.parseInt(aggregationValue));
+ Integer.parseInt(aggregationValue), pr);
} else {
- result = dataLakeManagement.getEventsFromNowAutoAggregation(index, unit, value);
+ result = dataLakeManagement.getEventsFromNowAutoAggregation(index, unit, value, pr);
}
return Response.ok(result).build();
} catch (IllegalArgumentException e) {
@@ -153,16 +159,17 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
String aggregationUnit = info.getQueryParameters().getFirst("aggregationUnit");
String aggregationValue = info.getQueryParameters().getFirst("aggregationValue");
+ String pr = info.getQueryParameters().getFirst("retentionPolicy");
DataResult result;
try {
if (aggregationUnit != null && aggregationValue != null) {
result = dataLakeManagement.getEvents(index, startdate, enddate, aggregationUnit,
- Integer.parseInt(aggregationValue));
+ Integer.parseInt(aggregationValue), pr);
} else {
- result = dataLakeManagement.getEventsAutoAggregation(index, startdate, enddate);
+ result = dataLakeManagement.getEventsAutoAggregation(index, startdate, enddate, pr);
}
return Response.ok(result).build();
} catch (IllegalArgumentException | ParseException e) {
@@ -181,14 +188,15 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
String aggregationUnit = info.getQueryParameters().getFirst("aggregationUnit");
String aggregationValue = info.getQueryParameters().getFirst("aggregationValue");
+ String pr = info.getQueryParameters().getFirst("retentionPolicy");
GroupedDataResult result;
try {
if (aggregationUnit != null && aggregationValue != null) {
result = dataLakeManagement.getEvents(index, startdate, enddate, aggregationUnit,
- Integer.parseInt(aggregationValue), groupingTag);
+ Integer.parseInt(aggregationValue), groupingTag, pr);
} else {
- result = dataLakeManagement.getEventsAutoAggregation(index, startdate, enddate, groupingTag);
+ result = dataLakeManagement.getEventsAutoAggregation(index, startdate, enddate, groupingTag, pr);
}
return Response.ok(result).build();
} catch (IllegalArgumentException | ParseException e) {
@@ -210,8 +218,11 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
@Produces(MediaType.APPLICATION_OCTET_STREAM)
@Path("/data/{index}/{startdate}/{enddate}/download")
public Response downloadData(@PathParam("index") String index, @QueryParam("format") String format,
- @PathParam("startdate") long start, @PathParam("enddate") long end) {
- StreamingOutput streamingOutput = dataLakeManagement.getAllEvents(index, format, start, end);
+ @PathParam("startdate") long start, @PathParam("enddate") long end,
+ @Context UriInfo info) {
+
+ String pr = info.getQueryParameters().getFirst("retentionPolicy");
+ StreamingOutput streamingOutput = dataLakeManagement.getAllEvents(index, format, start, end, pr);
return Response.ok(streamingOutput, MediaType.APPLICATION_OCTET_STREAM).
header("Content-Disposition", "attachment; filename=\"datalake." + format + "\"")
@@ -221,8 +232,10 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
@GET
@Produces(MediaType.TEXT_PLAIN)
@Path("/data/{index}/count")
- public Response getNumOfRecordsOfTable(@PathParam("index") String index) {
- Double numOfRecords = dataLakeManagement.getNumOfRecordsOfTable(index);
+ public Response getNumOfRecordsOfTable(@PathParam("index") String index,
+ @Context UriInfo info){
+ String pr = info.getQueryParameters().getFirst("retentionPolicy");
+ Double numOfRecords = dataLakeManagement.getNumOfRecordsOfTable(index, pr);
return Response.ok(numOfRecords, MediaType.TEXT_PLAIN).build();
}
@@ -256,7 +269,8 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
@PathParam("column") String column) {
String label = info.getQueryParameters().getFirst("label");
- this.dataLakeManagement.updateLabels(index, column, startdate, enddate, label);
+ String pr = info.getQueryParameters().getFirst("retentionPolicy");
+ this.dataLakeManagement.updateLabels(index, column, startdate, enddate, label, pr);
return Response.ok("Successfully updated database.", MediaType.TEXT_PLAIN).build();
}
@@ -281,7 +295,6 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
@Path("/policy/{name}/delete")
@Produces(MediaType.TEXT_PLAIN)
public Response deleteRetentionPolicy(@PathParam("name") String policyName) {
- dataLakeManagement.deleteRetentionPolicy(policyName);
if (dataLakeManagement.deleteRetentionPolicy(policyName)) {
return Response.ok("Successfully deleted the retention policy.", MediaType.TEXT_PLAIN).build();
} else {