Posted to commits@streampipes.apache.org by fj...@apache.org on 2020/09/25 06:37:06 UTC

[incubator-streampipes] branch datalake-rest-extension updated: Change to InfluxDB Query with Retention Policy

This is an automated email from the ASF dual-hosted git repository.

fjohn pushed a commit to branch datalake-rest-extension
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/datalake-rest-extension by this push:
     new 64c1655  Change to InfluxDB Query with Retention Policy
64c1655 is described below

commit 64c16554bdbe5e66926f6f0155233e7be52eb269
Author: Felix John <jo...@axantu.com>
AuthorDate: Fri Sep 25 08:36:55 2020 +0200

    Change to InfluxDB Query with Retention Policy
---
 .../streampipes/config/backend/BackendConfig.java  |   5 +
 .../config/backend/BackendConfigKeys.java          |   1 +
 .../rest/impl/datalake/DataLakeManagementV3.java   | 214 +++++++++++++--------
 .../rest/impl/datalake/DataLakeResourceV3.java     |  53 +++--
 4 files changed, 171 insertions(+), 102 deletions(-)
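
For orientation before the hunks: the queries below now address each measurement by its fully qualified InfluxQL name, <database>.<retention policy>.<measurement>, instead of the bare measurement name, and fall back to a configurable default policy whenever the caller supplies none. A minimal sketch of that query shape, assuming the registered defaults "sp" and "autogen" plus a hypothetical measurement name:

    // Hedged sketch of the new query shape; the two literals mirror the
    // defaults registered in BackendConfig, the measurement is made up.
    public class RetentionPolicyQuerySketch {
        public static void main(String[] args) {
            String db = "sp";                             // getInfluxDatabaseName()
            String rp = args.length > 0 ? args[0] : null; // optional "retentionPolicy" parameter
            String index = "temperature";                 // hypothetical measurement

            if (rp == null) {
                rp = "autogen";                           // getDefaultRetentionPolicyName()
            }

            // <database>.<retention policy>.<measurement>
            System.out.println("SELECT * FROM " + db + "." + rp + "." + index + " ORDER BY time");
        }
    }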

diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
index b108ccc..f944650 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfig.java
@@ -63,6 +63,7 @@ public enum BackendConfig {
     config.register(BackendConfigKeys.INFLUX_HOST, "influxdb", "The host of the influx data base");
    config.register(BackendConfigKeys.INFLUX_PORT, 8086, "The port of the influx data base");
     config.register(BackendConfigKeys.INFLUX_DATA_BASE, "sp", "The influx data base name");
+    config.register(BackendConfigKeys.INFLUX_DEFAULT_RETENTION_POLICY, "autogen", "The influx default retention policy name");
     config.registerObject(BackendConfigKeys.MESSAGING_SETTINGS, MessagingSettings.fromDefault(),
             "Default Messaging Settings");
 
@@ -224,6 +225,10 @@ public enum BackendConfig {
     return config.getString(BackendConfigKeys.INFLUX_DATA_BASE);
   }
 
+  public String getDefaultRetentionPolicyName() {
+    return config.getString(BackendConfigKeys.INFLUX_DEFAULT_RETENTION_POLICY);
+  }
+
   public String getEncryptionKey() {
     return config.getString(BackendConfigKeys.ENCRYPTION_KEY);
   }
diff --git a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
index aaa4755..c66e9df 100644
--- a/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
+++ b/streampipes-config/src/main/java/org/apache/streampipes/config/backend/BackendConfigKeys.java
@@ -41,6 +41,7 @@ public class BackendConfigKeys {
   public static final String INFLUX_PORT = "SP_INFLUX_PORT";
   public static final String INFLUX_HOST = "SP_INFLUX_HOST";
   public static final String INFLUX_DATA_BASE = "SP_INFLUX_DATA_BASE";
+  public static final String INFLUX_DEFAULT_RETENTION_POLICY = "SP_INFLUX_RETENTION_POLICY";
   public static final String MESSAGING_SETTINGS = "SP_MESSAGING_SETTINGS";
 
   public static final String ENCRYPTION_KEY = "SP_ENCRYPTION_KEY";
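
Since the new key follows the SP_ naming scheme of its neighbours, deployments that feed backend settings in through environment variables should be able to override the default policy per instance; whether the override is actually honored depends on how the config service is wired. A small sketch under that assumption:

    // Hedged sketch: resolving the default policy from the environment;
    // "autogen" matches the default registered in BackendConfig.
    public class RetentionPolicyEnvSketch {
        public static void main(String[] args) {
            String rp = System.getenv().getOrDefault("SP_INFLUX_RETENTION_POLICY", "autogen");
            System.out.println("default retention policy: " + rp);
        }
    }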
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
index caaa401..4f6b860 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
@@ -64,9 +65,14 @@ public class DataLakeManagementV3 {
     return indices;
   }
 
-  public DataResult getEvents(String index, long startDate, long endDate, String aggregationUnit, int aggregationValue) {
+  public DataResult getEvents(String index, long startDate, long endDate, String aggregationUnit, int aggregationValue, @Nullable String rp) {
     InfluxDB influxDB = getInfluxDBClient();
-    Query query = new Query("SELECT mean(*) FROM " + index + " WHERE time > " + startDate * 1000000 + " AND time < " + endDate * 1000000
+
+    if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+    Query query = new Query("SELECT mean(*) FROM "
+            + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+            + " WHERE time > " + startDate * 1000000 + " AND time < " + endDate * 1000000
             + " GROUP BY time(" + aggregationValue + aggregationUnit + ") fill(none) ORDER BY time",
             BackendConfig.INSTANCE.getInfluxDatabaseName());
     QueryResult result = influxDB.query(query);
@@ -79,9 +85,14 @@ public class DataLakeManagementV3 {
 
 
   public GroupedDataResult getEvents(String index, long startDate, long endDate, String aggregationUnit, int aggregationValue,
-                                     String groupingTag) {
+                                     String groupingTag, @Nullable String rp) {
     InfluxDB influxDB = getInfluxDBClient();
-    Query query = new Query("SELECT mean(*), count(*) FROM " + index + " WHERE time > " + startDate * 1000000 + " AND time < " + endDate * 1000000
+
+    if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+    Query query = new Query("SELECT mean(*), count(*) FROM "
+            + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+            + " WHERE time > " + startDate * 1000000 + " AND time < " + endDate * 1000000
             + " GROUP BY " + groupingTag + ",time(" + aggregationValue + aggregationUnit + ") fill(none) ORDER BY time",
             BackendConfig.INSTANCE.getInfluxDatabaseName());
     QueryResult result = influxDB.query(query);
@@ -92,9 +103,13 @@ public class DataLakeManagementV3 {
     return groupedDataResult;
   }
 
-  public DataResult getEvents(String index, long startDate, long endDate) {
+  public DataResult getEvents(String index, long startDate, long endDate, @Nullable String rp) {
     InfluxDB influxDB = getInfluxDBClient();
-    Query query = new Query("SELECT * FROM " + index
+
+    if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+    Query query = new Query("SELECT * FROM "
+            + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
             + " WHERE time > " + startDate * 1000000 + " AND time < " + endDate * 1000000
             + " ORDER BY time",
             BackendConfig.INSTANCE.getInfluxDatabaseName());
@@ -105,9 +120,13 @@ public class DataLakeManagementV3 {
     return dataResult;
   }
 
-  public GroupedDataResult getEvents(String index, long startDate, long endDate, String groupingTag) {
+  public GroupedDataResult getEvents(String index, long startDate, long endDate, String groupingTag, @Nullable String rp) {
     InfluxDB influxDB = getInfluxDBClient();
-    Query query = new Query("SELECT * FROM " + index
+
+    if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+    Query query = new Query("SELECT * FROM "
+            + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
             + " WHERE time > " + startDate * 1000000 + " AND time < " + endDate * 1000000
             + " GROUP BY " + groupingTag
             + " ORDER BY time",
@@ -120,10 +139,10 @@ public class DataLakeManagementV3 {
     return groupedDataResult;
   }
 
-  public DataResult getEventsAutoAggregation(String index, long startDate, long endDate)
+  public DataResult getEventsAutoAggregation(String index, long startDate, long endDate, @Nullable String rp)
           throws ParseException {
     InfluxDB influxDB = getInfluxDBClient();
-    double numberOfRecords = getNumOfRecordsOfTable(index, influxDB, startDate, endDate);
+    double numberOfRecords = getNumOfRecordsOfTable(index, influxDB, startDate, endDate, rp);
     influxDB.close();
 
     if (numberOfRecords == 0) {
@@ -131,18 +150,19 @@ public class DataLakeManagementV3 {
       return new DataResult();
     } else if (numberOfRecords <= NUM_OF_AUTO_AGGREGATION_VALUES) {
       influxDB.close();
-      return getEvents(index, startDate, endDate);
+      return getEvents(index, startDate, endDate, rp);
     } else {
-      int aggregatinValue = getAggregationValue(index, influxDB);
+      int aggregationValue = getAggregationValue(index, influxDB, rp);
       influxDB.close();
-      return getEvents(index, startDate, endDate, "ms", aggregatinValue);
+      return getEvents(index, startDate, endDate, "ms", aggregationValue, rp);
     }
   }
 
-  public GroupedDataResult getEventsAutoAggregation(String index, long startDate, long endDate, String groupingTag)
+  public GroupedDataResult getEventsAutoAggregation(String index, long startDate, long endDate, String groupingTag, @Nullable String rp)
           throws ParseException {
     InfluxDB influxDB = getInfluxDBClient();
-    double numberOfRecords = getNumOfRecordsOfTable(index, influxDB, startDate, endDate);
+    double numberOfRecords = getNumOfRecordsOfTable(index, influxDB, startDate, endDate, rp);
     influxDB.close();
 
     if (numberOfRecords == 0) {
@@ -150,23 +170,27 @@ public class DataLakeManagementV3 {
       return new GroupedDataResult(0, new HashMap<>());
     } else if (numberOfRecords <= NUM_OF_AUTO_AGGREGATION_VALUES) {
       influxDB.close();
-      return getEvents(index, startDate, endDate, groupingTag);
+      return getEvents(index, startDate, endDate, groupingTag, rp);
     } else {
-      int aggregatinValue = getAggregationValue(index, influxDB);
+      int aggregationValue = getAggregationValue(index, influxDB, rp);
       influxDB.close();
-      return getEvents(index, startDate, endDate, "ms", aggregatinValue, groupingTag);
+      return getEvents(index, startDate, endDate, "ms", aggregationValue, groupingTag, rp);
     }
   }
 
 
   public DataResult getEventsFromNow(String index, String timeunit, int value,
-                                     String aggregationUnit, int aggregationValue)
+                                     String aggregationUnit, int aggregationValue, @Nullable String rp)
           throws ParseException {
     InfluxDB influxDB = getInfluxDBClient();
 
-    Query query = new Query("SELECT mean(*) FROM " + index + " WHERE time > now() -" + value + timeunit
-            + " GROUP BY time(" + aggregationValue + aggregationUnit + ") fill(none) ORDER BY time",
-            BackendConfig.INSTANCE.getInfluxDatabaseName());
+    if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+    Query query = new Query("SELECT mean(*) FROM "
+                          + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+                          + " WHERE time > now() -" + value + timeunit
+                          + " GROUP BY time(" + aggregationValue + aggregationUnit + ") fill(none) ORDER BY time",
+                          BackendConfig.INSTANCE.getInfluxDatabaseName());
     QueryResult result = influxDB.query(query);
 
     DataResult dataResult = convertResult(result);
@@ -175,14 +199,14 @@ public class DataLakeManagementV3 {
   }
 
 
-  public DataResult getEventsFromNow(String index, String timeunit, int value) {
+  public DataResult getEventsFromNow(String index, String timeunit, int value, @Nullable String rp) {
     InfluxDB influxDB = getInfluxDBClient();
+
+    if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
     Query query = new Query("SELECT * FROM "
-            + index
-            + " WHERE time > now() -"
-            + value
-            + timeunit
-            + " ORDER BY time",
+            + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+            + " WHERE time > now() -" + value + timeunit + " ORDER BY time",
             BackendConfig.INSTANCE.getInfluxDatabaseName());
     QueryResult result = influxDB.query(query);
 
@@ -192,32 +216,33 @@ public class DataLakeManagementV3 {
     return dataResult;
   }
 
-  public DataResult getEventsFromNowAutoAggregation(String index, String timeunit, int value)
+  public DataResult getEventsFromNowAutoAggregation(String index, String timeunit, int value, @Nullable String rp)
           throws ParseException {
     InfluxDB influxDB = getInfluxDBClient();
-    double numberOfRecords = getNumOfRecordsOfTableFromNow(index, influxDB, timeunit, value);
+    double numberOfRecords = getNumOfRecordsOfTableFromNow(index, influxDB, timeunit, value, rp);
     if (numberOfRecords == 0) {
       influxDB.close();
       return new DataResult();
     } else if (numberOfRecords <= NUM_OF_AUTO_AGGREGATION_VALUES) {
       influxDB.close();
-      return getEventsFromNow(index, timeunit, value);
+      return getEventsFromNow(index, timeunit, value, rp);
     } else {
-      int aggregationValue = getAggregationValue(index, influxDB);
+      int aggregationValue = getAggregationValue(index, influxDB, rp);
       influxDB.close();
-      return getEventsFromNow(index, timeunit, value, "ms", aggregationValue);
+      return getEventsFromNow(index, timeunit, value, "ms", aggregationValue, rp);
     }
   }
 
 
-  public PageResult getEvents(String index, int itemsPerPage, int page) {
+  public PageResult getEvents(String index, int itemsPerPage, int page, @Nullable String rp) {
     InfluxDB influxDB = getInfluxDBClient();
+
+    if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
     Query query = new Query("SELECT * FROM "
-            + index
-            + " ORDER BY time LIMIT "
-            + itemsPerPage
-            + " OFFSET "
-            + page * itemsPerPage,
+            + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+            + " ORDER BY time LIMIT " + itemsPerPage + " OFFSET " + page * itemsPerPage,
             BackendConfig.INSTANCE.getInfluxDatabaseName());
     QueryResult result = influxDB.query(query);
 
@@ -301,9 +326,7 @@ public class DataLakeManagementV3 {
   public boolean deleteRetentionPolicy(String rp) {
     InfluxDB influxDB = getInfluxDBClient();
     Query query = new Query("DROP RETENTION POLICY "
-            + rp
-            + " ON "
-            + BackendConfig.INSTANCE.getInfluxDatabaseName(),
+            + rp + " ON " + BackendConfig.INSTANCE.getInfluxDatabaseName(),
             BackendConfig.INSTANCE.getInfluxDatabaseName());
 
     QueryResult influx_result = influxDB.query(query);
@@ -315,12 +338,23 @@ public class DataLakeManagementV3 {
     return true;
   }
 
-  public double getNumOfRecordsOfTable(String index) {
+  public double getNumOfRecordsOfTable(String index, @Nullable String rp) {
     InfluxDB influxDB = getInfluxDBClient();
     double numOfRecords = 0;
 
-    QueryResult.Result result = influxDB.query(new Query("SELECT count(*) FROM " + index,
-            BackendConfig.INSTANCE.getInfluxDatabaseName())).getResults().get(0);
+    if (rp == null) {
+      rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+    }
+
+    Query query = new Query("SELECT count(*) FROM "
+                      + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index,
+                      BackendConfig.INSTANCE.getInfluxDatabaseName());
+
+    QueryResult.Result result = influxDB.query(query).getResults().get(0);
+
     if (result.getSeries() == null) {
       return numOfRecords;
     }
@@ -389,22 +423,22 @@ public class DataLakeManagementV3 {
     // TODO: Raise exception if both not successful
   }
 
-  public PageResult getEvents(String index, int itemsPerPage) throws IOException {
+  public PageResult getEvents(String index, int itemsPerPage, @Nullable String rp) throws IOException {
     int page = getMaxPage(index, itemsPerPage);
 
     if (page > 0) {
       page = page -1;
     }
 
-    return getEvents(index, itemsPerPage, page);
+    return getEvents(index, itemsPerPage, page, rp);
   }
 
-  public StreamingOutput getAllEvents(String index, String outputFormat) {
-    return getAllEvents(index, outputFormat, null, null);
+  public StreamingOutput getAllEvents(String index, String outputFormat, @Nullable String rp) {
+    return getAllEvents(index, outputFormat, null, null, rp);
   }
 
   public StreamingOutput getAllEvents(String index, String outputFormat, @Nullable Long startDate,
-                                      @Nullable Long endDate) {
+                                      @Nullable Long endDate, @Nullable String rp) {
     return new StreamingOutput() {
       @Override
       public void write(OutputStream outputStream) throws IOException, WebApplicationException {
@@ -421,7 +455,7 @@ public class DataLakeManagementV3 {
 
           outputStream.write(toBytes("["));
           do {
-            Query query = getRawDataQueryWithPage(i, itemsPerRequest, index, startDate, endDate);
+            Query query = getRawDataQueryWithPage(i, itemsPerRequest, index, startDate, endDate, rp);
             QueryResult result = influxDB.query(query, TimeUnit.MILLISECONDS);
             dataResult = new DataResult();
             if ((result.getResults().get(0).getSeries() != null)) {
@@ -466,7 +500,7 @@ public class DataLakeManagementV3 {
           boolean isFirstDataObject = true;
 
           do {
-            Query query = getRawDataQueryWithPage(i, itemsPerRequest, index, startDate, endDate);
+            Query query = getRawDataQueryWithPage(i, itemsPerRequest, index, startDate, endDate, rp);
             QueryResult result = influxDB.query(query, TimeUnit.MILLISECONDS);
             dataResult = new DataResult();
             if ((result.getResults().get(0).getSeries() != null)) {
@@ -539,27 +573,23 @@ public class DataLakeManagementV3 {
   }
 
   private Query getRawDataQueryWithPage(int page, int itemsPerRequest, String index,
-                                        @Nullable Long startDate, @Nullable Long endDate) {
+                                        @Nullable Long startDate, @Nullable Long endDate, @Nullable String rp) {
     Query query;
+
+    if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
     if (startDate != null && endDate != null) {
       query = new Query("SELECT * FROM "
-              + index
-              + " WHERE time > "
-              + startDate * 1000000
-              + " AND time < "
-              + endDate * 1000000
-              + " ORDER BY time LIMIT "
-              + itemsPerRequest
-              + " OFFSET "
-              + page * itemsPerRequest,
+              + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+              + " WHERE time > " + startDate * 1000000 + " AND time < " + endDate * 1000000
+              + " ORDER BY time LIMIT " + itemsPerRequest
+              + " OFFSET " + page * itemsPerRequest,
               BackendConfig.INSTANCE.getInfluxDatabaseName());
     } else {
       query = new Query("SELECT * FROM "
-              + index
-              + " ORDER BY time LIMIT "
-              + itemsPerRequest
-              + " OFFSET "
-              + page * itemsPerRequest,
+              + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+              + " ORDER BY time LIMIT " + itemsPerRequest
+              + " OFFSET " + page * itemsPerRequest,
               BackendConfig.INSTANCE.getInfluxDatabaseName());
     }
     return query;
@@ -616,22 +646,30 @@ public class DataLakeManagementV3 {
 
   }
 
-  private int getAggregationValue(String index, InfluxDB influxDB) throws ParseException {
-    long timerange = getDateFromNewestRecordReOfTable(index, influxDB) - getDateFromOldestRecordReOfTable(index, influxDB);
+  private int getAggregationValue(String index, InfluxDB influxDB, @Nullable String rp) throws ParseException {
+    long timerange = getDateFromNewestRecordReOfTable(index, influxDB, rp) - getDateFromOldestRecordReOfTable(index, influxDB, rp);
     double v = timerange / NUM_OF_AUTO_AGGREGATION_VALUES;
     return Double.valueOf(v).intValue();
   }
 
-  private long getDateFromNewestRecordReOfTable(String index, InfluxDB influxDB) throws ParseException {
-    Query query = new Query("SELECT * FROM " + index + " ORDER BY desc LIMIT 1 ",
+  private long getDateFromNewestRecordReOfTable(String index, InfluxDB influxDB, @Nullable String rp) throws ParseException {
+
+    if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+    Query query = new Query("SELECT * FROM "
+            + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+            + " ORDER BY desc LIMIT 1 ",
             BackendConfig.INSTANCE.getInfluxDatabaseName());
 
    return getDateFromRecordOfTable(query, influxDB);
 
   }
 
-  private long getDateFromOldestRecordReOfTable(String index, InfluxDB influxDB) throws ParseException {
-    Query query = new Query("SELECT * FROM " + index + " ORDER BY asc LIMIT 1 ",
+  private long getDateFromOldestRecordReOfTable(String index, InfluxDB influxDB, @Nullable String rp) throws ParseException {
+    if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+    Query query = new Query("SELECT * FROM "
+            + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index
+            + " ORDER BY asc LIMIT 1 ",
             BackendConfig.INSTANCE.getInfluxDatabaseName());
 
     return getDateFromRecordOfTable(query, influxDB);
@@ -646,12 +684,16 @@ public class DataLakeManagementV3 {
     return date.getTime();
   }
 
-  private double getNumOfRecordsOfTable(String index, InfluxDB influxDB, long startDate, long endDate) {
+  private double getNumOfRecordsOfTable(String index, InfluxDB influxDB, long startDate, long endDate, @Nullable String rp) {
     double numOfRecords = 0;
 
-    QueryResult.Result result = influxDB.query(new Query("SELECT count(*) FROM " + index +
+    if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+    QueryResult.Result result = influxDB.query(new Query("SELECT count(*) FROM "
+            + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index +
             " WHERE time > " + startDate * 1000000 + " AND time < " + endDate * 1000000,
             BackendConfig.INSTANCE.getInfluxDatabaseName())).getResults().get(0);
+
     if (result.getSeries() == null) {
       return numOfRecords;
     }
@@ -665,9 +707,13 @@ public class DataLakeManagementV3 {
     return numOfRecords;
   }
 
-  private double getNumOfRecordsOfTableFromNow(String index, InfluxDB influxDB, String timeunit, int value) {
+  private double getNumOfRecordsOfTableFromNow(String index, InfluxDB influxDB, String timeunit, int value, @Nullable String rp) {
     double numOfRecords = 0;
-    QueryResult.Result result = influxDB.query(new Query("SELECT count(*) FROM " + index +
+
+    if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+    QueryResult.Result result = influxDB.query(new Query("SELECT count(*) FROM "
+            + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index +
             " WHERE time > now() -" + value + timeunit,
             BackendConfig.INSTANCE.getInfluxDatabaseName())).getResults().get(0);
     if (result.getSeries() == null) {
@@ -749,9 +795,9 @@ public class DataLakeManagementV3 {
     return route;
   }
 
-  public void updateLabels(String index, String labelColumn, long startdate, long enddate, String label) {
-    DataResult queryResult = getEvents(index, startdate, enddate);
-    Map<String, String> headerWithTypes = getHeadersWithTypes(index);
+  public void updateLabels(String index, String labelColumn, long startdate, long enddate, String label, @Nullable String rp) {
+    DataResult queryResult = getEvents(index, startdate, enddate, rp);
+    Map<String, String> headerWithTypes = getHeadersWithTypes(index, rp);
     List<String> headers = queryResult.getHeaders();
 
     InfluxDB influxDB = getInfluxDBClient();
@@ -779,9 +825,13 @@ public class DataLakeManagementV3 {
     influxDB.close();
   }
 
-  private Map<String, String> getHeadersWithTypes(String index) {
+  private Map<String, String> getHeadersWithTypes(String index, @Nullable String rp) {
       InfluxDB influxDB = getInfluxDBClient();
-      Query query = new Query("SHOW FIELD KEYS FROM " + index,
+
+      if (rp == null) rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
+
+      Query query = new Query("SHOW FIELD KEYS FROM "
+              + BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index,
               BackendConfig.INSTANCE.getInfluxDatabaseName());
       QueryResult result = influxDB.query(query);
       influxDB.close();
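
A note on the hunks above: the prefix BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index is now rebuilt in every query method, each time behind the same null check. A possible follow-up, not part of this commit, would be to centralize both steps in one private helper; a sketch, assuming the class keeps its current imports:

    // Hypothetical helper, not in this commit: resolves the retention policy
    // fallback once and returns the fully qualified measurement name.
    private String fullyQualifiedName(String index, @Nullable String rp) {
        if (rp == null) {
            rp = BackendConfig.INSTANCE.getDefaultRetentionPolicyName();
        }
        return BackendConfig.INSTANCE.getInfluxDatabaseName() + "." + rp + "." + index;
    }

    // Each query method could then shrink to, e.g.:
    // Query query = new Query("SELECT * FROM " + fullyQualifiedName(index, rp)
    //         + " ORDER BY time", BackendConfig.INSTANCE.getInfluxDatabaseName());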
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
index d7157d7..9734cf5 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
@@ -55,13 +55,14 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
 
     PageResult result;
     String page = info.getQueryParameters().getFirst("page");
+    String rp = info.getQueryParameters().getFirst("retentionPolicy");
 
     try {
 
       if (page != null) {
-        result = this.dataLakeManagement.getEvents(index, itemsPerPage, Integer.parseInt(page));
+        result = this.dataLakeManagement.getEvents(index, itemsPerPage, Integer.parseInt(page), rp);
       } else {
-        result = this.dataLakeManagement.getEvents(index, itemsPerPage);
+        result = this.dataLakeManagement.getEvents(index, itemsPerPage, rp);
       }
       return Response.ok(result).build();
     } catch (IOException e) {
@@ -86,8 +87,11 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
   @Produces(MediaType.APPLICATION_OCTET_STREAM)
   @Path("/data/{index}")
   public Response getAllData(@PathParam("index") String index,
-                             @QueryParam("format") String format) {
-    StreamingOutput streamingOutput = dataLakeManagement.getAllEvents(index, format);
+                             @QueryParam("format") String format,
+                             @Context UriInfo info) {
+
+    String rp = info.getQueryParameters().getFirst("retentionPolicy");
+    StreamingOutput streamingOutput = dataLakeManagement.getAllEvents(index, format, rp);
 
     return Response.ok(streamingOutput, MediaType.APPLICATION_OCTET_STREAM).
             header("Content-Disposition", "attachment; filename=\"datalake." + format + "\"")
@@ -98,8 +102,11 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
   @Produces(MediaType.APPLICATION_OCTET_STREAM)
   @Path("/data/{index}/download")
   public Response downloadData(@PathParam("index") String index,
-                               @QueryParam("format") String format) {
-    StreamingOutput streamingOutput = dataLakeManagement.getAllEvents(index, format);
+                               @QueryParam("format") String format,
+                               @Context UriInfo info) {
+
+    String rp = info.getQueryParameters().getFirst("retentionPolicy");
+    StreamingOutput streamingOutput = dataLakeManagement.getAllEvents(index, format, rp);
 
     return Response.ok(streamingOutput, MediaType.APPLICATION_OCTET_STREAM).
             header("Content-Disposition", "attachment; filename=\"datalake." + format + "\"")
@@ -108,10 +115,8 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
 
   @GET
   @Path("/data/{index}/delete")
-  // @Produces(MediaType.APPLICATION_JSON)
   public void deleteMeasurement(@PathParam("index") String index) {
     dataLakeManagement.deleteMeasurement(index);
-    // return ok("John");
   }
 
   @GET
@@ -125,14 +130,15 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
 
     String aggregationUnit = info.getQueryParameters().getFirst("aggregationUnit");
     String aggregationValue = info.getQueryParameters().getFirst("aggregationValue");
+    String rp = info.getQueryParameters().getFirst("retentionPolicy");
 
     DataResult result;
     try {
       if (aggregationUnit != null && aggregationValue != null) {
         result = dataLakeManagement.getEventsFromNow(index, unit, value, aggregationUnit,
-                Integer.parseInt(aggregationValue));
+                Integer.parseInt(aggregationValue), rp);
       } else {
-        result = dataLakeManagement.getEventsFromNowAutoAggregation(index, unit, value);
+        result = dataLakeManagement.getEventsFromNowAutoAggregation(index, unit, value, rp);
       }
       return Response.ok(result).build();
     } catch (IllegalArgumentException e) {
@@ -153,16 +159,17 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
 
     String aggregationUnit = info.getQueryParameters().getFirst("aggregationUnit");
     String aggregationValue = info.getQueryParameters().getFirst("aggregationValue");
+    String rp = info.getQueryParameters().getFirst("retentionPolicy");
 
     DataResult result;
 
     try {
       if (aggregationUnit != null && aggregationValue != null) {
           result = dataLakeManagement.getEvents(index, startdate, enddate, aggregationUnit,
-                  Integer.parseInt(aggregationValue));
+                  Integer.parseInt(aggregationValue), rp);
 
       } else {
-          result = dataLakeManagement.getEventsAutoAggregation(index, startdate, enddate);
+          result = dataLakeManagement.getEventsAutoAggregation(index, startdate, enddate, rp);
       }
       return Response.ok(result).build();
     } catch (IllegalArgumentException | ParseException e) {
@@ -181,14 +188,15 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
 
     String aggregationUnit = info.getQueryParameters().getFirst("aggregationUnit");
     String aggregationValue = info.getQueryParameters().getFirst("aggregationValue");
+    String rp = info.getQueryParameters().getFirst("retentionPolicy");
 
     GroupedDataResult result;
     try {
       if (aggregationUnit != null && aggregationValue != null) {
           result = dataLakeManagement.getEvents(index, startdate, enddate, aggregationUnit,
-                  Integer.parseInt(aggregationValue), groupingTag);
+                  Integer.parseInt(aggregationValue), groupingTag, rp);
       } else {
-          result = dataLakeManagement.getEventsAutoAggregation(index, startdate, enddate, groupingTag);
+          result = dataLakeManagement.getEventsAutoAggregation(index, startdate, enddate, groupingTag, rp);
       }
       return Response.ok(result).build();
     } catch (IllegalArgumentException | ParseException e) {
@@ -210,8 +218,11 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
   @Produces(MediaType.APPLICATION_OCTET_STREAM)
   @Path("/data/{index}/{startdate}/{enddate}/download")
   public Response downloadData(@PathParam("index") String index, @QueryParam("format") String format,
-                               @PathParam("startdate") long start, @PathParam("enddate") long end) {
-    StreamingOutput streamingOutput = dataLakeManagement.getAllEvents(index, format, start, end);
+                               @PathParam("startdate") long start, @PathParam("enddate") long end,
+                               @Context UriInfo info) {
+
+    String rp = info.getQueryParameters().getFirst("retentionPolicy");
+    StreamingOutput streamingOutput = dataLakeManagement.getAllEvents(index, format, start, end, rp);
 
     return Response.ok(streamingOutput, MediaType.APPLICATION_OCTET_STREAM).
             header("Content-Disposition", "attachment; filename=\"datalake." + format + "\"")
@@ -221,8 +232,10 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
   @GET
   @Produces(MediaType.TEXT_PLAIN)
   @Path("/data/{index}/count")
-  public Response getNumOfRecordsOfTable(@PathParam("index") String index) {
-      Double numOfRecords = dataLakeManagement.getNumOfRecordsOfTable(index);
+  public Response getNumOfRecordsOfTable(@PathParam("index") String index,
+                                         @Context UriInfo info) {
+      String rp = info.getQueryParameters().getFirst("retentionPolicy");
+      Double numOfRecords = dataLakeManagement.getNumOfRecordsOfTable(index, rp);
       return Response.ok(numOfRecords, MediaType.TEXT_PLAIN).build();
   }
 
@@ -256,7 +269,8 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
                             @PathParam("column") String column) {
 
       String label = info.getQueryParameters().getFirst("label");
-      this.dataLakeManagement.updateLabels(index, column, startdate, enddate, label);
+      String rp = info.getQueryParameters().getFirst("retentionPolicy");
+      this.dataLakeManagement.updateLabels(index, column, startdate, enddate, label, rp);
 
       return Response.ok("Successfully updated database.", MediaType.TEXT_PLAIN).build();
   }
@@ -281,7 +295,6 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
   @Path("/policy/{name}/delete")
   @Produces(MediaType.TEXT_PLAIN)
   public Response deleteRetentionPolicy(@PathParam("name") String policyName) {
-    dataLakeManagement.deleteRetentionPolicy(policyName);
     if (dataLakeManagement.deleteRetentionPolicy(policyName)) {
       return Response.ok("Successfully deleted the retention policy.", MediaType.TEXT_PLAIN).build();
     } else {
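
On the consumer side, every endpoint touched in DataLakeResourceV3 reads the policy from an optional retentionPolicy query parameter and falls back to the configured default when it is absent. A hedged client sketch using java.net.http; the host, port, base path, measurement, and policy name are all assumptions, only the parameter name comes from this commit:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class DataLakeClientSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical URL; only the "retentionPolicy" parameter is defined by this commit.
            URI uri = URI.create("http://localhost:8030/streampipes-backend/api/v3/datalake"
                    + "/data/temperature?retentionPolicy=one_week");
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(HttpRequest.newBuilder(uri).GET().build(),
                          HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode());
        }
    }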