You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by eb...@apache.org on 2020/05/13 10:37:27 UTC

[incubator-streampipes] 02/09: Add update method of labels in database to new endpoint

This is an automated email from the ASF dual-hosted git repository.

ebi pushed a commit to branch timeseries-labeling
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 00f4c86dedc8c10a783f1cbe58d780dfcec91f71
Author: Daniel Ebi <eb...@fzi.de>
AuthorDate: Wed May 13 09:36:44 2020 +0200

    Add update method of labels in database to new endpoint
---
 .../rest/impl/datalake/DataLakeManagementV3.java   | 45 +++++++++++++++++++++-
 .../rest/impl/datalake/DataLakeResourceV3.java     |  4 +-
 2 files changed, 47 insertions(+), 2 deletions(-)

diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
index edcc375..c6ef4ac 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeManagementV3.java
@@ -22,6 +22,7 @@ import com.google.gson.Gson;
 import okhttp3.OkHttpClient;
 import org.influxdb.InfluxDB;
 import org.influxdb.InfluxDBFactory;
+import org.influxdb.dto.Point;
 import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
 import org.apache.streampipes.config.backend.BackendConfig;
@@ -93,7 +94,6 @@ public class DataLakeManagementV3 {
 
     DataResult dataResult = convertResult(result);
     influxDB.close();
-
     return dataResult;
   }
 
@@ -521,4 +521,47 @@ public class DataLakeManagementV3 {
     }
   }
 
+  public void updateLabels(String index, long startdate, long enddate, String label) {
+    DataResult queryResult = getEvents(index, startdate, enddate);
+    Map<String, String> headerWithTypes = getHeadersWithTypes(index);
+    List<String> headers = queryResult.getHeaders();
+
+    InfluxDB influxDB = getInfluxDBClient();
+    influxDB.setDatabase(BackendConfig.INSTANCE.getInfluxDatabaseName());
+
+    for (List<Object> row : queryResult.getRows()) {
+      long timestampValue = Math.round((double) row.get(headers.indexOf("timestamp")));
+
+      Point.Builder p = Point.measurement(index).time(timestampValue, TimeUnit.MILLISECONDS);
+
+      for (int i = 1; i < row.size(); i++) {
+        String selected_header = headers.get(i);
+        if (!selected_header.equals("sp_internal_label")) {
+          if (headerWithTypes.get(selected_header).equals("integer")) {
+            p.addField(selected_header, Math.round((double) row.get(i)));
+          } else if (headerWithTypes.get(selected_header).equals("string")) {
+            p.addField(selected_header, row.get(i).toString());
+          }
+        } else {
+          p.addField(selected_header, label);
+        }
+      }
+      influxDB.write(p.build());
+    }
+    influxDB.close();
+  }
+
+  private Map<String, String> getHeadersWithTypes(String index) {
+      InfluxDB influxDB = getInfluxDBClient();
+      Query query = new Query("SHOW FIELD KEYS FROM " + index,
+              BackendConfig.INSTANCE.getInfluxDatabaseName());
+      QueryResult result = influxDB.query(query);
+      influxDB.close();
+
+      Map<String, String> headerTypes = new HashMap<String, String>();
+      for (List<Object> element : result.getResults().get(0).getSeries().get(0).getValues()) {
+        headerTypes.put(element.get(0).toString(), element.get(1).toString());
+      }
+      return headerTypes;
+    }
 }
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
index f845cbb..d4d61ac 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResourceV3.java
@@ -217,7 +217,9 @@ public class DataLakeResourceV3 extends AbstractRestInterface {
                               @PathParam("startdate") long startdate,
                               @PathParam("enddate") long enddate,
                               @PathParam("label") String label) {
-    
+
+    this.dataLakeManagement.updateLabels(index, startdate, enddate, label);
+
     return Response.ok("Successfully updated database.", MediaType.TEXT_PLAIN).build();
   }