You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2024/04/25 07:49:17 UTC
(streampipes) branch dev updated: refactor: move influx specific implementations to `DataExplorerInfluxQueryExecutor` (#2781)
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new f6dfc8735b refactor: move influx specific implementations to `DataExplorerInfluxQueryExecutor` (#2781)
f6dfc8735b is described below
commit f6dfc8735bc615497056217a903567e297864147
Author: Tim <50...@users.noreply.github.com>
AuthorDate: Thu Apr 25 09:49:12 2024 +0200
refactor: move influx specific implementations to `DataExplorerInfluxQueryExecutor` (#2781)
* refactor: move influx specific implementations to `DataExplorerInfluxQueryExecutor`
* style: fix indentation
* fix: improve result check
---
.../dataexplorer/DataExplorerQueryManagement.java | 59 ++++------------------
.../influx/DataExplorerInfluxQueryExecutor.java | 48 +++++++++++++++++-
2 files changed, 56 insertions(+), 51 deletions(-)
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java
index 07c5058b6c..426262f1cb 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java
@@ -18,33 +18,22 @@
package org.apache.streampipes.dataexplorer;
-import org.apache.streampipes.commons.environment.Environment;
-import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
-import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
import org.apache.streampipes.dataexplorer.influx.DataExplorerInfluxQueryExecutor;
import org.apache.streampipes.dataexplorer.param.DeleteQueryParams;
import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParamConverter;
import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
-import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
import org.apache.streampipes.dataexplorer.query.QueryResultProvider;
import org.apache.streampipes.dataexplorer.query.StreamedQueryResultProvider;
import org.apache.streampipes.dataexplorer.query.writer.OutputFormat;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.datalake.SpQueryResult;
-import org.influxdb.InfluxDB;
-import org.influxdb.dto.Query;
-import org.influxdb.dto.QueryResult;
-
import java.io.IOException;
import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
public class DataExplorerQueryManagement implements IDataExplorerQueryManagement {
@@ -72,10 +61,11 @@ public class DataExplorerQueryManagement implements IDataExplorerQueryManagement
@Override
public boolean deleteAllData() {
List<DataLakeMeasure> allMeasurements = getAllMeasurements();
+ var queryExecutor = new DataExplorerInfluxQueryExecutor();
for (DataLakeMeasure measure : allMeasurements) {
- QueryResult queryResult = new DeleteDataQuery(measure).executeQuery();
- if (queryResult.hasError() || queryResult.getResults().get(0).getError() != null) {
+ boolean success = queryExecutor.deleteData(measure);
+ if (!success) {
return false;
}
}
@@ -85,14 +75,13 @@ public class DataExplorerQueryManagement implements IDataExplorerQueryManagement
@Override
public boolean deleteData(String measurementID) {
List<DataLakeMeasure> allMeasurements = getAllMeasurements();
- for (DataLakeMeasure measure : allMeasurements) {
- if (measure.getMeasureName().equals(measurementID)) {
- QueryResult queryResult = new DeleteDataQuery(new DataLakeMeasure(measurementID, null)).executeQuery();
- return !queryResult.hasError();
- }
- }
- return false;
+ var measureToDeleteOpt = allMeasurements.stream()
+ .filter(measure -> measure.getMeasureName().equals(measurementID))
+ .findFirst();
+
+ return measureToDeleteOpt.filter(measure -> new DataExplorerInfluxQueryExecutor().deleteData(measure))
+ .isPresent();
}
@Override
@@ -105,38 +94,10 @@ public class DataExplorerQueryManagement implements IDataExplorerQueryManagement
@Override
public Map<String, Object> getTagValues(String measurementId,
String fields) {
- InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient();
- String databaseName = getEnvironment().getTsStorageBucket().getValueOrDefault();
- Map<String, Object> tags = new HashMap<>();
- if (fields != null && !(fields.isEmpty())) {
- List<String> fieldList = Arrays.asList(fields.split(","));
- fieldList.forEach(f -> {
- String q =
- "SHOW TAG VALUES ON \"" + databaseName + "\" FROM \"" + measurementId
- + "\" WITH KEY = \"" + f + "\"";
- Query query = new Query(q);
- QueryResult queryResult = influxDB.query(query);
- queryResult.getResults().forEach(res -> {
- res.getSeries().forEach(series -> {
- if (!series.getValues().isEmpty()) {
- String field = series.getValues().get(0).get(0).toString();
- List<String> values =
- series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
- tags.put(field, values);
- }
- });
- });
- });
- }
-
- return tags;
+ return new DataExplorerInfluxQueryExecutor().getTagValues(measurementId, fields);
}
private List<DataLakeMeasure> getAllMeasurements() {
return this.dataExplorerSchemaManagement.getAllMeasurements();
}
-
- private Environment getEnvironment() {
- return Environments.getEnvironment();
- }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
index 0a1061ac0d..822d5c9f23 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
@@ -18,12 +18,13 @@
package org.apache.streampipes.dataexplorer.influx;
-import org.apache.streampipes.commons.environment.Environments;
import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
import org.apache.streampipes.dataexplorer.param.DeleteQueryParams;
import org.apache.streampipes.dataexplorer.param.SelectQueryParams;
import org.apache.streampipes.dataexplorer.query.DataExplorerQueryExecutor;
+import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.datalake.DataSeries;
import org.apache.streampipes.model.datalake.SpQueryResult;
@@ -32,9 +33,15 @@ import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static org.apache.streampipes.commons.environment.Environments.getEnvironment;
public class DataExplorerInfluxQueryExecutor extends DataExplorerQueryExecutor<Query, QueryResult> {
@@ -142,6 +149,43 @@ public class DataExplorerInfluxQueryExecutor extends DataExplorerQueryExecutor<Q
}
private String getDatabaseName() {
- return Environments.getEnvironment().getTsStorageBucket().getValueOrDefault();
+ return getEnvironment().getTsStorageBucket().getValueOrDefault();
+ }
+
+ public Map<String, Object> getTagValues(String measurementId, String fields) {
+ try (final InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient()) {
+ Map<String, Object> tags = new HashMap<>();
+ if (fields != null && !(fields.isEmpty())) {
+ List<String> fieldList = Arrays.asList(fields.split(","));
+ fieldList.forEach(f -> {
+ String q =
+ "SHOW TAG VALUES ON \"" + getDatabaseName() + "\" FROM \"" + measurementId
+ + "\" WITH KEY = \"" + f + "\"";
+ Query query = new Query(q);
+ QueryResult queryResult = influxDB.query(query);
+ queryResult.getResults().forEach(res -> {
+ res.getSeries().forEach(series -> {
+ if (!series.getValues().isEmpty()) {
+ String field = series.getValues().get(0).get(0).toString();
+ List<String> values =
+ series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
+ tags.put(field, values);
+ }
+ });
+ });
+ });
+ }
+
+ return tags;
+ }
+ }
+
+ public boolean deleteData(DataLakeMeasure measure) {
+ QueryResult queryResult = new DeleteDataQuery(measure).executeQuery();
+
+ return !queryResult.hasError() && (queryResult.getResults() == null || queryResult.getResults()
+ .get(0)
+ .getError() == null);
+
}
}