You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by eb...@apache.org on 2020/05/13 10:37:27 UTC
[incubator-streampipes] 02/09: Add update method of labels in
database to new endpoint
This is an automated email from the ASF dual-hosted git repository.
ebi pushed a commit to branch timeseries-labeling
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 00f4c86dedc8c10a783f1cbe58d780dfcec91f71
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Wed May 13 09:36:44 2020 +0200
Add update method of labels in database to new endpoint
---
.../rest/impl/datalake/DataLakeManagementV3.java | 45 +++++++++++++++++++++-
.../rest/impl/datalake/DataLakeResourceV3.java | 4 +-
2 files changed, 47 insertions(+), 2 deletions(-)
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
index edcc375..c6ef4ac 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
@@ -22,6 +22,7 @@ import com.google.gson.Gson;
import okhttp3.OkHttpClient;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.apache.streampipes.config.backend.BackendConfig;
@@ -93,7 +94,6 @@ public class DataLakeManagementV3 {
DataResult dataResult = convertResult(result);
influxDB.close();
-
return dataResult;
}
@@ -521,4 +521,47 @@ public class DataLakeManagementV3 {
}
}
+ public void updateLabels(String index, long startdate, long enddate, String label) {
+ DataResult queryResult = getEvents(index, startdate, enddate);
+ Map<String, String> headerWithTypes = getHeadersWithTypes(index);
+ List<String> headers = queryResult.getHeaders();
+
+ InfluxDB influxDB = getInfluxDBClient();
+ influxDB.setDatabase(BackendConfig.INSTANCE.getInfluxDatabaseName());
+
+ for (List<Object> row : queryResult.getRows()) {
+ long timestampValue = Math.round((double) row.get(headers.indexOf("timestamp")));
+
+ Point.Builder p = Point.measurement(index).time(timestampValue, TimeUnit.MILLISECONDS);
+
+ for (int i = 1; i < row.size(); i++) {
+ String selected_header = headers.get(i);
+ if (!selected_header.equals("sp_internal_label")) {
+ if (headerWithTypes.get(selected_header).equals("integer")) {
+ p.addField(selected_header, Math.round((double) row.get(i)));
+ } else if (headerWithTypes.get(selected_header).equals("string")) {
+ p.addField(selected_header, row.get(i).toString());
+ }
+ } else {
+ p.addField(selected_header, label);
+ }
+ }
+ influxDB.write(p.build());
+ }
+ influxDB.close();
+ }
+
+ private Map<String, String> getHeadersWithTypes(String index) {
+ InfluxDB influxDB = getInfluxDBClient();
+ Query query = new Query("SHOW FIELD KEYS FROM " + index,
+ BackendConfig.INSTANCE.getInfluxDatabaseName());
+ QueryResult result = influxDB.query(query);
+ influxDB.close();
+
+ Map<String, String> headerTypes = new HashMap<String, String>();
+ for (List<Object> element : result.getResults().get(0).getSeries().get(0).getValues()) {
+ headerTypes.put(element.get(0).toString(), element.get(1).toString());
+ }
+ return headerTypes;
+ }
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
index f845cbb..d4d61ac 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
@@ -217,7 +217,9 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
@PathParam("startdate") long startdate,
@PathParam("enddate") long enddate,
@PathParam("label") String label) {
-
+
+ this.dataLakeManagement.updateLabels(index, startdate, enddate, label);
+
return Response.ok("Successfully updated database.", MediaType.TEXT_PLAIN).build();
}