You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2024/04/25 07:49:17 UTC

(streampipes) branch dev updated: refactor: move influx specific implementations to `DataExplorerInfluxQueryExecutor` (#2781)

This is an automated email from the ASF dual-hosted git repository.

bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new f6dfc8735b refactor: move influx specific implementations to `DataExplorerInfluxQueryExecutor` (#2781)
f6dfc8735b is described below

commit f6dfc8735bc615497056217a903567e297864147
Author: Tim <50...@users.noreply.github.com>
AuthorDate: Thu Apr 25 09:49:12 2024 +0200

    refactor: move influx specific implementations to `DataExplorerInfluxQueryExecutor` (#2781)
    
    * refactor: move influx specific implementations to `DataExplorerInfluxQueryExecutor`
    
    * style: fix indentation
    
    * fix: improve result check
---
 .../dataexplorer/DataExplorerQueryManagement.java  | 59 ++++------------------
 .../influx/DataExplorerInfluxQueryExecutor.java    | 48 +++++++++++++++++-
 2 files changed, 56 insertions(+), 51 deletions(-)

diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java
index 07c5058b6c..426262f1cb 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java
@@ -18,33 +18,22 @@
 
 package org.apache.streampipes.dataexplorer;
 
-import org.apache.streampipes.commons.environment.Environment;
-import org.apache.streampipes.commons.environment.Environments;
 import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
 import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
-import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
 import org.apache.streampipes.dataexplorer.influx.DataExplorerInfluxQueryExecutor;
 import org.apache.streampipes.dataexplorer.param.DeleteQueryParams;
 import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParamConverter;
 import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
-import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
 import org.apache.streampipes.dataexplorer.query.QueryResultProvider;
 import org.apache.streampipes.dataexplorer.query.StreamedQueryResultProvider;
 import org.apache.streampipes.dataexplorer.query.writer.OutputFormat;
 import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.datalake.SpQueryResult;
 
-import org.influxdb.InfluxDB;
-import org.influxdb.dto.Query;
-import org.influxdb.dto.QueryResult;
-
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class DataExplorerQueryManagement implements IDataExplorerQueryManagement {
 
@@ -72,10 +61,11 @@ public class DataExplorerQueryManagement implements IDataExplorerQueryManagement
   @Override
   public boolean deleteAllData() {
     List<DataLakeMeasure> allMeasurements = getAllMeasurements();
+    var queryExecutor = new DataExplorerInfluxQueryExecutor();
 
     for (DataLakeMeasure measure : allMeasurements) {
-      QueryResult queryResult = new DeleteDataQuery(measure).executeQuery();
-      if (queryResult.hasError() || queryResult.getResults().get(0).getError() != null) {
+      boolean success = queryExecutor.deleteData(measure);
+      if (!success) {
         return false;
       }
     }
@@ -85,14 +75,13 @@ public class DataExplorerQueryManagement implements IDataExplorerQueryManagement
   @Override
   public boolean deleteData(String measurementID) {
     List<DataLakeMeasure> allMeasurements = getAllMeasurements();
-    for (DataLakeMeasure measure : allMeasurements) {
-      if (measure.getMeasureName().equals(measurementID)) {
-        QueryResult queryResult = new DeleteDataQuery(new DataLakeMeasure(measurementID, null)).executeQuery();
 
-        return !queryResult.hasError();
-      }
-    }
-    return false;
+    var measureToDeleteOpt = allMeasurements.stream()
+                                            .filter(measure -> measure.getMeasureName().equals(measurementID))
+                                            .findFirst();
+
+    return measureToDeleteOpt.filter(measure -> new DataExplorerInfluxQueryExecutor().deleteData(measure))
+                             .isPresent();
   }
 
   @Override
@@ -105,38 +94,10 @@ public class DataExplorerQueryManagement implements IDataExplorerQueryManagement
   @Override
   public Map<String, Object> getTagValues(String measurementId,
                                           String fields) {
-    InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient();
-    String databaseName = getEnvironment().getTsStorageBucket().getValueOrDefault();
-    Map<String, Object> tags = new HashMap<>();
-    if (fields != null && !(fields.isEmpty())) {
-      List<String> fieldList = Arrays.asList(fields.split(","));
-      fieldList.forEach(f -> {
-        String q =
-            "SHOW TAG VALUES ON \"" + databaseName + "\" FROM \"" + measurementId
-                + "\" WITH KEY = \"" + f + "\"";
-        Query query = new Query(q);
-        QueryResult queryResult = influxDB.query(query);
-        queryResult.getResults().forEach(res -> {
-          res.getSeries().forEach(series -> {
-            if (!series.getValues().isEmpty()) {
-              String field = series.getValues().get(0).get(0).toString();
-              List<String> values =
-                  series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
-              tags.put(field, values);
-            }
-          });
-        });
-      });
-    }
-
-    return tags;
+    return new DataExplorerInfluxQueryExecutor().getTagValues(measurementId, fields);
   }
 
   private List<DataLakeMeasure> getAllMeasurements() {
     return this.dataExplorerSchemaManagement.getAllMeasurements();
   }
-
-  private Environment getEnvironment() {
-    return Environments.getEnvironment();
-  }
 }
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
index 0a1061ac0d..822d5c9f23 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
@@ -18,12 +18,13 @@
 
 package org.apache.streampipes.dataexplorer.influx;
 
-import org.apache.streampipes.commons.environment.Environments;
 import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
 import org.apache.streampipes.dataexplorer.param.DeleteQueryParams;
 import org.apache.streampipes.dataexplorer.param.SelectQueryParams;
 import org.apache.streampipes.dataexplorer.query.DataExplorerQueryExecutor;
+import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
 import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
 import org.apache.streampipes.model.datalake.DataSeries;
 import org.apache.streampipes.model.datalake.SpQueryResult;
 
@@ -32,9 +33,15 @@ import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static org.apache.streampipes.commons.environment.Environments.getEnvironment;
 
 public class DataExplorerInfluxQueryExecutor extends DataExplorerQueryExecutor<Query, QueryResult> {
 
@@ -142,6 +149,43 @@ public class DataExplorerInfluxQueryExecutor extends DataExplorerQueryExecutor<Q
   }
 
   private String getDatabaseName() {
-    return Environments.getEnvironment().getTsStorageBucket().getValueOrDefault();
+    return getEnvironment().getTsStorageBucket().getValueOrDefault();
+  }
+
+  public Map<String, Object> getTagValues(String measurementId, String fields) {
+    try (final InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient()) {
+      Map<String, Object> tags = new HashMap<>();
+      if (fields != null && !(fields.isEmpty())) {
+        List<String> fieldList = Arrays.asList(fields.split(","));
+        fieldList.forEach(f -> {
+          String q =
+              "SHOW TAG VALUES ON \"" + getDatabaseName() + "\" FROM \"" + measurementId
+              + "\" WITH KEY = \"" + f + "\"";
+          Query query = new Query(q);
+          QueryResult queryResult = influxDB.query(query);
+          queryResult.getResults().forEach(res -> {
+            res.getSeries().forEach(series -> {
+              if (!series.getValues().isEmpty()) {
+                String field = series.getValues().get(0).get(0).toString();
+                List<String> values =
+                    series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
+                tags.put(field, values);
+              }
+            });
+          });
+        });
+      }
+
+      return tags;
+    }
+  }
+
+  public boolean deleteData(DataLakeMeasure measure) {
+    QueryResult queryResult = new DeleteDataQuery(measure).executeQuery();
+
+    return !queryResult.hasError() && (queryResult.getResults() == null || queryResult.getResults()
+                                                                                      .get(0)
+                                                                                      .getError() == null);
+
   }
 }