You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by eb...@apache.org on 2021/06/11 18:43:41 UTC
[incubator-streampipes] 27/29: [STREAMPIPES-349] Add implementation
of endpoint for removing entire measurement series and related event
property
This is an automated email from the ASF dual-hosted git repository.
ebi pushed a commit to branch STREAMPIPES-349
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 307383deb17db3e6d4e90c440013a984aad52e43
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Fri Jun 11 20:13:54 2021 +0200
[STREAMPIPES-349] Add implementation of endpoint for removing entire measurement series and related event property
---
.../dataexplorer/DataLakeManagementV4.java | 39 ++++++++++++++++++++++
.../apache/streampipes/ps/DataLakeResourceV4.java | 15 +++++++--
2 files changed, 51 insertions(+), 3 deletions(-)
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index b2b7192..1eed837 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.dataexplorer;
import com.google.gson.Gson;
+import com.google.gson.JsonObject;
import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
import org.apache.streampipes.dataexplorer.query.EditRetentionPolicyQuery;
@@ -30,7 +31,9 @@ import org.apache.streampipes.model.datalake.DataLakeConfiguration;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.datalake.DataLakeRetentionPolicy;
import org.apache.streampipes.model.datalake.DataResult;
+import org.apache.streampipes.storage.couchdb.utils.Utils;
import org.influxdb.dto.QueryResult;
+import org.lightcouch.CouchDbClient;
import java.io.IOException;
import java.io.OutputStream;
@@ -178,6 +181,21 @@ public class DataLakeManagementV4 {
return true;
}
+ public boolean removeMeasurement(String measurementID) {
+ List<DataLakeMeasure> allMeasurements = getAllMeasurements();
+ for (DataLakeMeasure measure : allMeasurements) {
+ if (measure.getMeasureName().equals(measurementID)) {
+ QueryResult queryResult = new DeleteDataQuery(new DataLakeMeasure(measurementID, null)).executeQuery();
+
+ if (queryResult.hasError() || queryResult.getResults().get(0).getError() != null) {
+ return false;
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
public DataResult deleteData(String measurementID, Long startDate, Long endDate) {
Map<String, QueryParamsV4> queryParts = getDeleteQueryParams(measurementID, startDate, endDate);
return new DataExplorerQueryV4(queryParts).executeQuery();
@@ -226,6 +244,27 @@ public class DataLakeManagementV4 {
return new ShowRetentionPolicyQuery(RetentionPolicyQueryParams.from("", "0s")).executeQuery();
}
+ public boolean removeEventProperty(String measurementID) {
+ boolean isSuccess = false;
+ CouchDbClient couchDbClient = Utils.getCouchDbDataLakeClient();
+ List<JsonObject> docs = couchDbClient.view("_all_docs").includeDocs(true).query(JsonObject.class);
+
+ for (JsonObject document : docs) {
+ if (document.get("measureName").toString().replace("\"", "").equals(measurementID)) {
+ couchDbClient.remove(document.get("_id").toString().replace("\"", ""), document.get("_rev").toString().replace("\"", ""));
+ isSuccess = true;
+ break;
+ }
+ }
+
+ try {
+ couchDbClient.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return isSuccess;
+ }
+
private Map<String, QueryParamsV4> getQueryParams(String measurementID, Long startDate, Long endDate, Integer page, Integer limit, Integer offset, String groupBy, String order, String aggregationFunction, String timeInterval) {
Map<String, QueryParamsV4> queryParts = new HashMap<>();
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index 53b3414..8692af0 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -94,9 +94,18 @@ public class DataLakeResourceV4 extends AbstractRestResource {
public Response dropMeasurementSeries(@Parameter(in = ParameterIn.PATH, description = "username", required = true) @PathParam("username") String username
, @Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true) @PathParam("measurementID") String measurementID) {
- /**
- * TODO: implementation of method stump
- */
+ boolean isSuccessDataLake = this.dataLakeManagement.removeMeasurement(measurementID);
+
+ if (isSuccessDataLake) {
+ boolean isSuccessEventProperty = this.dataLakeManagement.removeEventProperty(measurementID);
+ if (isSuccessEventProperty) {
+ return ok();
+ } else {
+ return Response.status(Response.Status.NOT_FOUND).entity("Event property related to measurement series with given id not found.").build();
+ }
+ } else {
+ return Response.status(Response.Status.NOT_FOUND).entity("Measurement series with given id not found.").build();
+ }
}
@GET