You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by eb...@apache.org on 2021/06/11 18:43:41 UTC

[incubator-streampipes] 27/29: [STREAMPIPES-349] Add implementation of endpoint for removing entire measurement series and related event property

This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 307383deb17db3e6d4e90c440013a984aad52e43
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 20:13:54 2021 +0200

    [STREAMPIPES-349] Add implementation of endpoint for removing entire measurement series and related event property
---
 .../dataexplorer/DataLakeManagementV4.java         | 39 ++++++++++++++++++++++
 .../apache/streampipes/ps/DataLakeResourceV4.java  | 15 +++++++--
 2 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index b2b7192..1eed837 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -19,6 +19,7 @@
 package org.apache.streampipes.dataexplorer;
 
 import com.google.gson.Gson;
+import com.google.gson.JsonObject;
 import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
 import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
 import org.apache.streampipes.dataexplorer.query.EditRetentionPolicyQuery;
@@ -30,7 +31,9 @@ import org.apache.streampipes.model.datalake.DataLakeConfiguration;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.datalake.DataLakeRetentionPolicy;
 import org.apache.streampipes.model.datalake.DataResult;
+import org.apache.streampipes.storage.couchdb.utils.Utils;
 import org.influxdb.dto.QueryResult;
+import org.lightcouch.CouchDbClient;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -178,6 +181,21 @@ public class DataLakeManagementV4 {
         return true;
     }
 
+    public boolean removeMeasurement(String measurementID) {
+        List<DataLakeMeasure> allMeasurements = getAllMeasurements();
+        for (DataLakeMeasure measure : allMeasurements) {
+            if (measure.getMeasureName().equals(measurementID)) {
+                QueryResult queryResult = new DeleteDataQuery(new DataLakeMeasure(measurementID, null)).executeQuery();
+
+                if (queryResult.hasError() || queryResult.getResults().get(0).getError() != null) {
+                    return false;
+                }
+                return true;
+            }
+        }
+        return false;
+    }
+
     public DataResult deleteData(String measurementID, Long startDate, Long endDate) {
         Map<String, QueryParamsV4> queryParts = getDeleteQueryParams(measurementID, startDate, endDate);
         return new DataExplorerQueryV4(queryParts).executeQuery();
@@ -226,6 +244,27 @@ public class DataLakeManagementV4 {
         return new ShowRetentionPolicyQuery(RetentionPolicyQueryParams.from("", "0s")).executeQuery();
     }
 
+    public boolean removeEventProperty(String measurementID) {
+        boolean isSuccess = false;
+        CouchDbClient couchDbClient = Utils.getCouchDbDataLakeClient();
+        List<JsonObject> docs = couchDbClient.view("_all_docs").includeDocs(true).query(JsonObject.class);
+
+        for (JsonObject document : docs) {
+            if (document.get("measureName").toString().replace("\"", "").equals(measurementID)) {
+                couchDbClient.remove(document.get("_id").toString().replace("\"", ""), document.get("_rev").toString().replace("\"", ""));
+                isSuccess = true;
+                break;
+            }
+        }
+
+        try {
+            couchDbClient.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        return isSuccess;
+    }
+
     private Map<String, QueryParamsV4> getQueryParams(String measurementID, Long startDate, Long endDate, Integer page, Integer limit, Integer offset, String groupBy, String order, String aggregationFunction, String timeInterval) {
         Map<String, QueryParamsV4> queryParts = new HashMap<>();
 
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index 53b3414..8692af0 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -94,9 +94,18 @@ public class DataLakeResourceV4 extends AbstractRestResource {
     public Response dropMeasurementSeries(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username
             , @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathParam("measurementID") String measurementID) {
 
-        /**
-         * TODO: implementation of method stump
-         */
+        boolean isSuccessDataLake = this.dataLakeManagement.removeMeasurement(measurementID);
+
+        if (isSuccessDataLake) {
+            boolean isSuccessEventProperty = this.dataLakeManagement.removeEventProperty(measurementID);
+            if (isSuccessEventProperty) {
+                return ok();
+            } else {
+                return Response.status(Response.Status.NOT_FOUND).entity("Event property related to measurement series with given id not found.").build();
+            }
+        } else {
+            return Response.status(Response.Status.NOT_FOUND).entity("Measurement series with given id not found.").build();
+        }
     }
 
     @GET