You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by eb...@apache.org on 2021/06/11 18:20:15 UTC
[incubator-streampipes] 07/29: [STREAMPIPES-349] Add query defining
classes for changes at the retention policies
This is an automated email from the ASF dual-hosted git repository.
ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 7a4f1c376de352f1ab3d9eb546cf8e706f2a767e
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Wed May 5 10:24:49 2021 +0200
[STREAMPIPES-349] Add query defining classes for changes at the retention policies
---
.../param/RetentionPolicyQueryParams.java | 20 ++++++++
.../query/EditRetentionPolicyQuery.java | 56 ++++++++++++++++++++++
.../query/ShowRetentionPolicyQuery.java | 38 +++++++++++++++
3 files changed, 114 insertions(+)
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/RetentionPolicyQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/RetentionPolicyQueryParams.java
new file mode 100644
index 0000000..e224570
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/RetentionPolicyQueryParams.java
@@ -0,0 +1,20 @@
+package org.apache.streampipes.dataexplorer.param;
+
+public class RetentionPolicyQueryParams extends QueryParams {
+ private final String durationLiteral;
+
+ public static RetentionPolicyQueryParams from(String index, String durationLiteral) {
+ return new RetentionPolicyQueryParams(index, durationLiteral);
+ }
+
+ protected RetentionPolicyQueryParams(String index, String durationLiteral) {
+ super(index);
+ this.durationLiteral = durationLiteral;
+ }
+
+ public String getDurationLiteral() {
+ return durationLiteral;
+ }
+
+
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/EditRetentionPolicyQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/EditRetentionPolicyQuery.java
new file mode 100644
index 0000000..ddaed73
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/EditRetentionPolicyQuery.java
@@ -0,0 +1,56 @@
+package org.apache.streampipes.dataexplorer.query;
+
+import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
+import org.influxdb.dto.QueryResult;
+
+public class EditRetentionPolicyQuery extends ParameterizedDataExplorerQuery<RetentionPolicyQueryParams, String> {
+
+ private static final String CREATE_OPERATOR = "CREATE";
+ private static final String ALTER_OPERATOR = "ALTER";
+ private static final String DROP_OPERATOR = "DROP";
+ private static final String RESET_OPERATOR = "DEFAULT";
+
+ private String operationToPerform;
+
+ public EditRetentionPolicyQuery(RetentionPolicyQueryParams queryParams, String operation) {
+ super(queryParams);
+ this.operationToPerform = operation;
+ }
+
+
+ @Override
+ protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
+ if (this.operationToPerform.equals(CREATE_OPERATOR)) {
+ queryBuilder.add(createRetentionPolicyStatement(params.getIndex()));
+ } else if (this.operationToPerform.equals(ALTER_OPERATOR)) {
+ queryBuilder.add(alterRetentionPolicyStatement(params.getIndex()));
+ } else if (this.operationToPerform.equals(DROP_OPERATOR)) {
+ queryBuilder.add(dropRetentionPolicyStatement(params.getIndex()));
+ } else if (this.operationToPerform.equals(RESET_OPERATOR)) {
+ queryBuilder.add(resetRetentionPolicyStatement());
+ }
+
+ }
+
+ @Override
+ protected String postQuery(QueryResult result) throws RuntimeException {
+ return result.toString();
+ }
+
+ private String createRetentionPolicyStatement(String index) {
+ return "CREATE RETENTION POLICY " + index + " ON " + "sp DURATION " + params.getDurationLiteral() + " REPLICATION 1 DEFAULT";
+ }
+
+ private String alterRetentionPolicyStatement(String index) {
+ return "ALTER RETENTION POLICY " + index + " ON " + "sp DURATION " + params.getDurationLiteral() + " REPLICATION 1 DEFAULT";
+ }
+
+ private String dropRetentionPolicyStatement(String index) {
+ return "DROP RETENTION POLICY " + index + " ON " + "sp";
+ }
+
+ private String resetRetentionPolicyStatement() {
+ return "ALTER RETENTION POLICY " + "autogen" + " ON " + "sp DURATION " + "0s" + " REPLICATION 1 DEFAULT";
+ }
+
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/ShowRetentionPolicyQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/ShowRetentionPolicyQuery.java
new file mode 100644
index 0000000..5c4c3d8
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/ShowRetentionPolicyQuery.java
@@ -0,0 +1,38 @@
+package org.apache.streampipes.dataexplorer.query;
+
+import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
+import org.apache.streampipes.model.datalake.DataLakeRetentionPolicy;
+import org.influxdb.dto.QueryResult;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ShowRetentionPolicyQuery extends ParameterizedDataExplorerQuery<RetentionPolicyQueryParams, List<DataLakeRetentionPolicy>> {
+
+ public ShowRetentionPolicyQuery(RetentionPolicyQueryParams queryParams) {
+ super(queryParams);
+ }
+
+
+ @Override
+ protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
+ queryBuilder.add(showRetentionPolicyStatement());
+ }
+
+ @Override
+ protected List<DataLakeRetentionPolicy> postQuery(QueryResult result) throws RuntimeException {
+ List<DataLakeRetentionPolicy> policies = new ArrayList<>();
+ for (List<Object> a : result.getResults().get(0).getSeries().get(0).getValues()) {
+ boolean isDefault = false;
+ if (a.get(4).toString().equals("true")) {
+ isDefault = true;
+ }
+ policies.add(new DataLakeRetentionPolicy(a.get(0).toString(), a.get(1).toString(), isDefault));
+ }
+ return policies;
+ }
+
+ private String showRetentionPolicyStatement() {
+ return "SHOW RETENTION POLICIES ON " + "sp";
+ }
+}