You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/12/12 20:21:39 UTC
[streampipes] 02/03: add checkstyle to streampipes-data-explorer
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch add-checkstyle-configuration
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit f0b1baa4eec156cf5a33bb26ac18d4d8cef293c7
Author: bossenti <bo...@posteo.de>
AuthorDate: Mon Dec 12 21:17:14 2022 +0100
add checkstyle to streampipes-data-explorer
---
streampipes-data-explorer/pom.xml | 13 +-
.../dataexplorer/DataLakeManagementV4.java | 378 +++++++++++----------
.../dataexplorer/DataLakeNoUserManagementV3.java | 20 +-
.../dataexplorer/param/QueryParams.java | 8 +-
.../param/RetentionPolicyQueryParams.java | 22 +-
.../dataexplorer/query/DataExplorerQuery.java | 14 +-
.../query/DataExplorerQueryBuilder.java | 8 +-
.../dataexplorer/query/DeleteDataQuery.java | 1 +
.../query/EditRetentionPolicyQuery.java | 77 +++--
.../query/ParameterizedDataExplorerQuery.java | 6 +-
.../query/ShowRetentionPolicyQuery.java | 46 +--
.../dataexplorer/sdk/DataLakeQueryBuilder.java | 29 +-
.../dataexplorer/sdk/DataLakeQueryOrdering.java | 2 +-
.../dataexplorer/template/QueryTemplates.java | 6 +-
.../dataexplorer/utils/DataExplorerUtils.java | 15 +-
.../dataexplorer/v4/AutoAggregationHandler.java | 23 +-
.../dataexplorer/v4/ProvidedQueryParams.java | 4 +-
.../v4/SupportedDataLakeQueryParameters.java | 36 +-
.../v4/params/DeleteFromStatementParams.java | 12 +-
.../dataexplorer/v4/params/FillParams.java | 21 +-
.../v4/params/GroupingByTagsParams.java | 26 +-
.../v4/params/GroupingByTimeParams.java | 22 +-
.../v4/params/ItemLimitationParams.java | 22 +-
.../dataexplorer/v4/params/OffsetParams.java | 22 +-
.../v4/params/OrderingByTimeParams.java | 22 +-
.../dataexplorer/v4/params/QueryParamsV4.java | 14 +-
.../dataexplorer/v4/params/SelectColumn.java | 41 +--
.../v4/params/SelectFromStatementParams.java | 99 +++---
.../dataexplorer/v4/params/TimeBoundaryParams.java | 32 +-
.../dataexplorer/v4/params/WhereCondition.java | 8 +-
.../v4/params/WhereStatementParams.java | 52 +--
.../dataexplorer/v4/query/DataExplorerQueryV4.java | 287 +++++++++-------
.../dataexplorer/v4/query/QueryBuilder.java | 51 +--
.../dataexplorer/v4/query/QueryResultProvider.java | 4 +-
.../v4/query/StreamedQueryResultProvider.java | 3 +-
.../v4/query/elements/DeleteFromStatement.java | 14 +-
.../v4/query/elements/FillStatement.java | 14 +-
.../v4/query/elements/GroupingByTags.java | 30 +-
.../v4/query/elements/GroupingByTime.java | 14 +-
.../v4/query/elements/ItemLimitation.java | 14 +-
.../dataexplorer/v4/query/elements/Offset.java | 14 +-
.../v4/query/elements/OrderingByTime.java | 14 +-
.../v4/query/elements/QueryElement.java | 18 +-
.../v4/query/elements/SelectFromStatement.java | 2 +-
.../v4/query/elements/TimeBoundary.java | 24 +-
.../query/writer/ConfiguredJsonOutputWriter.java | 3 +-
.../v4/query/writer/ConfiguredOutputWriter.java | 4 +-
.../v4/query/writer/item/ItemGenerator.java | 2 +-
.../dataexplorer/v4/template/QueryTemplatesV4.java | 124 +++----
.../v4/utils/DataLakeManagementUtils.java | 60 +++-
.../dataexplorer/v4/utils/TimeParser.java | 4 +-
.../writer/TestConfiguredCsvOutputWriter.java | 3 +-
.../writer/TestConfiguredJsonOutputWriter.java | 5 +-
.../v4/query/writer/item/TestCsvItemWriter.java | 1 +
.../v4/query/writer/item/TestJsonItemWriter.java | 5 +-
.../apache/streampipes/ps/DataLakeResourceV4.java | 2 +-
56 files changed, 966 insertions(+), 851 deletions(-)
diff --git a/streampipes-data-explorer/pom.xml b/streampipes-data-explorer/pom.xml
index 084864ffa..f03ffaad9 100644
--- a/streampipes-data-explorer/pom.xml
+++ b/streampipes-data-explorer/pom.xml
@@ -17,7 +17,8 @@
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>streampipes-parent</artifactId>
<groupId>org.apache.streampipes</groupId>
@@ -59,6 +60,12 @@
<artifactId>commons-io</artifactId>
</dependency>
</dependencies>
-
-
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 11ebdefb3..6928f940e 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -18,7 +18,6 @@
package org.apache.streampipes.dataexplorer;
-import com.google.gson.JsonObject;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
@@ -28,9 +27,9 @@ import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
import org.apache.streampipes.dataexplorer.v4.query.DataExplorerQueryV4;
-import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
import org.apache.streampipes.dataexplorer.v4.query.QueryResultProvider;
import org.apache.streampipes.dataexplorer.v4.query.StreamedQueryResultProvider;
+import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
import org.apache.streampipes.model.datalake.DataLakeConfiguration;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
@@ -43,6 +42,8 @@ import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.storage.api.IDataLakeStorage;
import org.apache.streampipes.storage.couchdb.utils.Utils;
import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import com.google.gson.JsonObject;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
@@ -60,220 +61,229 @@ import java.util.stream.Collectors;
public class DataLakeManagementV4 {
- public List<DataLakeMeasure> getAllMeasurements() {
- return DataExplorerUtils.getInfos();
- }
+ public List<DataLakeMeasure> getAllMeasurements() {
+ return DataExplorerUtils.getInfos();
+ }
- public DataLakeMeasure getById(String measureId) {
- return getDataLakeStorage().findOne(measureId);
- }
+ public DataLakeMeasure getById(String measureId) {
+ return getDataLakeStorage().findOne(measureId);
+ }
- public SpQueryResult getData(ProvidedQueryParams queryParams,
- boolean ignoreMissingData) throws IllegalArgumentException {
- return new QueryResultProvider(queryParams, ignoreMissingData).getData();
- }
+ public SpQueryResult getData(ProvidedQueryParams queryParams,
+ boolean ignoreMissingData) throws IllegalArgumentException {
+ return new QueryResultProvider(queryParams, ignoreMissingData).getData();
+ }
- public void getDataAsStream(ProvidedQueryParams params,
- OutputFormat format,
- boolean ignoreMissingValues,
- OutputStream outputStream) throws IOException {
+ public void getDataAsStream(ProvidedQueryParams params,
+ OutputFormat format,
+ boolean ignoreMissingValues,
+ OutputStream outputStream) throws IOException {
- new StreamedQueryResultProvider(params, format, ignoreMissingValues).getDataAsStream(outputStream);
- }
+ new StreamedQueryResultProvider(params, format, ignoreMissingValues).getDataAsStream(outputStream);
+ }
- public boolean removeAllMeasurements() {
- List<DataLakeMeasure> allMeasurements = getAllMeasurements();
+ public boolean removeAllMeasurements() {
+ List<DataLakeMeasure> allMeasurements = getAllMeasurements();
- for (DataLakeMeasure measure : allMeasurements) {
- QueryResult queryResult = new DeleteDataQuery(measure).executeQuery();
- if (queryResult.hasError() || queryResult.getResults().get(0).getError() != null) {
- return false;
- }
- }
- return true;
+ for (DataLakeMeasure measure : allMeasurements) {
+ QueryResult queryResult = new DeleteDataQuery(measure).executeQuery();
+ if (queryResult.hasError() || queryResult.getResults().get(0).getError() != null) {
+ return false;
+ }
}
+ return true;
+ }
- public boolean removeMeasurement(String measurementID) {
- List<DataLakeMeasure> allMeasurements = getAllMeasurements();
- for (DataLakeMeasure measure : allMeasurements) {
- if (measure.getMeasureName().equals(measurementID)) {
- QueryResult queryResult = new DeleteDataQuery(new DataLakeMeasure(measurementID, null)).executeQuery();
+ public boolean removeMeasurement(String measurementID) {
+ List<DataLakeMeasure> allMeasurements = getAllMeasurements();
+ for (DataLakeMeasure measure : allMeasurements) {
+ if (measure.getMeasureName().equals(measurementID)) {
+ QueryResult queryResult = new DeleteDataQuery(new DataLakeMeasure(measurementID, null)).executeQuery();
- return !queryResult.hasError() && queryResult.getResults().get(0).getError() == null;
- }
- }
- return false;
+ return !queryResult.hasError() && queryResult.getResults().get(0).getError() == null;
+ }
}
-
- public SpQueryResult deleteData(String measurementID) {
- return this.deleteData(measurementID, null, null);
+ return false;
+ }
+
+ public SpQueryResult deleteData(String measurementID) {
+ return this.deleteData(measurementID, null, null);
+ }
+
+ public SpQueryResult deleteData(String measurementID, Long startDate, Long endDate) {
+ Map<String, QueryParamsV4> queryParts =
+ DataLakeManagementUtils.getDeleteQueryParams(measurementID, startDate, endDate);
+ return new DataExplorerQueryV4(queryParts).executeQuery(true);
+ }
+
+ public DataLakeConfiguration getDataLakeConfiguration() {
+ List<DataLakeRetentionPolicy> retentionPolicies = getAllExistingRetentionPolicies();
+ return new DataLakeConfiguration(retentionPolicies);
+ }
+
+ public String editMeasurementConfiguration(DataLakeConfiguration config, boolean resetToDefault) {
+
+ List<DataLakeRetentionPolicy> existingRetentionPolicies = getAllExistingRetentionPolicies();
+
+ if (resetToDefault) {
+ if (existingRetentionPolicies.size() > 1) {
+ String drop =
+ new EditRetentionPolicyQuery(RetentionPolicyQueryParams.from("custom", "0s"), "DROP").executeQuery();
+ }
+ return new EditRetentionPolicyQuery(RetentionPolicyQueryParams.from("autogen", "0s"), "DEFAULT").executeQuery();
+ } else {
+
+ Integer batchSize = config.getBatchSize();
+ Integer flushDuration = config.getFlushDuration();
+
+ //
+ // TODO:
+ // - Implementation of parameter update for batchSize and flushDuration
+ // - Updating multiple retention policies
+ //
+
+ String operation = "CREATE";
+ if (existingRetentionPolicies.size() > 1) {
+ operation = "ALTER";
+ }
+ return new EditRetentionPolicyQuery(RetentionPolicyQueryParams.from("custom", "1d"), operation).executeQuery();
}
-
- public SpQueryResult deleteData(String measurementID, Long startDate, Long endDate) {
- Map<String, QueryParamsV4> queryParts = DataLakeManagementUtils.getDeleteQueryParams(measurementID, startDate, endDate);
- return new DataExplorerQueryV4(queryParts).executeQuery(true);
+ }
+
+ public List<DataLakeRetentionPolicy> getAllExistingRetentionPolicies() {
+ //
+ // TODO:
+ // - Implementation of parameter return for batchSize and flushDuration
+ //
+ return new ShowRetentionPolicyQuery(RetentionPolicyQueryParams.from("", "0s")).executeQuery();
+ }
+
+ public boolean removeEventProperty(String measurementID) {
+ boolean isSuccess = false;
+ CouchDbClient couchDbClient = Utils.getCouchDbDataLakeClient();
+ List<JsonObject> docs = couchDbClient.view("_all_docs").includeDocs(true).query(JsonObject.class);
+
+ for (JsonObject document : docs) {
+ if (document.get("measureName").toString().replace("\"", "").equals(measurementID)) {
+ couchDbClient.remove(document.get("_id").toString().replace("\"", ""),
+ document.get("_rev").toString().replace("\"", ""));
+ isSuccess = true;
+ break;
+ }
}
- public DataLakeConfiguration getDataLakeConfiguration() {
- List<DataLakeRetentionPolicy> retentionPolicies = getAllExistingRetentionPolicies();
- return new DataLakeConfiguration(retentionPolicies);
+ try {
+ couchDbClient.close();
+ } catch (IOException e) {
+ e.printStackTrace();
}
-
- public String editMeasurementConfiguration(DataLakeConfiguration config, boolean resetToDefault) {
-
- List<DataLakeRetentionPolicy> existingRetentionPolicies = getAllExistingRetentionPolicies();
-
- if (resetToDefault) {
- if (existingRetentionPolicies.size() > 1) {
- String drop = new EditRetentionPolicyQuery(RetentionPolicyQueryParams.from("custom", "0s"), "DROP").executeQuery();
+ return isSuccess;
+ }
+
+ public Map<String, Object> getTagValues(String measurementId,
+ String fields) {
+ InfluxDB influxDB = DataExplorerUtils.getInfluxDBClient();
+ Map<String, Object> tags = new HashMap<>();
+ if (fields != null && !("".equals(fields))) {
+ List<String> fieldList = Arrays.asList(fields.split(","));
+ fieldList.forEach(f -> {
+ String q =
+ "SHOW TAG VALUES ON \"" + BackendConfig.INSTANCE.getInfluxDatabaseName() + "\" FROM \"" + measurementId
+ + "\" WITH KEY = \"" + f + "\"";
+ Query query = new Query(q);
+ QueryResult queryResult = influxDB.query(query);
+ queryResult.getResults().forEach(res -> {
+ res.getSeries().forEach(series -> {
+ if (series.getValues().size() > 0) {
+ String field = series.getValues().get(0).get(0).toString();
+ List<String> values =
+ series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
+ tags.put(field, values);
}
- return new EditRetentionPolicyQuery(RetentionPolicyQueryParams.from("autogen", "0s"), "DEFAULT").executeQuery();
- } else {
-
- Integer batchSize = config.getBatchSize();
- Integer flushDuration = config.getFlushDuration();
+ });
+ });
+ });
+ }
- //
- // TODO:
- // - Implementation of parameter update for batchSize and flushDuration
- // - Updating multiple retention policies
- //
+ return tags;
+ }
- String operation = "CREATE";
- if (existingRetentionPolicies.size() > 1) {
- operation = "ALTER";
- }
- return new EditRetentionPolicyQuery(RetentionPolicyQueryParams.from("custom", "1d"), operation).executeQuery();
- }
+ public void updateDataLake(DataLakeMeasure measure) throws IllegalArgumentException {
+ var existingMeasure = getDataLakeStorage().findOne(measure.getElementId());
+ if (existingMeasure != null) {
+ measure.setRev(existingMeasure.getRev());
+ getDataLakeStorage().updateDataLakeMeasure(measure);
+ } else {
+ getDataLakeStorage().storeDataLakeMeasure(measure);
}
+ }
- public List<DataLakeRetentionPolicy> getAllExistingRetentionPolicies() {
- //
- // TODO:
- // - Implementation of parameter return for batchSize and flushDuration
- //
- return new ShowRetentionPolicyQuery(RetentionPolicyQueryParams.from("", "0s")).executeQuery();
+ public void deleteDataLakeMeasure(String elementId) throws IllegalArgumentException {
+ if (getDataLakeStorage().findOne(elementId) != null) {
+ getDataLakeStorage().deleteDataLakeMeasure(elementId);
+ } else {
+ throw new IllegalArgumentException("Could not find measure with this ID");
}
-
- public boolean removeEventProperty(String measurementID) {
- boolean isSuccess = false;
- CouchDbClient couchDbClient = Utils.getCouchDbDataLakeClient();
- List<JsonObject> docs = couchDbClient.view("_all_docs").includeDocs(true).query(JsonObject.class);
-
- for (JsonObject document : docs) {
- if (document.get("measureName").toString().replace("\"", "").equals(measurementID)) {
- couchDbClient.remove(document.get("_id").toString().replace("\"", ""), document.get("_rev").toString().replace("\"", ""));
- isSuccess = true;
- break;
- }
- }
-
- try {
- couchDbClient.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return isSuccess;
+ }
+
+ public DataLakeMeasure addDataLake(DataLakeMeasure measure) {
+ List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
+ Optional<DataLakeMeasure> optional =
+ dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName()))
+ .findFirst();
+
+ if (optional.isPresent()) {
+ DataLakeMeasure oldEntry = optional.get();
+ if (!compareEventProperties(oldEntry.getEventSchema().getEventProperties(),
+ measure.getEventSchema().getEventProperties())) {
+ return oldEntry;
+ }
+ } else {
+ measure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
+ getDataLakeStorage().storeDataLakeMeasure(measure);
+ return measure;
}
- public Map<String, Object> getTagValues(String measurementId,
- String fields) {
- InfluxDB influxDB = DataExplorerUtils.getInfluxDBClient();
- Map<String, Object> tags = new HashMap<>();
- if (fields != null && !("".equals(fields))) {
- List<String> fieldList = Arrays.asList(fields.split(","));
- fieldList.forEach(f -> {
- String q = "SHOW TAG VALUES ON \"" + BackendConfig.INSTANCE.getInfluxDatabaseName() + "\" FROM \"" + measurementId + "\" WITH KEY = \"" + f + "\"";
- Query query = new Query(q);
- QueryResult queryResult = influxDB.query(query);
- queryResult.getResults().forEach(res -> {
- res.getSeries().forEach(series -> {
- if (series.getValues().size() > 0) {
- String field = series.getValues().get(0).get(0).toString();
- List<String> values = series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
- tags.put(field, values);
- }
- });
- });
- });
- }
-
- return tags;
- }
+ return measure;
+ }
- public void updateDataLake(DataLakeMeasure measure) throws IllegalArgumentException {
- var existingMeasure = getDataLakeStorage().findOne(measure.getElementId());
- if (existingMeasure != null) {
- measure.setRev(existingMeasure.getRev());
- getDataLakeStorage().updateDataLakeMeasure(measure);
- } else {
- getDataLakeStorage().storeDataLakeMeasure(measure);
- }
+ private boolean compareEventProperties(List<EventProperty> prop1, List<EventProperty> prop2) {
+ if (prop1.size() != prop2.size()) {
+ return false;
}
- public void deleteDataLakeMeasure(String elementId) throws IllegalArgumentException {
- if (getDataLakeStorage().findOne(elementId) != null) {
- getDataLakeStorage().deleteDataLakeMeasure(elementId);
- } else {
- throw new IllegalArgumentException("Could not find measure with this ID");
- }
- }
+ return prop1.stream().allMatch(prop -> {
- public DataLakeMeasure addDataLake(DataLakeMeasure measure) {
- List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
- Optional<DataLakeMeasure> optional = dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName())).findFirst();
+ for (EventProperty property : prop2) {
+ if (prop.getRuntimeName().equals(property.getRuntimeName())) {
- if (optional.isPresent()) {
- DataLakeMeasure oldEntry = optional.get();
- if (!compareEventProperties(oldEntry.getEventSchema().getEventProperties(), measure.getEventSchema().getEventProperties())) {
- return oldEntry;
+ //primitive
+ if (prop instanceof EventPropertyPrimitive && property instanceof EventPropertyPrimitive) {
+ if (((EventPropertyPrimitive) prop)
+ .getRuntimeType()
+ .equals(((EventPropertyPrimitive) property).getRuntimeType())) {
+ return true;
}
- } else {
- measure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
- getDataLakeStorage().storeDataLakeMeasure(measure);
- return measure;
- }
- return measure;
- }
+ //list
+ } else if (prop instanceof EventPropertyList && property instanceof EventPropertyList) {
+ return compareEventProperties(Collections.singletonList(((EventPropertyList) prop).getEventProperty()),
+ Collections.singletonList(((EventPropertyList) property).getEventProperty()));
- private boolean compareEventProperties(List<EventProperty> prop1, List<EventProperty> prop2) {
- if (prop1.size() != prop2.size()) {
- return false;
+ //nested
+ } else if (prop instanceof EventPropertyNested && property instanceof EventPropertyNested) {
+ return compareEventProperties(((EventPropertyNested) prop).getEventProperties(),
+ ((EventPropertyNested) property).getEventProperties());
+ }
}
+ }
+ return false;
- return prop1.stream().allMatch(prop -> {
-
- for (EventProperty property : prop2) {
- if (prop.getRuntimeName().equals(property.getRuntimeName())) {
-
- //primitive
- if (prop instanceof EventPropertyPrimitive && property instanceof EventPropertyPrimitive) {
- if (((EventPropertyPrimitive) prop)
- .getRuntimeType()
- .equals(((EventPropertyPrimitive) property).getRuntimeType())) {
- return true;
- }
-
- //list
- } else if (prop instanceof EventPropertyList && property instanceof EventPropertyList) {
- return compareEventProperties(Collections.singletonList(((EventPropertyList) prop).getEventProperty()),
- Collections.singletonList(((EventPropertyList) property).getEventProperty()));
-
- //nested
- } else if (prop instanceof EventPropertyNested && property instanceof EventPropertyNested) {
- return compareEventProperties(((EventPropertyNested) prop).getEventProperties(),
- ((EventPropertyNested) property).getEventProperties());
- }
- }
- }
- return false;
+ });
+ }
- });
- }
-
- private IDataLakeStorage getDataLakeStorage() {
- return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
- }
+ private IDataLakeStorage getDataLakeStorage() {
+ return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
index 5b411a0cf..6fd104ecd 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
@@ -19,7 +19,11 @@
package org.apache.streampipes.dataexplorer;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
-import org.apache.streampipes.model.schema.*;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyList;
+import org.apache.streampipes.model.schema.EventPropertyNested;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
+import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.storage.api.IDataLakeStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
@@ -35,10 +39,12 @@ public class DataLakeNoUserManagementV3 {
@Deprecated
public boolean addDataLake(String measure, EventSchema eventSchema) {
List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
- Optional<DataLakeMeasure> optional = dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure)).findFirst();
+ Optional<DataLakeMeasure> optional =
+ dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure)).findFirst();
if (optional.isPresent()) {
- if (!compareEventProperties(optional.get().getEventSchema().getEventProperties(), eventSchema.getEventProperties())) {
+ if (!compareEventProperties(optional.get().getEventSchema().getEventProperties(),
+ eventSchema.getEventProperties())) {
return false;
}
} else {
@@ -62,20 +68,20 @@ public class DataLakeNoUserManagementV3 {
//primitive
if (prop instanceof EventPropertyPrimitive && property instanceof EventPropertyPrimitive) {
if (((EventPropertyPrimitive) prop)
- .getRuntimeType()
- .equals(((EventPropertyPrimitive) property).getRuntimeType())) {
+ .getRuntimeType()
+ .equals(((EventPropertyPrimitive) property).getRuntimeType())) {
return true;
}
//list
} else if (prop instanceof EventPropertyList && property instanceof EventPropertyList) {
return compareEventProperties(Collections.singletonList(((EventPropertyList) prop).getEventProperty()),
- Collections.singletonList(((EventPropertyList) property).getEventProperty()));
+ Collections.singletonList(((EventPropertyList) property).getEventProperty()));
//nested
} else if (prop instanceof EventPropertyNested && property instanceof EventPropertyNested) {
return compareEventProperties(((EventPropertyNested) prop).getEventProperties(),
- ((EventPropertyNested) property).getEventProperties());
+ ((EventPropertyNested) property).getEventProperties());
}
}
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/QueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/QueryParams.java
index 76c010b78..d664eb269 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/QueryParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/QueryParams.java
@@ -21,14 +21,14 @@ public class QueryParams {
private final String index;
- public static QueryParams from(String index) {
- return new QueryParams(index);
- }
-
protected QueryParams(String index) {
this.index = index;
}
+ public static QueryParams from(String index) {
+ return new QueryParams(index);
+ }
+
public String getIndex() {
return index;
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/RetentionPolicyQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/RetentionPolicyQueryParams.java
index d87a630ea..b42fa6d48 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/RetentionPolicyQueryParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/RetentionPolicyQueryParams.java
@@ -19,20 +19,20 @@
package org.apache.streampipes.dataexplorer.param;
public class RetentionPolicyQueryParams extends QueryParams {
- private final String durationLiteral;
+ private final String durationLiteral;
- public static RetentionPolicyQueryParams from(String index, String durationLiteral) {
- return new RetentionPolicyQueryParams(index, durationLiteral);
- }
+ protected RetentionPolicyQueryParams(String index, String durationLiteral) {
+ super(index);
+ this.durationLiteral = durationLiteral;
+ }
- protected RetentionPolicyQueryParams(String index, String durationLiteral) {
- super(index);
- this.durationLiteral = durationLiteral;
- }
+ public static RetentionPolicyQueryParams from(String index, String durationLiteral) {
+ return new RetentionPolicyQueryParams(index, durationLiteral);
+ }
- public String getDurationLiteral() {
- return durationLiteral;
- }
+ public String getDurationLiteral() {
+ return durationLiteral;
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQuery.java
index 10516e866..d34b5bba4 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQuery.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQuery.java
@@ -21,6 +21,7 @@ import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
import org.apache.streampipes.model.datalake.DataSeries;
import org.apache.streampipes.model.datalake.SpQueryResult;
+
import org.influxdb.InfluxDB;
import org.influxdb.dto.Query;
@@ -28,21 +29,22 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
-public abstract class DataExplorerQuery<OUT> {
+public abstract class DataExplorerQuery<T> {
- public OUT executeQuery() throws RuntimeException {
+ public T executeQuery() throws RuntimeException {
InfluxDB influxDB = DataExplorerUtils.getInfluxDBClient();
- DataExplorerQueryBuilder queryBuilder = DataExplorerQueryBuilder.create(BackendConfig.INSTANCE.getInfluxDatabaseName());
+ DataExplorerQueryBuilder queryBuilder =
+ DataExplorerQueryBuilder.create(BackendConfig.INSTANCE.getInfluxDatabaseName());
getQuery(queryBuilder);
Query query = queryBuilder.toQuery();
org.influxdb.dto.QueryResult result;
if (queryBuilder.hasTimeUnit()) {
- result = influxDB.query(query, queryBuilder.getTimeUnit());;
+ result = influxDB.query(query, queryBuilder.getTimeUnit());
} else {
result = influxDB.query(query);
}
- OUT dataResult = postQuery(result);
+ T dataResult = postQuery(result);
influxDB.close();
return dataResult;
@@ -82,5 +84,5 @@ public abstract class DataExplorerQuery<OUT> {
protected abstract void getQuery(DataExplorerQueryBuilder queryBuilder);
- protected abstract OUT postQuery(org.influxdb.dto.QueryResult result) throws RuntimeException;
+ protected abstract T postQuery(org.influxdb.dto.QueryResult result) throws RuntimeException;
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryBuilder.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryBuilder.java
index d393e3ae0..7ca849b68 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryBuilder.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryBuilder.java
@@ -28,15 +28,15 @@ public class DataExplorerQueryBuilder {
private String databaseName;
private TimeUnit timeUnit;
- public static DataExplorerQueryBuilder create(String databaseName) {
- return new DataExplorerQueryBuilder(databaseName);
- }
-
private DataExplorerQueryBuilder(String databaseName) {
this.queryParts = new StringJoiner(" ");
this.databaseName = databaseName;
}
+ public static DataExplorerQueryBuilder create(String databaseName) {
+ return new DataExplorerQueryBuilder(databaseName);
+ }
+
public DataExplorerQueryBuilder add(String queryPart) {
this.queryParts.add(queryPart);
return this;
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DeleteDataQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DeleteDataQuery.java
index 4570927d3..e4a7a05b9 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DeleteDataQuery.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DeleteDataQuery.java
@@ -18,6 +18,7 @@
package org.apache.streampipes.dataexplorer.query;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
+
import org.influxdb.dto.QueryResult;
public class DeleteDataQuery extends DataExplorerQuery<QueryResult> {
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/EditRetentionPolicyQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/EditRetentionPolicyQuery.java
index ec907161f..c28931ad1 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/EditRetentionPolicyQuery.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/EditRetentionPolicyQuery.java
@@ -19,56 +19,59 @@
package org.apache.streampipes.dataexplorer.query;
import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
+
import org.influxdb.dto.QueryResult;
public class EditRetentionPolicyQuery extends ParameterizedDataExplorerQuery<RetentionPolicyQueryParams, String> {
- private static final String CREATE_OPERATOR = "CREATE";
- private static final String ALTER_OPERATOR = "ALTER";
- private static final String DROP_OPERATOR = "DROP";
- private static final String RESET_OPERATOR = "DEFAULT";
-
- private String operationToPerform;
+ private static final String CREATE_OPERATOR = "CREATE";
+ private static final String ALTER_OPERATOR = "ALTER";
+ private static final String DROP_OPERATOR = "DROP";
+ private static final String RESET_OPERATOR = "DEFAULT";
- public EditRetentionPolicyQuery(RetentionPolicyQueryParams queryParams, String operation) {
- super(queryParams);
- this.operationToPerform = operation;
- }
+ private String operationToPerform;
+ public EditRetentionPolicyQuery(RetentionPolicyQueryParams queryParams, String operation) {
+ super(queryParams);
+ this.operationToPerform = operation;
+ }
- @Override
- protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
- if (this.operationToPerform.equals(CREATE_OPERATOR)) {
- queryBuilder.add(createRetentionPolicyStatement(params.getIndex()));
- } else if (this.operationToPerform.equals(ALTER_OPERATOR)) {
- queryBuilder.add(alterRetentionPolicyStatement(params.getIndex()));
- } else if (this.operationToPerform.equals(DROP_OPERATOR)) {
- queryBuilder.add(dropRetentionPolicyStatement(params.getIndex()));
- } else if (this.operationToPerform.equals(RESET_OPERATOR)) {
- queryBuilder.add(resetRetentionPolicyStatement());
- }
+ @Override
+ protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
+ if (this.operationToPerform.equals(CREATE_OPERATOR)) {
+ queryBuilder.add(createRetentionPolicyStatement(params.getIndex()));
+ } else if (this.operationToPerform.equals(ALTER_OPERATOR)) {
+ queryBuilder.add(alterRetentionPolicyStatement(params.getIndex()));
+ } else if (this.operationToPerform.equals(DROP_OPERATOR)) {
+ queryBuilder.add(dropRetentionPolicyStatement(params.getIndex()));
+ } else if (this.operationToPerform.equals(RESET_OPERATOR)) {
+ queryBuilder.add(resetRetentionPolicyStatement());
}
- @Override
- protected String postQuery(QueryResult result) throws RuntimeException {
- return result.toString();
- }
+ }
- private String createRetentionPolicyStatement(String index) {
- return "CREATE RETENTION POLICY " + index + " ON " + "sp DURATION " + params.getDurationLiteral() + " REPLICATION 1 DEFAULT";
- }
+ @Override
+ protected String postQuery(QueryResult result) throws RuntimeException {
+ return result.toString();
+ }
- private String alterRetentionPolicyStatement(String index) {
- return "ALTER RETENTION POLICY " + index + " ON " + "sp DURATION " + params.getDurationLiteral() + " REPLICATION 1 DEFAULT";
- }
+ private String createRetentionPolicyStatement(String index) {
+ return "CREATE RETENTION POLICY " + index + " ON " + "sp DURATION " + params.getDurationLiteral()
+ + " REPLICATION 1 DEFAULT";
+ }
- private String dropRetentionPolicyStatement(String index) {
- return "DROP RETENTION POLICY " + index + " ON " + "sp";
- }
+ private String alterRetentionPolicyStatement(String index) {
+ return "ALTER RETENTION POLICY " + index + " ON " + "sp DURATION " + params.getDurationLiteral()
+ + " REPLICATION 1 DEFAULT";
+ }
- private String resetRetentionPolicyStatement() {
- return "ALTER RETENTION POLICY " + "autogen" + " ON " + "sp DURATION " + "0s" + " REPLICATION 1 DEFAULT";
- }
+ private String dropRetentionPolicyStatement(String index) {
+ return "DROP RETENTION POLICY " + index + " ON " + "sp";
+ }
+
+ private String resetRetentionPolicyStatement() {
+ return "ALTER RETENTION POLICY " + "autogen" + " ON " + "sp DURATION " + "0s" + " REPLICATION 1 DEFAULT";
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/ParameterizedDataExplorerQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/ParameterizedDataExplorerQuery.java
index 80391047a..1c66dfd23 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/ParameterizedDataExplorerQuery.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/ParameterizedDataExplorerQuery.java
@@ -17,11 +17,11 @@
*/
package org.apache.streampipes.dataexplorer.query;
-public abstract class ParameterizedDataExplorerQuery<P, OUT> extends DataExplorerQuery<OUT> {
+public abstract class ParameterizedDataExplorerQuery<K, V> extends DataExplorerQuery<V> {
- protected P params;
+ protected K params;
- public ParameterizedDataExplorerQuery(P queryParams) {
+ public ParameterizedDataExplorerQuery(K queryParams) {
this.params = queryParams;
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/ShowRetentionPolicyQuery.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/ShowRetentionPolicyQuery.java
index d169c877d..4ccbb19f4 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/ShowRetentionPolicyQuery.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/ShowRetentionPolicyQuery.java
@@ -20,37 +20,39 @@ package org.apache.streampipes.dataexplorer.query;
import org.apache.streampipes.dataexplorer.param.RetentionPolicyQueryParams;
import org.apache.streampipes.model.datalake.DataLakeRetentionPolicy;
+
import org.influxdb.dto.QueryResult;
import java.util.ArrayList;
import java.util.List;
-public class ShowRetentionPolicyQuery extends ParameterizedDataExplorerQuery<RetentionPolicyQueryParams, List<DataLakeRetentionPolicy>> {
+public class ShowRetentionPolicyQuery
+ extends ParameterizedDataExplorerQuery<RetentionPolicyQueryParams, List<DataLakeRetentionPolicy>> {
- public ShowRetentionPolicyQuery(RetentionPolicyQueryParams queryParams) {
- super(queryParams);
- }
+ public ShowRetentionPolicyQuery(RetentionPolicyQueryParams queryParams) {
+ super(queryParams);
+ }
- @Override
- protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
- queryBuilder.add(showRetentionPolicyStatement());
- }
+ @Override
+ protected void getQuery(DataExplorerQueryBuilder queryBuilder) {
+ queryBuilder.add(showRetentionPolicyStatement());
+ }
- @Override
- protected List<DataLakeRetentionPolicy> postQuery(QueryResult result) throws RuntimeException {
- List<DataLakeRetentionPolicy> policies = new ArrayList<>();
- for (List<Object> a : result.getResults().get(0).getSeries().get(0).getValues()) {
- boolean isDefault = false;
- if (a.get(4).toString().equals("true")) {
- isDefault = true;
- }
- policies.add(new DataLakeRetentionPolicy(a.get(0).toString(), a.get(1).toString(), isDefault));
- }
- return policies;
+ @Override
+ protected List<DataLakeRetentionPolicy> postQuery(QueryResult result) throws RuntimeException {
+ List<DataLakeRetentionPolicy> policies = new ArrayList<>();
+ for (List<Object> a : result.getResults().get(0).getSeries().get(0).getValues()) {
+ boolean isDefault = false;
+ if (a.get(4).toString().equals("true")) {
+ isDefault = true;
+ }
+ policies.add(new DataLakeRetentionPolicy(a.get(0).toString(), a.get(1).toString(), isDefault));
}
+ return policies;
+ }
- private String showRetentionPolicyStatement() {
- return "SHOW RETENTION POLICIES ON " + "sp";
- }
+ private String showRetentionPolicyStatement() {
+ return "SHOW RETENTION POLICIES ON " + "sp";
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java
index 54ba84e12..44310bb71 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java
@@ -20,15 +20,23 @@ package org.apache.streampipes.dataexplorer.sdk;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.dataexplorer.v4.params.ColumnFunction;
+
import org.influxdb.dto.Query;
import org.influxdb.querybuilder.Ordering;
import org.influxdb.querybuilder.SelectionQueryImpl;
-import org.influxdb.querybuilder.clauses.*;
+import org.influxdb.querybuilder.clauses.Clause;
+import org.influxdb.querybuilder.clauses.ConjunctionClause;
+import org.influxdb.querybuilder.clauses.NestedClause;
+import org.influxdb.querybuilder.clauses.OrConjunction;
+import org.influxdb.querybuilder.clauses.RawTextClause;
+import org.influxdb.querybuilder.clauses.SimpleClause;
import java.util.ArrayList;
import java.util.List;
-import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.*;
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.asc;
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.desc;
+import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
public class DataLakeQueryBuilder {
@@ -40,10 +48,6 @@ public class DataLakeQueryBuilder {
private int limit = Integer.MIN_VALUE;
private int offset = Integer.MIN_VALUE;
- public static DataLakeQueryBuilder create(String measurementId) {
- return new DataLakeQueryBuilder(measurementId);
- }
-
private DataLakeQueryBuilder(String measurementId) {
this.measurementId = measurementId;
this.selectionQuery = select();
@@ -51,6 +55,10 @@ public class DataLakeQueryBuilder {
this.groupByClauses = new ArrayList<>();
}
+ public static DataLakeQueryBuilder create(String measurementId) {
+ return new DataLakeQueryBuilder(measurementId);
+ }
+
public DataLakeQueryBuilder withSimpleColumn(String columnName) {
this.selectionQuery.column(columnName);
@@ -153,9 +161,9 @@ public class DataLakeQueryBuilder {
public DataLakeQueryBuilder withOrderBy(DataLakeQueryOrdering ordering) {
if (DataLakeQueryOrdering.ASC.equals(ordering)) {
- this.ordering = asc();
+ this.ordering = asc();
} else {
- this.ordering = desc();
+ this.ordering = desc();
}
return this;
@@ -174,7 +182,8 @@ public class DataLakeQueryBuilder {
}
public Query build() {
- var selectQuery = this.selectionQuery.from(BackendConfig.INSTANCE.getInfluxDatabaseName(), "\"" +measurementId + "\"");
+ var selectQuery =
+ this.selectionQuery.from(BackendConfig.INSTANCE.getInfluxDatabaseName(), "\"" + measurementId + "\"");
this.whereClauses.forEach(selectQuery::where);
if (this.groupByClauses.size() > 0) {
@@ -182,7 +191,7 @@ public class DataLakeQueryBuilder {
}
if (this.ordering != null) {
- selectQuery.orderBy(this.ordering);
+ selectQuery.orderBy(this.ordering);
}
if (this.limit != Integer.MIN_VALUE) {
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryOrdering.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryOrdering.java
index 6952d0dcc..e8734e78e 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryOrdering.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryOrdering.java
@@ -19,5 +19,5 @@
package org.apache.streampipes.dataexplorer.sdk;
public enum DataLakeQueryOrdering {
- ASC, DESC
+ ASC, DESC
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/template/QueryTemplates.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/template/QueryTemplates.java
index 082535d70..80ff5d1fa 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/template/QueryTemplates.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/template/QueryTemplates.java
@@ -33,8 +33,8 @@ public class QueryTemplates {
public static String whereTimeWithin(long startDate, long endDate) {
return "WHERE time > "
- + startDate * 1000000
- + " AND time < "
- + endDate * 1000000;
+ + startDate * 1000000
+ + " AND time < "
+ + endDate * 1000000;
}
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/utils/DataExplorerUtils.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/utils/DataExplorerUtils.java
index 85ffd9717..31061ec66 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/utils/DataExplorerUtils.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/utils/DataExplorerUtils.java
@@ -17,10 +17,11 @@
*/
package org.apache.streampipes.dataexplorer.utils;
-import okhttp3.OkHttpClient;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.storage.management.StorageDispatcher;
+
+import okhttp3.OkHttpClient;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
@@ -31,16 +32,16 @@ public class DataExplorerUtils {
public static List<DataLakeMeasure> getInfos() {
return StorageDispatcher.INSTANCE
- .getNoSqlStore()
- .getDataLakeStorage()
- .getAllDataLakeMeasures();
+ .getNoSqlStore()
+ .getDataLakeStorage()
+ .getAllDataLakeMeasures();
}
public static InfluxDB getInfluxDBClient() {
OkHttpClient.Builder okHttpClientBuilder = new OkHttpClient().newBuilder()
- .connectTimeout(120, TimeUnit.SECONDS)
- .readTimeout(120, TimeUnit.SECONDS)
- .writeTimeout(120, TimeUnit.SECONDS);
+ .connectTimeout(120, TimeUnit.SECONDS)
+ .readTimeout(120, TimeUnit.SECONDS)
+ .writeTimeout(120, TimeUnit.SECONDS);
return InfluxDBFactory.connect(BackendConfig.INSTANCE.getInfluxUrl(), okHttpClientBuilder);
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java
index d4f1568cb..63d9bd9fa 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java
@@ -21,6 +21,7 @@ import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
import org.apache.streampipes.dataexplorer.model.Order;
import org.apache.streampipes.dataexplorer.v4.params.SelectColumn;
import org.apache.streampipes.model.datalake.SpQueryResult;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +32,13 @@ import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.*;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AGGREGATION_FUNCTION;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AUTO_AGGREGATE;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_COLUMNS;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_COUNT_ONLY;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_LIMIT;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_ORDER;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_TIME_INTERVAL;
public class AutoAggregationHandler {
@@ -73,15 +80,16 @@ public class AutoAggregationHandler {
} else {
return disableAutoAgg(this.queryParams);
}
- } catch(ParseException e){
- e.printStackTrace();
- }
+ } catch (ParseException e) {
+ e.printStackTrace();
+ }
return null;
}
private void checkAllArgumentsPresent() throws IllegalArgumentException {
if (!this.queryParams.has(QP_AGGREGATION_FUNCTION)) {
- throw new IllegalArgumentException("Auto-Aggregate must provide one of the aggregationFunction parameters MEAN, FIRST, LAST.");
+ throw new IllegalArgumentException(
+ "Auto-Aggregate must provide one of the aggregationFunction parameters MEAN, FIRST, LAST.");
}
}
@@ -98,7 +106,7 @@ public class AutoAggregationHandler {
countParams.update(QP_COLUMNS, fieldName);
SpQueryResult result = new DataLakeManagementV4().getData(countParams, true);
-
+
return result.getTotal() > 0 ? ((Double) result.getAllDataSeries().get(0).getRows().get(0).get(1)).intValue() : 0;
}
@@ -123,7 +131,8 @@ public class AutoAggregationHandler {
}
private String transformColumns(String rawQuery) {
- List<SelectColumn> columns = Arrays.stream(rawQuery.split(COMMA)).map(SelectColumn::fromApiQueryString).collect(Collectors.toList());
+ List<SelectColumn> columns =
+ Arrays.stream(rawQuery.split(COMMA)).map(SelectColumn::fromApiQueryString).collect(Collectors.toList());
return columns.stream().map(SelectColumn::getOriginalField).collect(Collectors.joining(COMMA));
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/ProvidedQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/ProvidedQueryParams.java
index 7487ad94c..e3c9535d6 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/ProvidedQueryParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/ProvidedQueryParams.java
@@ -46,11 +46,11 @@ public class ProvidedQueryParams {
}
public Integer getAsInt(String key) {
- return has(key) ? Integer.parseInt(providedParams.get(key)): null;
+ return has(key) ? Integer.parseInt(providedParams.get(key)) : null;
}
public String getAsString(String key) {
- return has(key) ? providedParams.get(key): null;
+ return has(key) ? providedParams.get(key) : null;
}
public boolean getAsBoolean(String key) {
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/SupportedDataLakeQueryParameters.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/SupportedDataLakeQueryParameters.java
index e124253d8..059a4f4ce 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/SupportedDataLakeQueryParameters.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/SupportedDataLakeQueryParameters.java
@@ -41,24 +41,24 @@ public class SupportedDataLakeQueryParameters {
public static final String QP_FILTER = "filter";
public static final String QP_MAXIMUM_AMOUNT_OF_EVENTS = "maximumAmountOfEvents";
- public static final List<String> supportedParams = Arrays.asList(
- QP_COLUMNS,
- QP_START_DATE,
- QP_END_DATE,
- QP_PAGE,
- QP_LIMIT,
- QP_OFFSET,
- QP_GROUP_BY,
- QP_ORDER,
- QP_AGGREGATION_FUNCTION,
- QP_TIME_INTERVAL,
- QP_FORMAT,
- QP_CSV_DELIMITER,
- QP_COUNT_ONLY,
- QP_AUTO_AGGREGATE,
- QP_MISSING_VALUE_BEHAVIOUR,
- QP_FILTER,
- QP_MAXIMUM_AMOUNT_OF_EVENTS
+ public static final List<String> SUPPORTED_PARAMS = Arrays.asList(
+ QP_COLUMNS,
+ QP_START_DATE,
+ QP_END_DATE,
+ QP_PAGE,
+ QP_LIMIT,
+ QP_OFFSET,
+ QP_GROUP_BY,
+ QP_ORDER,
+ QP_AGGREGATION_FUNCTION,
+ QP_TIME_INTERVAL,
+ QP_FORMAT,
+ QP_CSV_DELIMITER,
+ QP_COUNT_ONLY,
+ QP_AUTO_AGGREGATE,
+ QP_MISSING_VALUE_BEHAVIOUR,
+ QP_FILTER,
+ QP_MAXIMUM_AMOUNT_OF_EVENTS
);
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/DeleteFromStatementParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/DeleteFromStatementParams.java
index df9334760..c3cc0392c 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/DeleteFromStatementParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/DeleteFromStatementParams.java
@@ -20,12 +20,12 @@ package org.apache.streampipes.dataexplorer.v4.params;
public class DeleteFromStatementParams extends QueryParamsV4 {
- public static DeleteFromStatementParams from(String measurementID) {
- return new DeleteFromStatementParams(measurementID);
- }
+ public DeleteFromStatementParams(String measurementID) {
+ super(measurementID);
+ }
- public DeleteFromStatementParams(String measurementID) {
- super(measurementID);
- }
+ public static DeleteFromStatementParams from(String measurementID) {
+ return new DeleteFromStatementParams(measurementID);
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/FillParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/FillParams.java
index b415d8376..0831e6d5b 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/FillParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/FillParams.java
@@ -19,16 +19,17 @@
package org.apache.streampipes.dataexplorer.v4.params;
public class FillParams extends QueryParamsV4 {
- String fill = "fill(none)";
+ String fill = "fill(none)";
- public static FillParams from(String measurementID) {
- return new FillParams(measurementID);
- }
- protected FillParams(String index) {
- super(index);
- }
+ protected FillParams(String index) {
+ super(index);
+ }
- public String getFill() {
- return fill;
- }
+ public static FillParams from(String measurementID) {
+ return new FillParams(measurementID);
+ }
+
+ public String getFill() {
+ return fill;
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTagsParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTagsParams.java
index bcbb02b17..afdc591a6 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTagsParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTagsParams.java
@@ -22,21 +22,21 @@ import java.util.ArrayList;
import java.util.List;
public class GroupingByTagsParams extends QueryParamsV4 {
- private final List<String> groupingTags;
+ private final List<String> groupingTags;
- public static GroupingByTagsParams from(String measurementID, String groupingTagsSeparatedByComma) {
- return new GroupingByTagsParams(measurementID, groupingTagsSeparatedByComma);
+ public GroupingByTagsParams(String measurementID, String groupingTagsSeparatedByComma) {
+ super(measurementID);
+ this.groupingTags = new ArrayList<>();
+ for (String tag : groupingTagsSeparatedByComma.split(",")) {
+ this.groupingTags.add(tag);
}
+ }
- public GroupingByTagsParams(String measurementID, String groupingTagsSeparatedByComma) {
- super(measurementID);
- this.groupingTags = new ArrayList<>();
- for (String tag : groupingTagsSeparatedByComma.split(",")) {
- this.groupingTags.add(tag);
- }
- }
+ public static GroupingByTagsParams from(String measurementID, String groupingTagsSeparatedByComma) {
+ return new GroupingByTagsParams(measurementID, groupingTagsSeparatedByComma);
+ }
- public List<String> getGroupingTags() {
- return groupingTags;
- }
+ public List<String> getGroupingTags() {
+ return groupingTags;
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTimeParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTimeParams.java
index 1751bcc53..27650dabb 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTimeParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTimeParams.java
@@ -19,18 +19,18 @@
package org.apache.streampipes.dataexplorer.v4.params;
public class GroupingByTimeParams extends QueryParamsV4 {
- private final String timeInterval;
+ private final String timeInterval;
- public static GroupingByTimeParams from(String measurementID, String timeInterval) {
- return new GroupingByTimeParams(measurementID, timeInterval);
- }
+ public GroupingByTimeParams(String measurementID, String timeInterval) {
+ super(measurementID);
+ this.timeInterval = timeInterval;
+ }
- public GroupingByTimeParams(String measurementID, String timeInterval) {
- super(measurementID);
- this.timeInterval = timeInterval;
- }
+ public static GroupingByTimeParams from(String measurementID, String timeInterval) {
+ return new GroupingByTimeParams(measurementID, timeInterval);
+ }
- public String getTimeInterval() {
- return this.timeInterval;
- }
+ public String getTimeInterval() {
+ return this.timeInterval;
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ItemLimitationParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ItemLimitationParams.java
index 6656ad5a4..0bd9dd122 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ItemLimitationParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ItemLimitationParams.java
@@ -20,18 +20,18 @@ package org.apache.streampipes.dataexplorer.v4.params;
public class ItemLimitationParams extends QueryParamsV4 {
- private final Integer limit;
+ private final Integer limit;
- public static ItemLimitationParams from(String measurementID, Integer limit) {
- return new ItemLimitationParams(measurementID, limit);
- }
+ public ItemLimitationParams(String measurementID, Integer limit) {
+ super(measurementID);
+ this.limit = limit;
+ }
- public ItemLimitationParams(String measurementID, Integer limit) {
- super(measurementID);
- this.limit = limit;
- }
+ public static ItemLimitationParams from(String measurementID, Integer limit) {
+ return new ItemLimitationParams(measurementID, limit);
+ }
- public Integer getLimit() {
- return limit;
- }
+ public Integer getLimit() {
+ return limit;
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OffsetParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OffsetParams.java
index ac6a1b3c4..334983af1 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OffsetParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OffsetParams.java
@@ -20,18 +20,18 @@ package org.apache.streampipes.dataexplorer.v4.params;
public class OffsetParams extends QueryParamsV4 {
- private final Integer offset;
+ private final Integer offset;
- public static OffsetParams from(String measurementID, Integer offset) {
- return new OffsetParams(measurementID, offset);
- }
+ public OffsetParams(String measurementID, Integer offset) {
+ super(measurementID);
+ this.offset = offset;
+ }
- public OffsetParams(String measurementID, Integer offset) {
- super(measurementID);
- this.offset = offset;
- }
+ public static OffsetParams from(String measurementID, Integer offset) {
+ return new OffsetParams(measurementID, offset);
+ }
- public Integer getOffset() {
- return offset;
- }
+ public Integer getOffset() {
+ return offset;
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OrderingByTimeParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OrderingByTimeParams.java
index b7b71e9bb..2115e2f44 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OrderingByTimeParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OrderingByTimeParams.java
@@ -19,18 +19,18 @@
package org.apache.streampipes.dataexplorer.v4.params;
public class OrderingByTimeParams extends QueryParamsV4 {
- private final String ordering;
+ private final String ordering;
- public static OrderingByTimeParams from(String measurementID, String ordering) {
- return new OrderingByTimeParams(measurementID, ordering);
- }
+ public OrderingByTimeParams(String measurementID, String ordering) {
+ super(measurementID);
+ this.ordering = ordering;
+ }
- public OrderingByTimeParams(String measurementID, String ordering) {
- super(measurementID);
- this.ordering = ordering;
- }
+ public static OrderingByTimeParams from(String measurementID, String ordering) {
+ return new OrderingByTimeParams(measurementID, ordering);
+ }
- public String getOrdering() {
- return this.ordering;
- }
+ public String getOrdering() {
+ return this.ordering;
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/QueryParamsV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/QueryParamsV4.java
index 8ce7758b9..49046beec 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/QueryParamsV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/QueryParamsV4.java
@@ -19,14 +19,14 @@ package org.apache.streampipes.dataexplorer.v4.params;
public abstract class QueryParamsV4 {
- private final String index;
+ private final String index;
- protected QueryParamsV4(String index) {
- this.index = index;
- }
+ protected QueryParamsV4(String index) {
+ this.index = index;
+ }
- public String getIndex() {
- return index;
- }
+ public String getIndex() {
+ return index;
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectColumn.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectColumn.java
index d2d0ba929..667bf943d 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectColumn.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectColumn.java
@@ -28,6 +28,25 @@ public class SelectColumn {
private boolean simpleField = true;
private boolean rename = false;
+ public SelectColumn(String originalField) {
+ this.originalField = originalField;
+ }
+
+ public SelectColumn(String originalField,
+ ColumnFunction columnFunction) {
+ this(originalField);
+ this.columnFunction = columnFunction;
+ this.simpleField = false;
+ }
+
+ public SelectColumn(String originalField,
+ ColumnFunction columnFunction,
+ String targetField) {
+ this(originalField, columnFunction);
+ this.targetField = targetField;
+ this.rename = true;
+ }
+
public static SelectColumn fromApiQueryString(String queryString) {
if (queryString.contains(";")) {
String[] queryParts = DataLakeManagementUtils.buildSingleCondition(queryString);
@@ -35,7 +54,8 @@ public class SelectColumn {
throw new IllegalArgumentException("Wrong query format for query part " + queryString);
} else {
ColumnFunction columnFunction = ColumnFunction.valueOf(queryParts[1]);
- String targetField = queryParts.length == 3 ? queryParts[2] : columnFunction.name().toLowerCase() + "_" +queryParts[0];
+ String targetField =
+ queryParts.length == 3 ? queryParts[2] : columnFunction.name().toLowerCase() + "_" + queryParts[0];
return new SelectColumn(queryParts[0], columnFunction, targetField);
}
} else {
@@ -50,25 +70,6 @@ public class SelectColumn {
return new SelectColumn(queryString, ColumnFunction.valueOf(globalAggregationFunction), targetField);
}
- public SelectColumn(String originalField) {
- this.originalField = originalField;
- }
-
- public SelectColumn(String originalField,
- ColumnFunction columnFunction) {
- this(originalField);
- this.columnFunction = columnFunction;
- this.simpleField = false;
- }
-
- public SelectColumn(String originalField,
- ColumnFunction columnFunction,
- String targetField) {
- this(originalField, columnFunction);
- this.targetField = targetField;
- this.rename = true;
- }
-
private String makeField() {
if (this.simpleField) {
return "\"" + this.originalField + "\"";
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java
index 022d1aafc..44cec5afd 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java
@@ -19,70 +19,73 @@
package org.apache.streampipes.dataexplorer.v4.params;
import javax.annotation.Nullable;
+
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class SelectFromStatementParams extends QueryParamsV4 {
- private List<SelectColumn> selectedColumns;
- private boolean selectWildcard = false;
+ private List<SelectColumn> selectedColumns;
+ private boolean selectWildcard = false;
- public static SelectFromStatementParams from(String measurementID,
- @Nullable String columns,
- @Nullable String aggregationFunction) {
- return new SelectFromStatementParams(measurementID, columns, aggregationFunction);
- }
+ public SelectFromStatementParams(String measurementID) {
+ super(measurementID);
+ this.selectWildcard = true;
+ //this.selectedColumns = "*";
+ }
- public static SelectFromStatementParams from(String measurementId,
- String columns,
- boolean countOnly) {
- return new SelectFromStatementParams(measurementId, columns, countOnly);
- }
+ public SelectFromStatementParams(String measurementId,
+ String columns,
+ boolean countOnly) {
+ this(measurementId);
+ this.selectWildcard = false;
+ this.selectedColumns = countOnly ? buildColumns(columns, ColumnFunction.COUNT.name()) : buildColumns(columns);
+ }
- public SelectFromStatementParams(String measurementID) {
- super(measurementID);
- this.selectWildcard = true;
- //this.selectedColumns = "*";
- }
+ public SelectFromStatementParams(String measurementID,
+ String columns,
+ String aggregationFunction) {
+ super(measurementID);
- public SelectFromStatementParams(String measurementId,
- String columns,
- boolean countOnly) {
- this(measurementId);
- this.selectWildcard = false;
- this.selectedColumns = countOnly ? buildColumns(columns, ColumnFunction.COUNT.name()) : buildColumns(columns);
+ if (columns != null) {
+ this.selectedColumns =
+ aggregationFunction != null ? buildColumns(columns, aggregationFunction) : buildColumns(columns);
+ } else {
+ this.selectWildcard = true;
}
+ }
- public SelectFromStatementParams(String measurementID,
- String columns,
- String aggregationFunction) {
- super(measurementID);
+ public static SelectFromStatementParams from(String measurementID,
+ @Nullable String columns,
+ @Nullable String aggregationFunction) {
+ return new SelectFromStatementParams(measurementID, columns, aggregationFunction);
+ }
- if (columns != null) {
- this.selectedColumns = aggregationFunction != null ? buildColumns(columns, aggregationFunction) : buildColumns(columns);
- } else {
- this.selectWildcard = true;
- }
- }
+ public static SelectFromStatementParams from(String measurementId,
+ String columns,
+ boolean countOnly) {
+ return new SelectFromStatementParams(measurementId, columns, countOnly);
+ }
- public List<SelectColumn> getSelectedColumns() {
- return selectedColumns;
- }
+ public List<SelectColumn> getSelectedColumns() {
+ return selectedColumns;
+ }
- //public String getAggregationFunction() {
- //return aggregationFunction;
- //}
+ //public String getAggregationFunction() {
+ //return aggregationFunction;
+ //}
- public boolean isSelectWildcard() {
- return selectWildcard;
- }
+ public boolean isSelectWildcard() {
+ return selectWildcard;
+ }
- private List<SelectColumn> buildColumns(String rawQuery) {
- return Arrays.stream(rawQuery.split(",")).map(SelectColumn::fromApiQueryString).collect(Collectors.toList());
- }
+ private List<SelectColumn> buildColumns(String rawQuery) {
+ return Arrays.stream(rawQuery.split(",")).map(SelectColumn::fromApiQueryString).collect(Collectors.toList());
+ }
- private List<SelectColumn> buildColumns(String rawQuery, String globalAggregationFunction) {
- return Arrays.stream(rawQuery.split(",")).map(qp -> SelectColumn.fromApiQueryString(qp, globalAggregationFunction)).collect(Collectors.toList());
- }
+ private List<SelectColumn> buildColumns(String rawQuery, String globalAggregationFunction) {
+ return Arrays.stream(rawQuery.split(",")).map(qp -> SelectColumn.fromApiQueryString(qp, globalAggregationFunction))
+ .collect(Collectors.toList());
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/TimeBoundaryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/TimeBoundaryParams.java
index c89382997..64c0a4867 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/TimeBoundaryParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/TimeBoundaryParams.java
@@ -20,24 +20,24 @@ package org.apache.streampipes.dataexplorer.v4.params;
public class TimeBoundaryParams extends QueryParamsV4 {
- private final Long startDate;
- private final Long endDate;
+ private final Long startDate;
+ private final Long endDate;
- public static TimeBoundaryParams from(String measurementID, Long startDate, Long endDate) {
- return new TimeBoundaryParams(measurementID, startDate, endDate);
- }
+ protected TimeBoundaryParams(String measurementID, Long startDate, Long endDate) {
+ super(measurementID);
+ this.startDate = startDate;
+ this.endDate = endDate;
+ }
- protected TimeBoundaryParams(String measurementID, Long startDate, Long endDate) {
- super(measurementID);
- this.startDate = startDate;
- this.endDate = endDate;
- }
+ public static TimeBoundaryParams from(String measurementID, Long startDate, Long endDate) {
+ return new TimeBoundaryParams(measurementID, startDate, endDate);
+ }
- public Long getStartDate() {
- return startDate;
- }
+ public Long getStartDate() {
+ return startDate;
+ }
- public Long getEndDate() {
- return endDate;
- }
+ public Long getEndDate() {
+ return endDate;
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereCondition.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereCondition.java
index fafac95ce..b222eb335 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereCondition.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereCondition.java
@@ -36,10 +36,10 @@ public class WhereCondition {
StringJoiner joiner = new StringJoiner(" ");
return joiner
- .add(field)
- .add(operator)
- .add(condition)
- .toString();
+ .add(field)
+ .add(operator)
+ .add(condition)
+ .toString();
}
public String getField() {
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereStatementParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereStatementParams.java
index a0af420b1..b5ddcf16d 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereStatementParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereStatementParams.java
@@ -17,9 +17,10 @@
*/
package org.apache.streampipes.dataexplorer.v4.params;
-import org.apache.commons.lang3.math.NumberUtils;
import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+
import java.util.ArrayList;
import java.util.List;
@@ -30,28 +31,10 @@ public class WhereStatementParams extends QueryParamsV4 {
private List<WhereCondition> whereConditions;
- public static WhereStatementParams from(String measurementId,
- Long startTime,
- Long endTime) {
- return new WhereStatementParams(measurementId, startTime, endTime);
- }
-
- public static WhereStatementParams from(String measurementId,
- String whereConditions) {
- return new WhereStatementParams(measurementId, whereConditions);
- }
-
- public static WhereStatementParams from(String measurementId,
- Long startTime,
- Long endTime,
- String whereConditions) {
- return new WhereStatementParams(measurementId, startTime, endTime, whereConditions);
- }
-
private WhereStatementParams(String index,
- Long startTime,
- Long endTime,
- String whereConditions) {
+ Long startTime,
+ Long endTime,
+ String whereConditions) {
this(index, startTime, endTime);
if (whereConditions != null) {
buildConditions(whereConditions);
@@ -59,8 +42,8 @@ public class WhereStatementParams extends QueryParamsV4 {
}
private WhereStatementParams(String index,
- Long startTime,
- Long endTime) {
+ Long startTime,
+ Long endTime) {
super(index);
this.whereConditions = new ArrayList<>();
this.buildTimeConditions(startTime, endTime);
@@ -75,6 +58,24 @@ public class WhereStatementParams extends QueryParamsV4 {
}
}
+ public static WhereStatementParams from(String measurementId,
+ Long startTime,
+ Long endTime) {
+ return new WhereStatementParams(measurementId, startTime, endTime);
+ }
+
+ public static WhereStatementParams from(String measurementId,
+ String whereConditions) {
+ return new WhereStatementParams(measurementId, whereConditions);
+ }
+
+ public static WhereStatementParams from(String measurementId,
+ Long startTime,
+ Long endTime,
+ String whereConditions) {
+ return new WhereStatementParams(measurementId, startTime, endTime, whereConditions);
+ }
+
private void buildTimeConditions(Long startTime,
Long endTime) {
if (startTime == null) {
@@ -96,7 +97,8 @@ public class WhereStatementParams extends QueryParamsV4 {
// Add single quotes to strings except for true and false
whereParts.forEach(singleCondition -> {
- this.whereConditions.add(new WhereCondition(singleCondition[0], singleCondition[1], this.returnCondition(singleCondition[2])));
+ this.whereConditions.add(
+ new WhereCondition(singleCondition[0], singleCondition[1], this.returnCondition(singleCondition[2])));
});
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java
index 7ecfc0cb8..7d4af4071 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java
@@ -20,12 +20,31 @@ package org.apache.streampipes.dataexplorer.v4.query;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
-import org.apache.streampipes.dataexplorer.v4.params.*;
-import org.apache.streampipes.dataexplorer.v4.query.elements.*;
+import org.apache.streampipes.dataexplorer.v4.params.DeleteFromStatementParams;
+import org.apache.streampipes.dataexplorer.v4.params.FillParams;
+import org.apache.streampipes.dataexplorer.v4.params.GroupingByTagsParams;
+import org.apache.streampipes.dataexplorer.v4.params.GroupingByTimeParams;
+import org.apache.streampipes.dataexplorer.v4.params.ItemLimitationParams;
+import org.apache.streampipes.dataexplorer.v4.params.OffsetParams;
+import org.apache.streampipes.dataexplorer.v4.params.OrderingByTimeParams;
+import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
+import org.apache.streampipes.dataexplorer.v4.params.SelectFromStatementParams;
+import org.apache.streampipes.dataexplorer.v4.params.WhereStatementParams;
+import org.apache.streampipes.dataexplorer.v4.query.elements.DeleteFromStatement;
+import org.apache.streampipes.dataexplorer.v4.query.elements.FillStatement;
+import org.apache.streampipes.dataexplorer.v4.query.elements.GroupingByTags;
+import org.apache.streampipes.dataexplorer.v4.query.elements.GroupingByTime;
+import org.apache.streampipes.dataexplorer.v4.query.elements.ItemLimitation;
+import org.apache.streampipes.dataexplorer.v4.query.elements.Offset;
+import org.apache.streampipes.dataexplorer.v4.query.elements.OrderingByTime;
+import org.apache.streampipes.dataexplorer.v4.query.elements.QueryElement;
+import org.apache.streampipes.dataexplorer.v4.query.elements.SelectFromStatement;
+import org.apache.streampipes.dataexplorer.v4.query.elements.WhereStatement;
import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
import org.apache.streampipes.model.datalake.DataSeries;
import org.apache.streampipes.model.datalake.SpQueryResult;
import org.apache.streampipes.model.datalake.SpQueryStatus;
+
import org.influxdb.InfluxDB;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
@@ -38,172 +57,178 @@ import java.util.Map;
public class DataExplorerQueryV4 {
- private static final Logger LOG = LoggerFactory.getLogger(DataExplorerQueryV4.class);
-
- protected Map<String, QueryParamsV4> params;
-
- protected int maximumAmountOfEvents;
-
- private boolean appendId = false;
- private String forId;
+ private static final Logger LOG = LoggerFactory.getLogger(DataExplorerQueryV4.class);
- public DataExplorerQueryV4() {
+ protected Map<String, QueryParamsV4> params;
- }
+ protected int maximumAmountOfEvents;
- public DataExplorerQueryV4(Map<String, QueryParamsV4> params,
- String forId) {
- this(params);
- this.appendId = true;
- this.forId = forId;
- }
+ private boolean appendId = false;
+ private String forId;
- public DataExplorerQueryV4(Map<String, QueryParamsV4> params) {
- this.params = params;
- this.maximumAmountOfEvents = -1;
- }
+ public DataExplorerQueryV4() {
- public DataExplorerQueryV4(Map<String, QueryParamsV4> params, int maximumAmountOfEvents) {
- this.params = params;
- this.maximumAmountOfEvents = maximumAmountOfEvents;
- }
+ }
- public SpQueryResult executeQuery(boolean ignoreMissingValues) throws RuntimeException {
- InfluxDB influxDB = DataExplorerUtils.getInfluxDBClient();
- List<QueryElement<?>> queryElements = getQueryElements();
-
- if (this.maximumAmountOfEvents != -1) {
- QueryBuilder countQueryBuilder = QueryBuilder.create(BackendConfig.INSTANCE.getInfluxDatabaseName());
- Query countQuery = countQueryBuilder.build(queryElements, true);
- QueryResult countQueryResult = influxDB.query(countQuery);
- Double amountOfQueryResults = getAmountOfResults(countQueryResult);
- if (amountOfQueryResults > this.maximumAmountOfEvents) {
- SpQueryResult tooMuchData = new SpQueryResult();
- tooMuchData.setSpQueryStatus(SpQueryStatus.TOO_MUCH_DATA);
- tooMuchData.setTotal(amountOfQueryResults.intValue());
- influxDB.close();
- return tooMuchData;
- }
- }
+ public DataExplorerQueryV4(Map<String, QueryParamsV4> params,
+ String forId) {
+ this(params);
+ this.appendId = true;
+ this.forId = forId;
+ }
- QueryBuilder queryBuilder = QueryBuilder.create(BackendConfig.INSTANCE.getInfluxDatabaseName());
- Query query = queryBuilder.build(queryElements, false);
- LOG.debug("Data Lake Query (database:" + query.getDatabase() + "): " + query.getCommand());
+ public DataExplorerQueryV4(Map<String, QueryParamsV4> params) {
+ this.params = params;
+ this.maximumAmountOfEvents = -1;
+ }
- QueryResult result = influxDB.query(query);
- LOG.debug("Data Lake Query Result: " + result.toString());
+ public DataExplorerQueryV4(Map<String, QueryParamsV4> params, int maximumAmountOfEvents) {
+ this.params = params;
+ this.maximumAmountOfEvents = maximumAmountOfEvents;
+ }
- SpQueryResult dataResult = postQuery(result, ignoreMissingValues);
+ public SpQueryResult executeQuery(boolean ignoreMissingValues) throws RuntimeException {
+ InfluxDB influxDB = DataExplorerUtils.getInfluxDBClient();
+ List<QueryElement<?>> queryElements = getQueryElements();
+ if (this.maximumAmountOfEvents != -1) {
+ QueryBuilder countQueryBuilder = QueryBuilder.create(BackendConfig.INSTANCE.getInfluxDatabaseName());
+ Query countQuery = countQueryBuilder.build(queryElements, true);
+ QueryResult countQueryResult = influxDB.query(countQuery);
+ Double amountOfQueryResults = getAmountOfResults(countQueryResult);
+ if (amountOfQueryResults > this.maximumAmountOfEvents) {
+ SpQueryResult tooMuchData = new SpQueryResult();
+ tooMuchData.setSpQueryStatus(SpQueryStatus.TOO_MUCH_DATA);
+ tooMuchData.setTotal(amountOfQueryResults.intValue());
influxDB.close();
- return dataResult;
+ return tooMuchData;
+ }
}
- public SpQueryResult executeQuery(Query query, boolean ignoreMissingValues) {
- InfluxDB influxDB = DataExplorerUtils.getInfluxDBClient();
- var dataResult = executeQuery(influxDB, query, ignoreMissingValues);
- influxDB.close();
+ QueryBuilder queryBuilder = QueryBuilder.create(BackendConfig.INSTANCE.getInfluxDatabaseName());
+ Query query = queryBuilder.build(queryElements, false);
+ LOG.debug("Data Lake Query (database:" + query.getDatabase() + "): " + query.getCommand());
- return dataResult;
- }
+ QueryResult result = influxDB.query(query);
+ LOG.debug("Data Lake Query Result: " + result.toString());
- public SpQueryResult executeQuery(InfluxDB influxDB,
- Query query,
- boolean ignoreMissingValues) {
- QueryResult result = influxDB.query(query);
+ SpQueryResult dataResult = postQuery(result, ignoreMissingValues);
- return postQuery(result, ignoreMissingValues);
- }
+ influxDB.close();
+ return dataResult;
+ }
- private double getAmountOfResults(QueryResult countQueryResult) {
- if (countQueryResult.getResults().get(0).getSeries() != null
- && countQueryResult.getResults().get(0).getSeries().get(0).getValues() != null) {
- return (double) countQueryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(1);
- } else {
- return 0.0;
- }
- }
+ public SpQueryResult executeQuery(Query query, boolean ignoreMissingValues) {
+ InfluxDB influxDB = DataExplorerUtils.getInfluxDBClient();
+ var dataResult = executeQuery(influxDB, query, ignoreMissingValues);
+ influxDB.close();
+ return dataResult;
+ }
- protected DataSeries convertResult(QueryResult.Series series,
- boolean ignoreMissingValues) {
- List<String> columns = series.getColumns();
- List<List<Object>> values = series.getValues();
+ public SpQueryResult executeQuery(InfluxDB influxDB,
+ Query query,
+ boolean ignoreMissingValues) {
+ QueryResult result = influxDB.query(query);
- List<List<Object>> resultingValues = new ArrayList<>();
+ return postQuery(result, ignoreMissingValues);
+ }
- values.forEach(v -> {
- if (ignoreMissingValues) {
- if (!v.contains(null)) {
- resultingValues.add(v);
- }
- } else {
- resultingValues.add(v);
- }
+ private double getAmountOfResults(QueryResult countQueryResult) {
+ if (countQueryResult.getResults().get(0).getSeries() != null
+ && countQueryResult.getResults().get(0).getSeries().get(0).getValues() != null) {
+ return (double) countQueryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(1);
+ } else {
+ return 0.0;
+ }
+ }
- });
- return new DataSeries(values.size(), resultingValues, columns, series.getTags());
- }
+ protected DataSeries convertResult(QueryResult.Series series,
+ boolean ignoreMissingValues) {
+ List<String> columns = series.getColumns();
+ List<List<Object>> values = series.getValues();
- protected SpQueryResult postQuery(QueryResult queryResult,
- boolean ignoreMissingValues) throws RuntimeException {
- SpQueryResult result = new SpQueryResult();
-
- if (queryResult.getResults().get(0).getSeries() != null) {
- result.setTotal(queryResult.getResults().get(0).getSeries().size());
- queryResult.getResults().get(0).getSeries().forEach(rs -> {
- DataSeries series = convertResult(rs, ignoreMissingValues);
- result.setHeaders(series.getHeaders());
- result.addDataResult(series);
- });
- }
+ List<List<Object>> resultingValues = new ArrayList<>();
- if (this.appendId) {
- result.setForId(this.forId);
+ values.forEach(v -> {
+ if (ignoreMissingValues) {
+ if (!v.contains(null)) {
+ resultingValues.add(v);
}
+ } else {
+ resultingValues.add(v);
+ }
+
+ });
+
+ return new DataSeries(values.size(), resultingValues, columns, series.getTags());
+ }
+
+ protected SpQueryResult postQuery(QueryResult queryResult,
+ boolean ignoreMissingValues) throws RuntimeException {
+ SpQueryResult result = new SpQueryResult();
+
+ if (queryResult.getResults().get(0).getSeries() != null) {
+ result.setTotal(queryResult.getResults().get(0).getSeries().size());
+ queryResult.getResults().get(0).getSeries().forEach(rs -> {
+ DataSeries series = convertResult(rs, ignoreMissingValues);
+ result.setHeaders(series.getHeaders());
+ result.addDataResult(series);
+ });
+ }
- return result;
+ if (this.appendId) {
+ result.setForId(this.forId);
}
- protected List<QueryElement<?>> getQueryElements() {
- List<QueryElement<?>> queryElements = new ArrayList<>();
+ return result;
+ }
- if (this.params.containsKey(DataLakeManagementUtils.SELECT_FROM)) {
- queryElements.add(new SelectFromStatement((SelectFromStatementParams) this.params.get(DataLakeManagementUtils.SELECT_FROM)));
- } else {
- queryElements.add(new DeleteFromStatement((DeleteFromStatementParams) this.params.get(DataLakeManagementUtils.DELETE_FROM)));
- }
+ protected List<QueryElement<?>> getQueryElements() {
+ List<QueryElement<?>> queryElements = new ArrayList<>();
- if (this.params.containsKey(DataLakeManagementUtils.WHERE)) {
- queryElements.add(new WhereStatement((WhereStatementParams) this.params.get(DataLakeManagementUtils.WHERE)));
- }
+ if (this.params.containsKey(DataLakeManagementUtils.SELECT_FROM)) {
+ queryElements.add(
+ new SelectFromStatement((SelectFromStatementParams) this.params.get(DataLakeManagementUtils.SELECT_FROM)));
+ } else {
+ queryElements.add(
+ new DeleteFromStatement((DeleteFromStatementParams) this.params.get(DataLakeManagementUtils.DELETE_FROM)));
+ }
- if (this.params.containsKey(DataLakeManagementUtils.GROUP_BY_TIME)) {
- queryElements.add(new GroupingByTime((GroupingByTimeParams) this.params.get(DataLakeManagementUtils.GROUP_BY_TIME)));
+ if (this.params.containsKey(DataLakeManagementUtils.WHERE)) {
+ queryElements.add(new WhereStatement((WhereStatementParams) this.params.get(DataLakeManagementUtils.WHERE)));
+ }
- } else if (this.params.containsKey(DataLakeManagementUtils.GROUP_BY_TAGS)) {
- queryElements.add(new GroupingByTags((GroupingByTagsParams) this.params.get(DataLakeManagementUtils.GROUP_BY_TAGS)));
- }
+ if (this.params.containsKey(DataLakeManagementUtils.GROUP_BY_TIME)) {
+ queryElements.add(
+ new GroupingByTime((GroupingByTimeParams) this.params.get(DataLakeManagementUtils.GROUP_BY_TIME)));
- if (this.params.containsKey(DataLakeManagementUtils.FILL)) {
- queryElements.add(new FillStatement((FillParams) this.params.get(DataLakeManagementUtils.FILL)));
- }
+ } else if (this.params.containsKey(DataLakeManagementUtils.GROUP_BY_TAGS)) {
+ queryElements.add(
+ new GroupingByTags((GroupingByTagsParams) this.params.get(DataLakeManagementUtils.GROUP_BY_TAGS)));
+ }
- if (this.params.containsKey(DataLakeManagementUtils.ORDER_DESCENDING)) {
- queryElements.add(new OrderingByTime((OrderingByTimeParams) this.params.get(DataLakeManagementUtils.ORDER_DESCENDING)));
- } else if (this.params.containsKey(DataLakeManagementUtils.SELECT_FROM)) {
- queryElements.add(new OrderingByTime(OrderingByTimeParams.from(this.params.get(DataLakeManagementUtils.SELECT_FROM).getIndex(), "ASC")));
- }
+ if (this.params.containsKey(DataLakeManagementUtils.FILL)) {
+ queryElements.add(new FillStatement((FillParams) this.params.get(DataLakeManagementUtils.FILL)));
+ }
- if (this.params.containsKey(DataLakeManagementUtils.LIMIT)) {
- queryElements.add(new ItemLimitation((ItemLimitationParams) this.params.get(DataLakeManagementUtils.LIMIT)));
- }
+ if (this.params.containsKey(DataLakeManagementUtils.ORDER_DESCENDING)) {
+ queryElements.add(
+ new OrderingByTime((OrderingByTimeParams) this.params.get(DataLakeManagementUtils.ORDER_DESCENDING)));
+ } else if (this.params.containsKey(DataLakeManagementUtils.SELECT_FROM)) {
+ queryElements.add(new OrderingByTime(
+ OrderingByTimeParams.from(this.params.get(DataLakeManagementUtils.SELECT_FROM).getIndex(), "ASC")));
+ }
- if (this.params.containsKey(DataLakeManagementUtils.OFFSET)) {
- queryElements.add(new Offset((OffsetParams) this.params.get(DataLakeManagementUtils.OFFSET)));
- }
+ if (this.params.containsKey(DataLakeManagementUtils.LIMIT)) {
+ queryElements.add(new ItemLimitation((ItemLimitationParams) this.params.get(DataLakeManagementUtils.LIMIT)));
+ }
- return queryElements;
+ if (this.params.containsKey(DataLakeManagementUtils.OFFSET)) {
+ queryElements.add(new Offset((OffsetParams) this.params.get(DataLakeManagementUtils.OFFSET)));
}
+
+ return queryElements;
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryBuilder.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryBuilder.java
index 275f20b0e..a4a09d70e 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryBuilder.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryBuilder.java
@@ -19,6 +19,7 @@
package org.apache.streampipes.dataexplorer.v4.query;
import org.apache.streampipes.dataexplorer.v4.query.elements.QueryElement;
+
import org.influxdb.dto.Query;
import java.util.List;
@@ -26,37 +27,37 @@ import java.util.StringJoiner;
public class QueryBuilder {
- private final StringJoiner queryParts;
- private final String databaseName;
+ private final StringJoiner queryParts;
+ private final String databaseName;
- public static QueryBuilder create(String databaseName) {
- return new QueryBuilder(databaseName);
- }
+ private QueryBuilder(String databaseName) {
+ this.queryParts = new StringJoiner(" ");
+ this.databaseName = databaseName;
+ }
- private QueryBuilder(String databaseName) {
- this.queryParts = new StringJoiner(" ");
- this.databaseName = databaseName;
- }
+ public static QueryBuilder create(String databaseName) {
+ return new QueryBuilder(databaseName);
+ }
- public Query build(List<QueryElement<?>> queryElements, Boolean onlyCountResults) {
- for (QueryElement<?> queryPart : queryElements) {
- this.queryParts.add(queryPart.getStatement());
- }
- if (onlyCountResults) {
- return toCountResultsQuery();
- } else {
- return toQuery();
- }
+ public Query build(List<QueryElement<?>> queryElements, Boolean onlyCountResults) {
+ for (QueryElement<?> queryPart : queryElements) {
+ this.queryParts.add(queryPart.getStatement());
}
-
- public Query toQuery() {
- return new Query(this.queryParts.toString(), this.databaseName);
+ if (onlyCountResults) {
+ return toCountResultsQuery();
+ } else {
+ return toQuery();
}
+ }
- public Query toCountResultsQuery() {
- String q = "SELECT COUNT(*) FROM (" + this.queryParts.toString() + ")";
- return new Query(q, this.databaseName);
- }
+ public Query toQuery() {
+ return new Query(this.queryParts.toString(), this.databaseName);
+ }
+
+ public Query toCountResultsQuery() {
+ String q = "SELECT COUNT(*) FROM (" + this.queryParts.toString() + ")";
+ return new Query(q, this.databaseName);
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java
index 34556770f..d6a7b9d18 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java
@@ -21,7 +21,6 @@ package org.apache.streampipes.dataexplorer.v4.query;
import org.apache.streampipes.dataexplorer.v4.AutoAggregationHandler;
import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
-import org.apache.streampipes.dataexplorer.v4.query.DataExplorerQueryV4;
import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
import org.apache.streampipes.model.datalake.SpQueryResult;
@@ -33,9 +32,8 @@ import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParam
public class QueryResultProvider {
public static final String FOR_ID_KEY = "forId";
-
- protected ProvidedQueryParams queryParams;
protected final boolean ignoreMissingData;
+ protected ProvidedQueryParams queryParams;
public QueryResultProvider(ProvidedQueryParams queryParams,
boolean ignoreMissingData) {
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
index 5e48ec304..6e1efd3cb 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
@@ -105,8 +105,9 @@ public class StreamedQueryResultProvider extends QueryResultProvider {
/**
* Replaces the field 'time' of the data result with the actual timestamp field name of the measurement
+ *
* @param measurement contains the actual timestamp name value
- * @param dataResult the query result of the database with 'time' as timestamp field name
+ * @param dataResult the query result of the database with 'time' as timestamp field name
*/
private void changeTimestampHeader(DataLakeMeasure measurement,
SpQueryResult dataResult) {
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/DeleteFromStatement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/DeleteFromStatement.java
index cabd8b6df..fc46398e8 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/DeleteFromStatement.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/DeleteFromStatement.java
@@ -22,12 +22,12 @@ import org.apache.streampipes.dataexplorer.v4.params.DeleteFromStatementParams;
import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
public class DeleteFromStatement extends QueryElement<DeleteFromStatementParams> {
- public DeleteFromStatement(DeleteFromStatementParams deleteFromStatementParams) {
- super(deleteFromStatementParams);
- }
+ public DeleteFromStatement(DeleteFromStatementParams deleteFromStatementParams) {
+ super(deleteFromStatementParams);
+ }
- @Override
- protected String buildStatement(DeleteFromStatementParams deleteFromStatementParams) {
- return QueryTemplatesV4.deleteFrom(deleteFromStatementParams.getIndex());
- }
+ @Override
+ protected String buildStatement(DeleteFromStatementParams deleteFromStatementParams) {
+ return QueryTemplatesV4.deleteFrom(deleteFromStatementParams.getIndex());
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/FillStatement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/FillStatement.java
index a4f156abf..15e93a7e8 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/FillStatement.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/FillStatement.java
@@ -21,12 +21,12 @@ package org.apache.streampipes.dataexplorer.v4.query.elements;
import org.apache.streampipes.dataexplorer.v4.params.FillParams;
public class FillStatement extends QueryElement<FillParams> {
- public FillStatement(FillParams fillParams) {
- super(fillParams);
- }
+ public FillStatement(FillParams fillParams) {
+ super(fillParams);
+ }
- @Override
- protected String buildStatement(FillParams fillParams) {
- return fillParams.getFill();
- }
+ @Override
+ protected String buildStatement(FillParams fillParams) {
+ return fillParams.getFill();
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTags.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTags.java
index 61b9b9eee..537119136 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTags.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTags.java
@@ -23,21 +23,21 @@ import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
public class GroupingByTags extends QueryElement<GroupingByTagsParams> {
- public GroupingByTags(GroupingByTagsParams groupingByTagsParams) {
- super(groupingByTagsParams);
- }
-
- @Override
- protected String buildStatement(GroupingByTagsParams groupingByTagsParams) {
- String tags = "";
- for (String tag : groupingByTagsParams.getGroupingTags()) {
- if (tags.equals("")) {
- tags = tag;
- } else {
- tags = tags + ", " + tag;
- }
- }
+ public GroupingByTags(GroupingByTagsParams groupingByTagsParams) {
+ super(groupingByTagsParams);
+ }
- return QueryTemplatesV4.groupByTags(tags);
+ @Override
+ protected String buildStatement(GroupingByTagsParams groupingByTagsParams) {
+ String tags = "";
+ for (String tag : groupingByTagsParams.getGroupingTags()) {
+ if (tags.equals("")) {
+ tags = tag;
+ } else {
+ tags = tags + ", " + tag;
+ }
}
+
+ return QueryTemplatesV4.groupByTags(tags);
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTime.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTime.java
index dc16b650f..3ceadb161 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTime.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTime.java
@@ -23,12 +23,12 @@ import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
public class GroupingByTime extends QueryElement<GroupingByTimeParams> {
- public GroupingByTime(GroupingByTimeParams groupingByTimeParams) {
- super(groupingByTimeParams);
- }
+ public GroupingByTime(GroupingByTimeParams groupingByTimeParams) {
+ super(groupingByTimeParams);
+ }
- @Override
- protected String buildStatement(GroupingByTimeParams groupingByTimeParams) {
- return QueryTemplatesV4.groupByTime(groupingByTimeParams.getTimeInterval());
- }
+ @Override
+ protected String buildStatement(GroupingByTimeParams groupingByTimeParams) {
+ return QueryTemplatesV4.groupByTime(groupingByTimeParams.getTimeInterval());
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/ItemLimitation.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/ItemLimitation.java
index b4f42e084..3158552b7 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/ItemLimitation.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/ItemLimitation.java
@@ -23,12 +23,12 @@ import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
public class ItemLimitation extends QueryElement<ItemLimitationParams> {
- public ItemLimitation(ItemLimitationParams itemLimitationParams) {
- super(itemLimitationParams);
- }
+ public ItemLimitation(ItemLimitationParams itemLimitationParams) {
+ super(itemLimitationParams);
+ }
- @Override
- protected String buildStatement(ItemLimitationParams itemLimitationParams) {
- return QueryTemplatesV4.limitItems(itemLimitationParams.getLimit());
- }
+ @Override
+ protected String buildStatement(ItemLimitationParams itemLimitationParams) {
+ return QueryTemplatesV4.limitItems(itemLimitationParams.getLimit());
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/Offset.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/Offset.java
index c8c8c0aca..f931f2f72 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/Offset.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/Offset.java
@@ -23,12 +23,12 @@ import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
public class Offset extends QueryElement<OffsetParams> {
- public Offset(OffsetParams offsetParams) {
- super(offsetParams);
- }
+ public Offset(OffsetParams offsetParams) {
+ super(offsetParams);
+ }
- @Override
- protected String buildStatement(OffsetParams offsetParams) {
- return QueryTemplatesV4.offset(offsetParams.getOffset());
- }
+ @Override
+ protected String buildStatement(OffsetParams offsetParams) {
+ return QueryTemplatesV4.offset(offsetParams.getOffset());
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/OrderingByTime.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/OrderingByTime.java
index 877d7e055..58b55645b 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/OrderingByTime.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/OrderingByTime.java
@@ -23,12 +23,12 @@ import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
public class OrderingByTime extends QueryElement<OrderingByTimeParams> {
- public OrderingByTime(OrderingByTimeParams orderingByTimeParams) {
- super(orderingByTimeParams);
- }
+ public OrderingByTime(OrderingByTimeParams orderingByTimeParams) {
+ super(orderingByTimeParams);
+ }
- @Override
- protected String buildStatement(OrderingByTimeParams orderingByTimeParams) {
- return QueryTemplatesV4.orderByTime(orderingByTimeParams.getOrdering());
- }
+ @Override
+ protected String buildStatement(OrderingByTimeParams orderingByTimeParams) {
+ return QueryTemplatesV4.orderByTime(orderingByTimeParams.getOrdering());
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/QueryElement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/QueryElement.java
index 8dc7d0642..4ea28af5d 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/QueryElement.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/QueryElement.java
@@ -20,16 +20,16 @@ package org.apache.streampipes.dataexplorer.v4.query.elements;
import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
-public abstract class QueryElement<QP extends QueryParamsV4> {
- protected String queryStatement;
+public abstract class QueryElement<T extends QueryParamsV4> {
+ protected String queryStatement;
- public QueryElement(QP params) {
- this.queryStatement = buildStatement(params);
- }
+ public QueryElement(T params) {
+ this.queryStatement = buildStatement(params);
+ }
- protected abstract String buildStatement(QP params);
+ protected abstract String buildStatement(T params);
- public String getStatement() {
- return queryStatement;
- }
+ public String getStatement() {
+ return queryStatement;
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java
index 963036794..09e93ffbb 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java
@@ -35,7 +35,7 @@ public class SelectFromStatement extends QueryElement<SelectFromStatementParams>
} else {
StringJoiner joiner = new StringJoiner(",");
String queryPrefix = "SELECT ";
- String queryAppendix = " FROM " +escapeIndex(params.getIndex());
+ String queryAppendix = " FROM " + escapeIndex(params.getIndex());
params.getSelectedColumns().forEach(column -> {
joiner.add(column.toQueryString());
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/TimeBoundary.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/TimeBoundary.java
index bae5f2726..3bc23c221 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/TimeBoundary.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/TimeBoundary.java
@@ -23,18 +23,18 @@ import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
public class TimeBoundary extends QueryElement<TimeBoundaryParams> {
- public TimeBoundary(TimeBoundaryParams timeBoundaryParams) {
- super(timeBoundaryParams);
- }
+ public TimeBoundary(TimeBoundaryParams timeBoundaryParams) {
+ super(timeBoundaryParams);
+ }
- @Override
- protected String buildStatement(TimeBoundaryParams timeBoundaryParams) {
- if (timeBoundaryParams.getStartDate() == null) {
- return QueryTemplatesV4.whereTimeRightBound(timeBoundaryParams.getEndDate());
- } else if (timeBoundaryParams.getEndDate() == null) {
- return QueryTemplatesV4.whereTimeLeftBound(timeBoundaryParams.getStartDate());
- } else {
- return QueryTemplatesV4.whereTimeWithin(timeBoundaryParams.getStartDate(), timeBoundaryParams.getEndDate());
- }
+ @Override
+ protected String buildStatement(TimeBoundaryParams timeBoundaryParams) {
+ if (timeBoundaryParams.getStartDate() == null) {
+ return QueryTemplatesV4.whereTimeRightBound(timeBoundaryParams.getEndDate());
+ } else if (timeBoundaryParams.getEndDate() == null) {
+ return QueryTemplatesV4.whereTimeLeftBound(timeBoundaryParams.getStartDate());
+ } else {
+ return QueryTemplatesV4.whereTimeWithin(timeBoundaryParams.getStartDate(), timeBoundaryParams.getEndDate());
}
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java
index 7df602960..b2070e2c1 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java
@@ -18,11 +18,12 @@
package org.apache.streampipes.dataexplorer.v4.query.writer;
-import com.google.gson.Gson;
import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
import org.apache.streampipes.dataexplorer.v4.query.writer.item.ItemGenerator;
import org.apache.streampipes.dataexplorer.v4.query.writer.item.JsonItemWriter;
+import com.google.gson.Gson;
+
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java
index 0dfaa163a..6ef855c41 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java
@@ -27,8 +27,8 @@ import java.util.List;
public abstract class ConfiguredOutputWriter {
public static ConfiguredOutputWriter getConfiguredWriter(OutputFormat format,
- ProvidedQueryParams params,
- boolean ignoreMissingValues) {
+ ProvidedQueryParams params,
+ boolean ignoreMissingValues) {
var writer = format.getWriter();
writer.configure(params, ignoreMissingValues);
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java
index 2e5442f53..056192518 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java
@@ -50,7 +50,7 @@ public abstract class ItemGenerator {
}
protected abstract String makeItemString(String key,
- Object value);
+ Object value);
protected abstract String finalizeItem(String item);
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java
index dd681d6f4..9b0392cdb 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java
@@ -21,69 +21,69 @@ import java.util.StringJoiner;
public class QueryTemplatesV4 {
- public static String selectFrom(String index, String columns) {
- return "SELECT " + columns + " FROM " + index;
+ public static String selectFrom(String index, String columns) {
+ return "SELECT " + columns + " FROM " + index;
+ }
+
+ public static String selectCountFrom(String index, String selectedColumns) {
+ return selectAggregationFrom(index, selectedColumns, "COUNT");
+ }
+
+ public static String selectAggregationFrom(String index, String columns, String aggregationFunction) {
+ String[] cols = columns.split(",");
+ StringJoiner joiner = new StringJoiner(", ");
+ //StringBuilder statement = new StringBuilder(aggregationFunction + "(" + cols[0] + ")");
+
+ for (int i = 0; i < cols.length; i++) {
+ String builder = aggregationFunction
+ + "("
+ + cols[i]
+ + ")" + " AS " + cols[i];
+ joiner.add(builder);
}
- public static String selectCountFrom(String index, String selectedColumns) {
- return selectAggregationFrom(index, selectedColumns, "COUNT");
- }
-
- public static String selectAggregationFrom(String index, String columns, String aggregationFunction) {
- String[] cols = columns.split(",");
- StringJoiner joiner = new StringJoiner(", ");
- //StringBuilder statement = new StringBuilder(aggregationFunction + "(" + cols[0] + ")");
-
- for (int i = 0; i < cols.length; i++) {
- String builder = aggregationFunction +
- "(" +
- cols[i] +
- ")" + " AS " + cols[i];
- joiner.add(builder);
- }
-
- return "SELECT " + joiner + " FROM \"" + index + "\"";
- }
-
- public static String deleteFrom(String index) {
- return "DELETE FROM \"" + index + "\"";
- }
-
- public static String whereTimeWithin(long startDate, long endDate) {
- return "WHERE time > "
- + startDate * 1000000
- + " AND time < "
- + endDate * 1000000;
- }
-
- public static String whereTimeLeftBound(long startDate) {
- return "WHERE time > "
- + startDate * 1000000;
- }
-
- public static String whereTimeRightBound(long endDate) {
- return "WHERE time < "
- + endDate * 1000000;
- }
-
- public static String groupByTags(String tags) {
- return "GROUP BY " + tags;
- }
-
- public static String groupByTime(String timeInterval) {
- return "GROUP BY time(" + timeInterval + ")";
- }
-
- public static String orderByTime(String ordering) {
- return "ORDER BY time " + ordering.toUpperCase();
- }
-
- public static String limitItems(int limit) {
- return "LIMIT " + limit;
- }
-
- public static String offset(int offset) {
- return "OFFSET " + offset;
- }
+ return "SELECT " + joiner + " FROM \"" + index + "\"";
+ }
+
+ public static String deleteFrom(String index) {
+ return "DELETE FROM \"" + index + "\"";
+ }
+
+ public static String whereTimeWithin(long startDate, long endDate) {
+ return "WHERE time > "
+ + startDate * 1000000
+ + " AND time < "
+ + endDate * 1000000;
+ }
+
+ public static String whereTimeLeftBound(long startDate) {
+ return "WHERE time > "
+ + startDate * 1000000;
+ }
+
+ public static String whereTimeRightBound(long endDate) {
+ return "WHERE time < "
+ + endDate * 1000000;
+ }
+
+ public static String groupByTags(String tags) {
+ return "GROUP BY " + tags;
+ }
+
+ public static String groupByTime(String timeInterval) {
+ return "GROUP BY time(" + timeInterval + ")";
+ }
+
+ public static String orderByTime(String ordering) {
+ return "ORDER BY time " + ordering.toUpperCase();
+ }
+
+ public static String limitItems(int limit) {
+ return "LIMIT " + limit;
+ }
+
+ public static String offset(int offset) {
+ return "OFFSET " + offset;
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/DataLakeManagementUtils.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/DataLakeManagementUtils.java
index 61eacf75f..850b14c62 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/DataLakeManagementUtils.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/DataLakeManagementUtils.java
@@ -19,11 +19,36 @@
package org.apache.streampipes.dataexplorer.v4.utils;
import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.params.*;
-
-import java.util.*;
-
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.*;
+import org.apache.streampipes.dataexplorer.v4.params.DeleteFromStatementParams;
+import org.apache.streampipes.dataexplorer.v4.params.FillParams;
+import org.apache.streampipes.dataexplorer.v4.params.GroupingByTagsParams;
+import org.apache.streampipes.dataexplorer.v4.params.GroupingByTimeParams;
+import org.apache.streampipes.dataexplorer.v4.params.ItemLimitationParams;
+import org.apache.streampipes.dataexplorer.v4.params.OffsetParams;
+import org.apache.streampipes.dataexplorer.v4.params.OrderingByTimeParams;
+import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
+import org.apache.streampipes.dataexplorer.v4.params.SelectFromStatementParams;
+import org.apache.streampipes.dataexplorer.v4.params.TimeBoundaryParams;
+import org.apache.streampipes.dataexplorer.v4.params.WhereStatementParams;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AGGREGATION_FUNCTION;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_COLUMNS;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_COUNT_ONLY;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_END_DATE;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_FILTER;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_GROUP_BY;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_LIMIT;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_OFFSET;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_ORDER;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_PAGE;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_START_DATE;
+import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_TIME_INTERVAL;
public class DataLakeManagementUtils {
@@ -50,18 +75,19 @@ public class DataLakeManagementUtils {
if (params.has(QP_COUNT_ONLY) && params.getAsBoolean(QP_COUNT_ONLY)) {
queryParts.put(SELECT_FROM, SelectFromStatementParams.from(measurementId, params.getAsString(QP_COLUMNS), true));
} else {
- queryParts.put(SELECT_FROM, SelectFromStatementParams.from(measurementId, params.getAsString(QP_COLUMNS), params.getAsString(QP_AGGREGATION_FUNCTION)));
+ queryParts.put(SELECT_FROM, SelectFromStatementParams.from(measurementId, params.getAsString(QP_COLUMNS),
+ params.getAsString(QP_AGGREGATION_FUNCTION)));
}
String filterConditions = params.getAsString(QP_FILTER);
if (hasTimeParams(params)) {
- queryParts.put(WHERE, WhereStatementParams.from(measurementId,
- params.getAsLong(QP_START_DATE),
- params.getAsLong(QP_END_DATE),
- filterConditions));
+ queryParts.put(WHERE, WhereStatementParams.from(measurementId,
+ params.getAsLong(QP_START_DATE),
+ params.getAsLong(QP_END_DATE),
+ filterConditions));
} else if (filterConditions != null) {
- queryParts.put(WHERE, WhereStatementParams.from(measurementId, filterConditions));
+ queryParts.put(WHERE, WhereStatementParams.from(measurementId, filterConditions));
}
if (params.has(QP_TIME_INTERVAL)) {
@@ -95,7 +121,7 @@ public class DataLakeManagementUtils {
queryParts.put(OFFSET, OffsetParams.from(measurementId, params.getAsInt(QP_OFFSET)));
} else if (params.has(QP_LIMIT) && params.has(QP_PAGE)) {
queryParts.put(OFFSET, OffsetParams.from(measurementId,
- params.getAsInt(QP_PAGE) * params.getAsInt(QP_LIMIT)));
+ params.getAsInt(QP_PAGE) * params.getAsInt(QP_LIMIT)));
}
return queryParts;
@@ -113,8 +139,8 @@ public class DataLakeManagementUtils {
}
private static boolean hasTimeParams(ProvidedQueryParams params) {
- return params.has(QP_START_DATE) ||
- params.has(QP_END_DATE);
+ return params.has(QP_START_DATE)
+ || params.has(QP_END_DATE);
}
public static List<String[]> buildConditions(String queryPart) {
@@ -130,8 +156,8 @@ public class DataLakeManagementUtils {
public static String[] buildSingleCondition(String queryPart) {
return queryPart
- .replaceAll(BRACKET_OPEN, "")
- .replaceAll(BRACKET_CLOSE, "")
- .split(";");
+ .replaceAll(BRACKET_OPEN, "")
+ .replaceAll(BRACKET_CLOSE, "")
+ .split(";");
}
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
index 71aab02d6..53412edb8 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
@@ -31,7 +31,9 @@ import java.time.temporal.TemporalAccessor;
public class TimeParser {
private static final DateTimeFormatter formatter = new DateTimeFormatterBuilder()
- .appendPattern("uuuu[-MM[-dd]]['T'HH[:mm[:ss[.SSSSSSSSS][.SSSSSSSS][.SSSSSSS][.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S]]]][XXX]")
+ .appendPattern(
+ "uuuu[-MM[-dd]]['T'HH[:mm[:ss[.SSSSSSSSS][.SSSSSSSS][.SSSSSSS]"
+ + "[.SSSSSS][.SSSSS][.SSSS][.SSS][.SS][.S]]]][XXX]")
.parseDefaulting(ChronoField.NANO_OF_SECOND, 0)
.parseDefaulting(ChronoField.OFFSET_SECONDS, 0)
.toFormatter();
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredCsvOutputWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredCsvOutputWriter.java
index 47b2ac1ca..c4679ffdf 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredCsvOutputWriter.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredCsvOutputWriter.java
@@ -18,9 +18,10 @@
package org.apache.streampipesdataexplorer.v4.query.writer;
-import com.google.common.base.Charsets;
import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
import org.apache.streampipes.dataexplorer.v4.query.writer.ConfiguredCsvOutputWriter;
+
+import com.google.common.base.Charsets;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredJsonOutputWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredJsonOutputWriter.java
index 88d7967c5..9072c6b49 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredJsonOutputWriter.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredJsonOutputWriter.java
@@ -18,9 +18,10 @@
package org.apache.streampipesdataexplorer.v4.query.writer;
-import com.google.common.base.Charsets;
import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
import org.apache.streampipes.dataexplorer.v4.query.writer.ConfiguredJsonOutputWriter;
+
+import com.google.common.base.Charsets;
import org.junit.Test;
import java.io.ByteArrayOutputStream;
@@ -32,7 +33,7 @@ import static org.junit.Assert.assertEquals;
public class TestConfiguredJsonOutputWriter extends TestConfiguredOutputWriter {
private static final String Expected = "[{\"time\": 1668578077051,\"string\": \"test\",\"number\": 1}"
- +",{\"time\": 1668578127050,\"string\": \"test2\",\"number\": 2}]";
+ + ",{\"time\": 1668578127050,\"string\": \"test2\",\"number\": 2}]";
@Test
public void testJsonOutputWriter() throws IOException {
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestCsvItemWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestCsvItemWriter.java
index c1e8d2312..37a25a4d2 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestCsvItemWriter.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestCsvItemWriter.java
@@ -19,6 +19,7 @@
package org.apache.streampipesdataexplorer.v4.query.writer.item;
import org.apache.streampipes.dataexplorer.v4.query.writer.item.CsvItemWriter;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestJsonItemWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestJsonItemWriter.java
index f89ecd167..c00962835 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestJsonItemWriter.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestJsonItemWriter.java
@@ -18,8 +18,9 @@
package org.apache.streampipesdataexplorer.v4.query.writer.item;
-import com.google.gson.Gson;
import org.apache.streampipes.dataexplorer.v4.query.writer.item.JsonItemWriter;
+
+import com.google.gson.Gson;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -27,7 +28,7 @@ import static org.junit.Assert.assertEquals;
public class TestJsonItemWriter extends TestItemWriter {
private static final String Expected = "{\"time\": 1668578077051,\"string\": \"test\",\"number\": 1}";
-
+
@Test
public void testJsonWriter() {
var writer = new JsonItemWriter(new Gson());
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index bce7ebed7..dd75bd445 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -253,7 +253,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
}
private boolean checkProvidedQueryParams(MultivaluedMap<String, String> providedParams) {
- return supportedParams.containsAll(providedParams.keySet());
+ return SUPPORTED_PARAMS.containsAll(providedParams.keySet());
}
private ProvidedQueryParams populate(String measurementId, MultivaluedMap<String, String> rawParams) {