You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/04/02 18:55:52 UTC
[streampipes] branch dev updated: Improve data explorer query management (#1406) (#1407)
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new c18643663 Improve data explorer query management (#1406) (#1407)
c18643663 is described below
commit c186436633793d870ba6ff9bdee24f8aebbebeec
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Sun Apr 2 20:55:43 2023 +0200
Improve data explorer query management (#1406) (#1407)
* Improve data explorer query management (#1406)
* Fix checkstyle issue (#1406)
* Fix checkstyle issue (#1406)
* Refactor data explorer module (#1406)
* Fix checkstyle issues (#1406)
* Fix checkstyle issue in text (#1406)
* Fix test (#1406)
* [hotfix] Add test for DataLakeQueryBuilder (#1459)
* Merge missing test (#1406)
* Cleanup code (#1406)
---------
Co-authored-by: Philipp Zehnder <te...@users.noreply.github.com>
---
.../commons/configs/CouchDbConfigurations.java | 35 ---
.../commons/configs/CouchDbEnvKeys.java | 25 ---
.../configs/DataExplorerConfigurations.java | 46 ----
.../commons/configs/DataExplorerEnvKeys.java | 28 ---
.../dataexplorer/DataExplorerQueryManagement.java | 142 ++++++++++++
...ntV3.java => DataExplorerSchemaManagement.java} | 93 ++++++--
.../dataexplorer/DataLakeManagementV4.java | 245 ---------------------
.../api/IDataExplorerQueryManagement.java | 47 ++++
.../IDataExplorerSchemaManagement.java} | 25 ++-
.../IQueryStatement.java} | 14 +-
.../influx/DataExplorerInfluxQueryExecutor.java | 148 +++++++++++++
.../DataLakeInfluxQueryBuilder.java} | 138 +++++++-----
.../DeleteQueryParams.java} | 45 ++--
.../param/ProvidedRestQueryParamConverter.java | 147 +++++++++++++
.../ProvidedRestQueryParams.java} | 12 +-
.../dataexplorer/param/SelectQueryParams.java | 132 +++++++++++
.../SupportedRestQueryParams.java} | 4 +-
.../model/AggregationFunction.java} | 8 +-
.../model/FillClauseParams.java} | 20 +-
.../model/GroupByTagsClauseParams.java} | 24 +-
.../model/GroupByTimeClauseParams.java} | 19 +-
.../model/ItemClauseParams.java} | 19 +-
.../model/OffsetClauseParams.java} | 19 +-
.../model/OrderByClauseParams.java} | 20 +-
.../model/SelectClauseParams.java} | 73 +++---
.../{v4/params => param/model}/SelectColumn.java | 58 +++--
.../param/model/WhereClauseParams.java | 122 ++++++++++
.../{v4 => query}/AutoAggregationHandler.java | 53 +++--
.../query/DataExplorerQueryExecutor.java | 105 +++++++++
.../{v4 => }/query/QueryResultProvider.java | 28 ++-
.../query/StreamedQueryResultProvider.java | 18 +-
.../query/writer/ConfiguredCsvOutputWriter.java | 10 +-
.../query/writer/ConfiguredJsonOutputWriter.java | 10 +-
.../query/writer/ConfiguredOutputWriter.java | 8 +-
.../{v4 => }/query/writer/OutputFormat.java | 2 +-
.../{v4 => }/query/writer/item/CsvItemWriter.java | 2 +-
.../{v4 => }/query/writer/item/ItemGenerator.java | 4 +-
.../{v4 => }/query/writer/item/JsonItemWriter.java | 2 +-
.../DataLakeQueryOrdering.java | 2 +-
.../FilterCondition.java} | 24 +-
.../querybuilder/IDataLakeQueryBuilder.java | 84 +++++++
.../dataexplorer/sdk/DataLakeQueryConstants.java | 30 ---
.../dataexplorer/sdk/IDataLakeQueryBuilder.java | 74 -------
.../dataexplorer/{v4 => }/utils/TimeParser.java | 2 +-
.../v4/params/DeleteFromStatementParams.java | 31 ---
.../dataexplorer/v4/params/FillParams.java | 35 ---
.../v4/params/WhereStatementParams.java | 122 ----------
.../dataexplorer/v4/query/DataExplorerQueryV4.java | 234 --------------------
.../dataexplorer/v4/query/QueryBuilder.java | 64 ------
.../v4/query/elements/DeleteFromStatement.java | 33 ---
.../v4/query/elements/GroupingByTags.java | 43 ----
.../v4/query/elements/GroupingByTime.java | 34 ---
.../v4/query/elements/ItemLimitation.java | 34 ---
.../dataexplorer/v4/query/elements/Offset.java | 34 ---
.../v4/query/elements/OrderingByTime.java | 34 ---
.../v4/query/elements/SelectFromStatement.java | 51 -----
.../v4/query/elements/TimeBoundary.java | 40 ----
.../v4/query/elements/WhereStatement.java | 39 ----
.../dataexplorer/v4/template/QueryTemplatesV4.java | 63 ------
.../v4/utils/DataLakeManagementUtils.java | 163 --------------
.../dataexplorer/param/SelectQueryParamsTest.java | 208 +++++++++++++++++
.../param}/WhereStatementParamsTest.java | 27 ++-
.../writer/TestConfiguredCsvOutputWriter.java | 7 +-
.../writer/TestConfiguredJsonOutputWriter.java | 7 +-
.../query/writer/TestConfiguredOutputWriter.java | 2 +-
.../query/writer/item/TestCsvItemWriter.java | 4 +-
.../query/writer/item/TestItemWriter.java | 2 +-
.../query/writer/item/TestJsonItemWriter.java | 4 +-
.../dataexplorer/sdk/DataLakeQueryBuilderTest.java | 6 +-
.../utils/ProvidedQueryParameterBuilder.java | 92 ++++++++
.../sinks/internal/jvm/SinksInternalJvmInit.java | 4 -
.../streampipes/ps/DataLakeMeasureResourceV3.java | 57 -----
.../streampipes/ps/DataLakeMeasureResourceV4.java | 23 +-
.../apache/streampipes/ps/DataLakeResourceV4.java | 70 +++---
.../apache/streampipes/rest/ResetManagement.java | 14 +-
.../service/core/StreamPipesResourceConfig.java | 2 -
76 files changed, 1758 insertions(+), 1991 deletions(-)
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java
deleted file mode 100644
index 45678a9c1..000000000
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.commons.configs;
-
-import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class CouchDbConfigurations {
-
- public static List<ConfigItem> getDefaults() {
- return Arrays.asList(
- ConfigItem.from(CouchDbEnvKeys.COUCHDB_HOST, "couchdb", "Hostname for CouchDB to store image blobs"),
- ConfigItem.from(CouchDbEnvKeys.COUCHDB_PORT, 5984, ""),
- ConfigItem.from(CouchDbEnvKeys.COUCHDB_PROTOCOL, "http", "")
- );
- }
-
-}
\ No newline at end of file
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java
deleted file mode 100644
index 23867c634..000000000
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.commons.configs;
-
-public class CouchDbEnvKeys {
- public static final String COUCHDB_HOST = "SP_COUCHDB_HOST";
- public static final String COUCHDB_PORT = "SP_COUCHDB_PORT";
- public static final String COUCHDB_PROTOCOL = "SP_COUCHDB_PROTOCOL";
-}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java
deleted file mode 100644
index feb7135ea..000000000
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.commons.configs;
-
-import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
-
-import java.util.Arrays;
-import java.util.List;
-
-
-public class DataExplorerConfigurations {
- public static final String DATA_LAKE_DATABASE_NAME = "sp";
-
- public static List<ConfigItem> getDefaults() {
-
- return Arrays.asList(
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_HOST, "influxdb",
- "Hostname for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PROTOCOL, "http",
- "Protocol for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PORT, 8086, "Port for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_USERNAME, "default",
- "Username for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PASSWORD, "default",
- "Password for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_DATABASE_NAME, DATA_LAKE_DATABASE_NAME,
- "Database name for the StreamPipes data lake database")
- );
- }
-
-}
\ No newline at end of file
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java
deleted file mode 100644
index 6eb840708..000000000
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.commons.configs;
-
-public class DataExplorerEnvKeys {
- public static final String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
- public static final String DATA_LAKE_PROTOCOL = "SP_DATA_LAKE_PROTOCOL";
- public static final String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT";
- public static final String DATA_LAKE_USERNAME = "SP_DATA_LAKE_USERNAME";
- public static final String DATA_LAKE_PASSWORD = "SP_DATA_LAKE_PASSWORD";
- public static final String DATA_LAKE_DATABASE_NAME = "SP_DATA_LAKE_DATABASE_NAME";
-
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java
new file mode 100644
index 000000000..3e927e977
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer;
+
+import org.apache.streampipes.commons.environment.Environment;
+import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
+import org.apache.streampipes.dataexplorer.influx.DataExplorerInfluxQueryExecutor;
+import org.apache.streampipes.dataexplorer.param.DeleteQueryParams;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParamConverter;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
+import org.apache.streampipes.dataexplorer.query.QueryResultProvider;
+import org.apache.streampipes.dataexplorer.query.StreamedQueryResultProvider;
+import org.apache.streampipes.dataexplorer.query.writer.OutputFormat;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class DataExplorerQueryManagement implements IDataExplorerQueryManagement {
+
+ private final IDataExplorerSchemaManagement dataExplorerSchemaManagement;
+
+ public DataExplorerQueryManagement(IDataExplorerSchemaManagement dataExplorerSchemaManagement) {
+ this.dataExplorerSchemaManagement = dataExplorerSchemaManagement;
+ }
+
+ @Override
+ public SpQueryResult getData(ProvidedRestQueryParams queryParams,
+ boolean ignoreMissingData) throws IllegalArgumentException {
+ return new QueryResultProvider(queryParams, ignoreMissingData).getData();
+ }
+
+ @Override
+ public void getDataAsStream(ProvidedRestQueryParams params,
+ OutputFormat format,
+ boolean ignoreMissingValues,
+ OutputStream outputStream) throws IOException {
+
+ new StreamedQueryResultProvider(params, format, ignoreMissingValues).getDataAsStream(outputStream);
+ }
+
+ @Override
+ public boolean deleteAllData() {
+ List<DataLakeMeasure> allMeasurements = getAllMeasurements();
+
+ for (DataLakeMeasure measure : allMeasurements) {
+ QueryResult queryResult = new DeleteDataQuery(measure).executeQuery();
+ if (queryResult.hasError() || queryResult.getResults().get(0).getError() != null) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public boolean deleteData(String measurementID) {
+ List<DataLakeMeasure> allMeasurements = getAllMeasurements();
+ for (DataLakeMeasure measure : allMeasurements) {
+ if (measure.getMeasureName().equals(measurementID)) {
+ QueryResult queryResult = new DeleteDataQuery(new DataLakeMeasure(measurementID, null)).executeQuery();
+
+ return !queryResult.hasError();
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public SpQueryResult deleteData(String measurementID, Long startDate, Long endDate) {
+ DeleteQueryParams params =
+ ProvidedRestQueryParamConverter.getDeleteQueryParams(measurementID, startDate, endDate);
+ return new DataExplorerInfluxQueryExecutor().executeQuery(params);
+ }
+
+ @Override
+ public Map<String, Object> getTagValues(String measurementId,
+ String fields) {
+ InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient();
+ String databaseName = getEnvironment().getTsStorageBucket().getValueOrDefault();
+ Map<String, Object> tags = new HashMap<>();
+ if (fields != null && !("".equals(fields))) {
+ List<String> fieldList = Arrays.asList(fields.split(","));
+ fieldList.forEach(f -> {
+ String q =
+ "SHOW TAG VALUES ON \"" + databaseName + "\" FROM \"" + measurementId
+ + "\" WITH KEY = \"" + f + "\"";
+ Query query = new Query(q);
+ QueryResult queryResult = influxDB.query(query);
+ queryResult.getResults().forEach(res -> {
+ res.getSeries().forEach(series -> {
+ if (series.getValues().size() > 0) {
+ String field = series.getValues().get(0).get(0).toString();
+ List<String> values =
+ series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
+ tags.put(field, values);
+ }
+ });
+ });
+ });
+ }
+
+ return tags;
+ }
+
+ private List<DataLakeMeasure> getAllMeasurements() {
+ return this.dataExplorerSchemaManagement.getAllMeasurements();
+ }
+
+ private Environment getEnvironment() {
+ return Environments.getEnvironment();
+ }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java
similarity index 54%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java
index 6fd104ecd..f28cf6cbd 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java
@@ -18,41 +18,104 @@
package org.apache.streampipes.dataexplorer;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyList;
import org.apache.streampipes.model.schema.EventPropertyNested;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.storage.api.IDataLakeStorage;
+import org.apache.streampipes.storage.couchdb.utils.Utils;
import org.apache.streampipes.storage.management.StorageDispatcher;
+import com.google.gson.JsonObject;
+import org.lightcouch.CouchDbClient;
+
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+public class DataExplorerSchemaManagement implements IDataExplorerSchemaManagement {
-@Deprecated
-public class DataLakeNoUserManagementV3 {
+ @Override
+ public List<DataLakeMeasure> getAllMeasurements() {
+ return DataExplorerUtils.getInfos();
+ }
+ @Override
+ public DataLakeMeasure getById(String elementId) {
+ return getDataLakeStorage().findOne(elementId);
+ }
- @Deprecated
- public boolean addDataLake(String measure, EventSchema eventSchema) {
+ @Override
+ public DataLakeMeasure createMeasurement(DataLakeMeasure measure) {
List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
Optional<DataLakeMeasure> optional =
- dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure)).findFirst();
+ dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName()))
+ .findFirst();
if (optional.isPresent()) {
- if (!compareEventProperties(optional.get().getEventSchema().getEventProperties(),
- eventSchema.getEventProperties())) {
- return false;
+ DataLakeMeasure oldEntry = optional.get();
+ if (!compareEventProperties(oldEntry.getEventSchema().getEventProperties(),
+ measure.getEventSchema().getEventProperties())) {
+ return oldEntry;
+ }
+ } else {
+ measure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
+ getDataLakeStorage().storeDataLakeMeasure(measure);
+ return measure;
+ }
+
+ return measure;
+ }
+
+ @Override
+ public void deleteMeasurement(String elementId) {
+ if (getDataLakeStorage().findOne(elementId) != null) {
+ getDataLakeStorage().deleteDataLakeMeasure(elementId);
+ } else {
+ throw new IllegalArgumentException("Could not find measure with this ID");
+ }
+ }
+
+ @Override
+ public boolean deleteMeasurementByName(String measureName) {
+ boolean isSuccess = false;
+ CouchDbClient couchDbClient = Utils.getCouchDbDataLakeClient();
+ List<JsonObject> docs = couchDbClient.view("_all_docs").includeDocs(true).query(JsonObject.class);
+
+ for (JsonObject document : docs) {
+ if (document.get("measureName").toString().replace("\"", "").equals(measureName)) {
+ couchDbClient.remove(document.get("_id").toString().replace("\"", ""),
+ document.get("_rev").toString().replace("\"", ""));
+ isSuccess = true;
+ break;
}
+ }
+
+ try {
+ couchDbClient.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return isSuccess;
+ }
+
+ @Override
+ public void updateMeasurement(DataLakeMeasure measure) {
+ var existingMeasure = getDataLakeStorage().findOne(measure.getElementId());
+ if (existingMeasure != null) {
+ measure.setRev(existingMeasure.getRev());
+ getDataLakeStorage().updateDataLakeMeasure(measure);
} else {
- DataLakeMeasure dataLakeMeasure = new DataLakeMeasure(measure, eventSchema);
- dataLakeMeasure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
- getDataLakeStorage().storeDataLakeMeasure(dataLakeMeasure);
+ getDataLakeStorage().storeDataLakeMeasure(measure);
}
- return true;
+ }
+
+ private IDataLakeStorage getDataLakeStorage() {
+ return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
}
private boolean compareEventProperties(List<EventProperty> prop1, List<EventProperty> prop2) {
@@ -89,8 +152,4 @@ public class DataLakeNoUserManagementV3 {
});
}
-
- private IDataLakeStorage getDataLakeStorage() {
- return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
- }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
deleted file mode 100644
index 7f397af7a..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer;
-
-import org.apache.streampipes.commons.environment.Environment;
-import org.apache.streampipes.commons.environment.Environments;
-import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
-import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
-import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
-import org.apache.streampipes.dataexplorer.v4.query.DataExplorerQueryV4;
-import org.apache.streampipes.dataexplorer.v4.query.QueryResultProvider;
-import org.apache.streampipes.dataexplorer.v4.query.StreamedQueryResultProvider;
-import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
-import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
-import org.apache.streampipes.model.datalake.DataLakeMeasure;
-import org.apache.streampipes.model.datalake.SpQueryResult;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventPropertyList;
-import org.apache.streampipes.model.schema.EventPropertyNested;
-import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.storage.api.IDataLakeStorage;
-import org.apache.streampipes.storage.couchdb.utils.Utils;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-
-import com.google.gson.JsonObject;
-import org.influxdb.InfluxDB;
-import org.influxdb.dto.Query;
-import org.influxdb.dto.QueryResult;
-import org.lightcouch.CouchDbClient;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-public class DataLakeManagementV4 {
-
- public List<DataLakeMeasure> getAllMeasurements() {
- return DataExplorerUtils.getInfos();
- }
-
- public DataLakeMeasure getById(String measureId) {
- return getDataLakeStorage().findOne(measureId);
- }
-
- public SpQueryResult getData(ProvidedQueryParams queryParams,
- boolean ignoreMissingData) throws IllegalArgumentException {
- return new QueryResultProvider(queryParams, ignoreMissingData).getData();
- }
-
- public void getDataAsStream(ProvidedQueryParams params,
- OutputFormat format,
- boolean ignoreMissingValues,
- OutputStream outputStream) throws IOException {
-
- new StreamedQueryResultProvider(params, format, ignoreMissingValues).getDataAsStream(outputStream);
- }
-
- public boolean removeAllMeasurements() {
- List<DataLakeMeasure> allMeasurements = getAllMeasurements();
-
- for (DataLakeMeasure measure : allMeasurements) {
- QueryResult queryResult = new DeleteDataQuery(measure).executeQuery();
- if (queryResult.hasError() || queryResult.getResults().get(0).getError() != null) {
- return false;
- }
- }
- return true;
- }
-
- public boolean removeMeasurement(String measurementID) {
- List<DataLakeMeasure> allMeasurements = getAllMeasurements();
- for (DataLakeMeasure measure : allMeasurements) {
- if (measure.getMeasureName().equals(measurementID)) {
- QueryResult queryResult = new DeleteDataQuery(new DataLakeMeasure(measurementID, null)).executeQuery();
-
- return !queryResult.hasError();
- }
- }
- return false;
- }
-
- public SpQueryResult deleteData(String measurementID, Long startDate, Long endDate) {
- Map<String, QueryParamsV4> queryParts =
- DataLakeManagementUtils.getDeleteQueryParams(measurementID, startDate, endDate);
- return new DataExplorerQueryV4(queryParts).executeQuery(true);
- }
-
- public boolean removeEventProperty(String measurementID) {
- boolean isSuccess = false;
- CouchDbClient couchDbClient = Utils.getCouchDbDataLakeClient();
- List<JsonObject> docs = couchDbClient.view("_all_docs").includeDocs(true).query(JsonObject.class);
-
- for (JsonObject document : docs) {
- if (document.get("measureName").toString().replace("\"", "").equals(measurementID)) {
- couchDbClient.remove(document.get("_id").toString().replace("\"", ""),
- document.get("_rev").toString().replace("\"", ""));
- isSuccess = true;
- break;
- }
- }
-
- try {
- couchDbClient.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return isSuccess;
- }
-
- public Map<String, Object> getTagValues(String measurementId,
- String fields) {
- InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient();
- String databaseName = getEnvironment().getTsStorageBucket().getValueOrDefault();
- Map<String, Object> tags = new HashMap<>();
- if (fields != null && !("".equals(fields))) {
- List<String> fieldList = Arrays.asList(fields.split(","));
- fieldList.forEach(f -> {
- String q =
- "SHOW TAG VALUES ON \"" + databaseName + "\" FROM \"" + measurementId
- + "\" WITH KEY = \"" + f + "\"";
- Query query = new Query(q);
- QueryResult queryResult = influxDB.query(query);
- queryResult.getResults().forEach(res -> {
- res.getSeries().forEach(series -> {
- if (series.getValues().size() > 0) {
- String field = series.getValues().get(0).get(0).toString();
- List<String> values =
- series.getValues().stream().map(v -> v.get(1).toString()).collect(Collectors.toList());
- tags.put(field, values);
- }
- });
- });
- });
- }
-
- return tags;
- }
-
- public void updateDataLake(DataLakeMeasure measure) throws IllegalArgumentException {
- var existingMeasure = getDataLakeStorage().findOne(measure.getElementId());
- if (existingMeasure != null) {
- measure.setRev(existingMeasure.getRev());
- getDataLakeStorage().updateDataLakeMeasure(measure);
- } else {
- getDataLakeStorage().storeDataLakeMeasure(measure);
- }
- }
-
- public void deleteDataLakeMeasure(String elementId) throws IllegalArgumentException {
- if (getDataLakeStorage().findOne(elementId) != null) {
- getDataLakeStorage().deleteDataLakeMeasure(elementId);
- } else {
- throw new IllegalArgumentException("Could not find measure with this ID");
- }
- }
-
- public DataLakeMeasure addDataLake(DataLakeMeasure measure) {
- List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
- Optional<DataLakeMeasure> optional =
- dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName()))
- .findFirst();
-
- if (optional.isPresent()) {
- DataLakeMeasure oldEntry = optional.get();
- if (!compareEventProperties(oldEntry.getEventSchema().getEventProperties(),
- measure.getEventSchema().getEventProperties())) {
- return oldEntry;
- }
- } else {
- measure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
- getDataLakeStorage().storeDataLakeMeasure(measure);
- return measure;
- }
-
- return measure;
- }
-
- private boolean compareEventProperties(List<EventProperty> prop1, List<EventProperty> prop2) {
- if (prop1.size() != prop2.size()) {
- return false;
- }
-
- return prop1.stream().allMatch(prop -> {
-
- for (EventProperty property : prop2) {
- if (prop.getRuntimeName().equals(property.getRuntimeName())) {
-
- //primitive
- if (prop instanceof EventPropertyPrimitive && property instanceof EventPropertyPrimitive) {
- if (((EventPropertyPrimitive) prop)
- .getRuntimeType()
- .equals(((EventPropertyPrimitive) property).getRuntimeType())) {
- return true;
- }
-
- //list
- } else if (prop instanceof EventPropertyList && property instanceof EventPropertyList) {
- return compareEventProperties(Collections.singletonList(((EventPropertyList) prop).getEventProperty()),
- Collections.singletonList(((EventPropertyList) property).getEventProperty()));
-
- //nested
- } else if (prop instanceof EventPropertyNested && property instanceof EventPropertyNested) {
- return compareEventProperties(((EventPropertyNested) prop).getEventProperties(),
- ((EventPropertyNested) property).getEventProperties());
- }
- }
- }
- return false;
-
- });
- }
-
-
- private IDataLakeStorage getDataLakeStorage() {
- return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
- }
-
- private Environment getEnvironment() {
- return Environments.getEnvironment();
- }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerQueryManagement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerQueryManagement.java
new file mode 100644
index 000000000..59620a21b
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerQueryManagement.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.api;
+
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.query.writer.OutputFormat;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+public interface IDataExplorerQueryManagement {
+
+ SpQueryResult getData(ProvidedRestQueryParams queryParams,
+ boolean ignoreMissingData) throws IllegalArgumentException;
+
+ void getDataAsStream(ProvidedRestQueryParams params,
+ OutputFormat format,
+ boolean ignoreMissingValues,
+ OutputStream outputStream) throws IOException;
+
+ boolean deleteData(String measurementID);
+
+ SpQueryResult deleteData(String measurementID, Long startDate, Long endDate);
+
+ boolean deleteAllData();
+
+ Map<String, Object> getTagValues(String measurementId,
+ String fields);
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/QueryElement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java
similarity index 62%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/QueryElement.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java
index 4ea28af5d..d8dc29648 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/QueryElement.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java
@@ -16,20 +16,23 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.query.elements;
+package org.apache.streampipes.dataexplorer.api;
-import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
-public abstract class QueryElement<T extends QueryParamsV4> {
- protected String queryStatement;
+import java.util.List;
- public QueryElement(T params) {
- this.queryStatement = buildStatement(params);
- }
+public interface IDataExplorerSchemaManagement {
- protected abstract String buildStatement(T params);
+ List<DataLakeMeasure> getAllMeasurements();
- public String getStatement() {
- return queryStatement;
- }
+ DataLakeMeasure getById(String elementId);
+
+ DataLakeMeasure createMeasurement(DataLakeMeasure measure);
+
+ void deleteMeasurement(String elementId);
+
+ boolean deleteMeasurementByName(String measureName);
+
+ void updateMeasurement(DataLakeMeasure measure);
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/QueryParamsV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IQueryStatement.java
similarity index 76%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/QueryParamsV4.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IQueryStatement.java
index 49046beec..1e1a0f39a 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/QueryParamsV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IQueryStatement.java
@@ -15,18 +15,12 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.dataexplorer.v4.params;
-public abstract class QueryParamsV4 {
+package org.apache.streampipes.dataexplorer.api;
- private final String index;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
- protected QueryParamsV4(String index) {
- this.index = index;
- }
-
- public String getIndex() {
- return index;
- }
+public interface IQueryStatement {
+ void buildStatement(IDataLakeQueryBuilder<?> builder);
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
new file mode 100644
index 000000000..d0bbdc09f
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataExplorerInfluxQueryExecutor.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.influx;
+
+import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
+import org.apache.streampipes.dataexplorer.param.DeleteQueryParams;
+import org.apache.streampipes.dataexplorer.param.SelectQueryParams;
+import org.apache.streampipes.dataexplorer.query.DataExplorerQueryExecutor;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+import org.apache.streampipes.model.datalake.DataSeries;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import org.influxdb.InfluxDB;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DataExplorerInfluxQueryExecutor extends DataExplorerQueryExecutor<Query, QueryResult> {
+
+ public DataExplorerInfluxQueryExecutor() {
+ super();
+ }
+
+ public DataExplorerInfluxQueryExecutor(String forId) {
+ super(forId);
+ }
+
+ public DataExplorerInfluxQueryExecutor(int maximumAmountOfEvents) {
+ super(maximumAmountOfEvents);
+ }
+
+
+ @Override
+ protected double getAmountOfResults(QueryResult countQueryResult) {
+ if (countQueryResult.getResults().get(0).getSeries() != null
+ && countQueryResult.getResults().get(0).getSeries().get(0).getValues() != null) {
+ return (double) countQueryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(1);
+ } else {
+ return 0.0;
+ }
+ }
+
+
+ protected DataSeries convertResult(QueryResult.Series series,
+ boolean ignoreMissingValues) {
+ List<String> columns = series.getColumns();
+ List<List<Object>> values = series.getValues();
+
+ List<List<Object>> resultingValues = new ArrayList<>();
+
+ values.forEach(v -> {
+ if (ignoreMissingValues) {
+ if (!v.contains(null)) {
+ resultingValues.add(v);
+ }
+ } else {
+ resultingValues.add(v);
+ }
+
+ });
+
+ return new DataSeries(values.size(), resultingValues, columns, series.getTags());
+ }
+
+ protected SpQueryResult postQuery(QueryResult queryResult,
+ boolean ignoreMissingValues) throws RuntimeException {
+ SpQueryResult result = new SpQueryResult();
+
+ if (hasResult(queryResult)) {
+ result.setTotal(queryResult.getResults().get(0).getSeries().size());
+ queryResult.getResults().get(0).getSeries().forEach(rs -> {
+ DataSeries series = convertResult(rs, ignoreMissingValues);
+ result.setHeaders(series.getHeaders());
+ result.addDataResult(series);
+ });
+ }
+
+ if (this.appendId) {
+ result.setForId(this.forId);
+ }
+
+ return result;
+ }
+
+ private IDataLakeQueryBuilder<Query> getQueryBuilder(String measurementId) {
+ return DataLakeInfluxQueryBuilder.create(measurementId);
+ }
+
+ @Override
+ protected QueryResult executeQuery(Query query) {
+ try (final InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient()) {
+ return influxDB.query(query);
+ }
+ }
+
+ @Override
+ protected String asQueryString(Query query) {
+ return "(database:" + query.getDatabase() + "): " + query.getCommand();
+ }
+
+ @Override
+ protected Query makeDeleteQuery(DeleteQueryParams params) {
+ String query = "DELETE FROM \"" + params.getMeasurementId() + "\"";
+ if (params.isTimeRestricted()) {
+ query += "WHERE time > "
+ + params.getStartTime() * 1000000
+ + " AND time < "
+ + params.getEndTime() * 1000000;
+ }
+ return new Query(query);
+ }
+
+ @Override
+ protected Query makeCountQuery(SelectQueryParams params) {
+ var builder = getQueryBuilder(params.getIndex());
+ return params.toCountQuery(builder);
+ }
+
+ @Override
+ protected Query makeSelectQuery(SelectQueryParams params) {
+ var builder = getQueryBuilder(params.getIndex());
+ return params.toQuery(builder);
+ }
+
+ private boolean hasResult(QueryResult queryResult) {
+ return queryResult.getResults() != null
+ && queryResult.getResults().size() > 0
+ && queryResult.getResults().get(0).getSeries() != null;
+ }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeInfluxQueryBuilder.java
similarity index 55%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeInfluxQueryBuilder.java
index 42f5126e7..d2f518246 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilder.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/influx/DataLakeInfluxQueryBuilder.java
@@ -16,11 +16,14 @@
*
*/
-package org.apache.streampipes.dataexplorer.sdk;
+package org.apache.streampipes.dataexplorer.influx;
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.environment.Environments;
-import org.apache.streampipes.dataexplorer.v4.params.ColumnFunction;
+import org.apache.streampipes.dataexplorer.param.model.AggregationFunction;
+import org.apache.streampipes.dataexplorer.querybuilder.DataLakeQueryOrdering;
+import org.apache.streampipes.dataexplorer.querybuilder.FilterCondition;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
import org.influxdb.dto.Query;
import org.influxdb.querybuilder.Ordering;
@@ -35,12 +38,13 @@ import org.influxdb.querybuilder.clauses.SimpleClause;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.asc;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.desc;
import static org.influxdb.querybuilder.BuiltQuery.QueryBuilder.select;
-public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
+public class DataLakeInfluxQueryBuilder implements IDataLakeQueryBuilder<Query> {
private final String measurementId;
private final SelectionQueryImpl selectionQuery;
@@ -50,9 +54,11 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
private int limit = Integer.MIN_VALUE;
private int offset = Integer.MIN_VALUE;
- private Environment env;
+ private Object fill;
- private DataLakeQueryBuilder(String measurementId) {
+ private final Environment env;
+
+ private DataLakeInfluxQueryBuilder(String measurementId) {
this.measurementId = measurementId;
this.selectionQuery = select();
this.whereClauses = new ArrayList<>();
@@ -60,70 +66,70 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
this.env = Environments.getEnvironment();
}
- public static DataLakeQueryBuilder create(String measurementId) {
- return new DataLakeQueryBuilder(measurementId);
+ public static DataLakeInfluxQueryBuilder create(String measurementId) {
+ return new DataLakeInfluxQueryBuilder(measurementId);
}
+
@Override
- public DataLakeQueryBuilder withSimpleColumn(String columnName) {
- this.selectionQuery.column(columnName);
+ public DataLakeInfluxQueryBuilder withAllColumns() {
+ this.selectionQuery.all();
+ return this;
+ }
+ @Override
+ public DataLakeInfluxQueryBuilder withSimpleColumn(String columnName) {
+ this.selectionQuery.column(columnName);
return this;
}
@Override
- public DataLakeQueryBuilder withSimpleColumns(List<String> columnNames) {
+ public DataLakeInfluxQueryBuilder withSimpleColumns(List<String> columnNames) {
columnNames.forEach(this.selectionQuery::column);
return this;
}
@Override
- public DataLakeQueryBuilder withAggregatedColumn(String columnName,
- ColumnFunction columnFunction,
- String targetName) {
- if (columnFunction == ColumnFunction.COUNT) {
- this.selectionQuery.count(columnName).as(targetName);
- } else if (columnFunction == ColumnFunction.MEAN) {
- this.selectionQuery.mean(columnName).as(targetName);
- } else if (columnFunction == ColumnFunction.MIN) {
- this.selectionQuery.min(columnName).as(targetName);
- } else if (columnFunction == ColumnFunction.MAX) {
- this.selectionQuery.max(columnName).as(targetName);
- } else if (columnFunction == ColumnFunction.FIRST) {
- this.selectionQuery.function("FIRST", columnName).as(targetName);
- } else if (columnFunction == ColumnFunction.LAST) {
- this.selectionQuery.function("LAST", columnName).as(targetName);
- }
+ public DataLakeInfluxQueryBuilder withAggregatedColumn(String columnName,
+ AggregationFunction aggregationFunction,
+ String aliasName) {
- // TODO implement all column functions
+ this.selectionQuery.function(aggregationFunction.toDbName(), columnName).as(aliasName);
return this;
}
@Override
- public DataLakeQueryBuilder withStartTime(long startTime) {
+ public IDataLakeQueryBuilder<Query> withAggregatedColumn(String columnName, AggregationFunction aggregationFunction) {
+ this.selectionQuery.function(aggregationFunction.toDbName(), columnName);
+
+ return this;
+ }
+
+ @Override
+ public DataLakeInfluxQueryBuilder withStartTime(long startTime) {
this.whereClauses.add(new SimpleClause("time", ">=", startTime * 1000000));
return this;
}
@Override
- public DataLakeQueryBuilder withEndTime(long endTime) {
+ public DataLakeInfluxQueryBuilder withEndTime(long endTime) {
return withEndTime(endTime, true);
}
@Override
- public DataLakeQueryBuilder withEndTime(long endTime,
- boolean includeEndTime) {
+ public DataLakeInfluxQueryBuilder withEndTime(long endTime,
+ boolean includeEndTime) {
String operator = includeEndTime ? "<=" : "<";
this.whereClauses.add(new SimpleClause("time", operator, endTime * 1000000));
return this;
}
@Override
- public DataLakeQueryBuilder withTimeBoundary(long startTime,
- long endTime) {
+ public DataLakeInfluxQueryBuilder withTimeBoundary(long startTime,
+ long endTime) {
this.withStartTime(startTime);
this.withEndTime(endTime);
@@ -131,17 +137,17 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
}
@Override
- public DataLakeQueryBuilder withFilter(String field,
- String operator,
- Object value) {
+ public DataLakeInfluxQueryBuilder withFilter(String field,
+ String operator,
+ Object value) {
this.whereClauses.add(new SimpleClause(field, operator, value));
return this;
}
@Override
- public DataLakeQueryBuilder withExclusiveFilter(String field,
- String operator,
- List<?> values) {
+ public DataLakeInfluxQueryBuilder withExclusiveFilter(String field,
+ String operator,
+ List<?> values) {
List<ConjunctionClause> or = new ArrayList<>();
values.forEach(value -> or.add(new OrConjunction(new SimpleClause(field, operator, value))));
@@ -150,9 +156,9 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
}
@Override
- public DataLakeQueryBuilder withInclusiveFilter(String field,
- String operator,
- List<?> values) {
+ public DataLakeInfluxQueryBuilder withInclusiveFilter(String field,
+ String operator,
+ List<?> values) {
List<ConjunctionClause> and = new ArrayList<>();
values.forEach(value -> and.add(new AndConjunction(new SimpleClause(field, operator, value))));
@@ -161,14 +167,25 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
}
@Override
- public DataLakeQueryBuilder withFilter(NestedClause clause) {
+ public IDataLakeQueryBuilder<Query> withInclusiveFilter(List<FilterCondition> filterConditions) {
+ List<ConjunctionClause> and = new ArrayList<>();
+ filterConditions.forEach(c -> and
+ .add(new AndConjunction(new SimpleClause(c.getField(), c.getOperator(), c.getCondition()))));
+
+ addNestedWhereClause(and);
+
+ return this;
+ }
+
+ @Override
+ public DataLakeInfluxQueryBuilder withFilter(NestedClause clause) {
this.whereClauses.add(clause);
return this;
}
@Override
- public DataLakeQueryBuilder withGroupByTime(String timeInterval) {
+ public DataLakeInfluxQueryBuilder withGroupByTime(String timeInterval) {
this.groupByClauses.add(new RawTextClause("time(" + timeInterval + ")"));
@@ -176,8 +193,8 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
}
@Override
- public DataLakeQueryBuilder withGroupByTime(String timeInterval,
- String offsetInterval) {
+ public DataLakeInfluxQueryBuilder withGroupByTime(String timeInterval,
+ String offsetInterval) {
this.groupByClauses.add(new RawTextClause("time("
+ timeInterval
@@ -189,7 +206,7 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
}
@Override
- public DataLakeQueryBuilder withGroupBy(String column) {
+ public DataLakeInfluxQueryBuilder withGroupBy(String column) {
this.groupByClauses.add(new RawTextClause(column));
@@ -197,7 +214,7 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
}
@Override
- public DataLakeQueryBuilder withOrderBy(DataLakeQueryOrdering ordering) {
+ public DataLakeInfluxQueryBuilder withOrderBy(DataLakeQueryOrdering ordering) {
if (DataLakeQueryOrdering.ASC.equals(ordering)) {
this.ordering = asc();
} else {
@@ -208,23 +225,30 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
}
@Override
- public DataLakeQueryBuilder withLimit(int limit) {
+ public DataLakeInfluxQueryBuilder withLimit(int limit) {
this.limit = limit;
return this;
}
@Override
- public DataLakeQueryBuilder withOffset(int offset) {
+ public DataLakeInfluxQueryBuilder withOffset(int offset) {
this.offset = offset;
return this;
}
+ @Override
+ public IDataLakeQueryBuilder<Query> withFill(Object fill) {
+ this.fill = fill;
+
+ return this;
+ }
+
@Override
public Query build() {
var selectQuery =
- this.selectionQuery.from(env.getTsStorageBucket().getValueOrDefault(), "\"" + measurementId + "\"");
+ this.selectionQuery.from(env.getTsStorageBucket().getValueOrDefault(), escapeIndex(measurementId));
this.whereClauses.forEach(selectQuery::where);
if (this.groupByClauses.size() > 0) {
@@ -243,6 +267,14 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
selectQuery.limit(this.limit, this.offset);
}
+ if (Objects.nonNull(fill)) {
+ if (fill instanceof String) {
+ selectQuery.fill((String) fill);
+ } else {
+ selectQuery.fill((Number) fill);
+ }
+ }
+
return selectQuery;
}
@@ -250,4 +282,8 @@ public class DataLakeQueryBuilder implements IDataLakeQueryBuilder<Query> {
NestedClause nestedClause = new NestedClause(clauses);
this.whereClauses.add(nestedClause);
}
+
+ private String escapeIndex(String index) {
+ return "\"" + index + "\"";
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/TimeBoundaryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/DeleteQueryParams.java
similarity index 50%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/TimeBoundaryParams.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/DeleteQueryParams.java
index 64c0a4867..8bdda43de 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/TimeBoundaryParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/DeleteQueryParams.java
@@ -16,28 +16,45 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param;
-public class TimeBoundaryParams extends QueryParamsV4 {
+public class DeleteQueryParams {
- private final Long startDate;
- private final Long endDate;
+ private final String measurementId;
- protected TimeBoundaryParams(String measurementID, Long startDate, Long endDate) {
- super(measurementID);
- this.startDate = startDate;
- this.endDate = endDate;
+ private long startTime;
+
+ private long endTime;
+
+ private boolean timeRestricted;
+
+ public DeleteQueryParams(String measurementId) {
+ this.measurementId = measurementId;
+ this.timeRestricted = false;
+ }
+
+ public DeleteQueryParams(String measurementId,
+ Long startTime,
+ Long endTime) {
+ this(measurementId);
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.timeRestricted = true;
+ }
+
+ public String getMeasurementId() {
+ return measurementId;
}
- public static TimeBoundaryParams from(String measurementID, Long startDate, Long endDate) {
- return new TimeBoundaryParams(measurementID, startDate, endDate);
+ public long getStartTime() {
+ return startTime;
}
- public Long getStartDate() {
- return startDate;
+ public long getEndTime() {
+ return endTime;
}
- public Long getEndDate() {
- return endDate;
+ public boolean isTimeRestricted() {
+ return timeRestricted;
}
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/ProvidedRestQueryParamConverter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/ProvidedRestQueryParamConverter.java
new file mode 100644
index 000000000..963ec9d32
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/ProvidedRestQueryParamConverter.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.param;
+
+import org.apache.streampipes.dataexplorer.param.model.FillClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.GroupByTagsClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.GroupByTimeClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.ItemClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.OffsetClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.OrderByClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.SelectClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.WhereClauseParams;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_AGGREGATION_FUNCTION;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_COLUMNS;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_COUNT_ONLY;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_END_DATE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_FILTER;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_GROUP_BY;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_LIMIT;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_OFFSET;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_ORDER;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_PAGE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_START_DATE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_TIME_INTERVAL;
+
+
+public class ProvidedRestQueryParamConverter {
+
+ public static final String BRACKET_OPEN = "\\[";
+ public static final String BRACKET_CLOSE = "\\]";
+
+ public static final String ORDER_DESCENDING = "DESC";
+
+ public static SelectQueryParams getSelectQueryParams(ProvidedRestQueryParams params) {
+ SelectQueryParams queryParameters = new SelectQueryParams(params.getMeasurementId());
+
+ String measurementId = params.getMeasurementId();
+
+ if (params.has(QP_COUNT_ONLY) && params.getAsBoolean(QP_COUNT_ONLY)) {
+ queryParameters.withSelectParams(SelectClauseParams.from(params.getAsString(QP_COLUMNS), true));
+ } else {
+ queryParameters.withSelectParams(SelectClauseParams.from(params.getAsString(QP_COLUMNS),
+ params.getAsString(QP_AGGREGATION_FUNCTION)));
+ }
+
+ String filterConditions = params.getAsString(QP_FILTER);
+
+ if (hasTimeParams(params)) {
+ queryParameters.withWhereParams(WhereClauseParams.from(
+ params.getAsLong(QP_START_DATE),
+ params.getAsLong(QP_END_DATE),
+ filterConditions));
+ } else if (filterConditions != null) {
+ queryParameters.withWhereParams(WhereClauseParams.from(filterConditions));
+ }
+
+ if (params.has(QP_TIME_INTERVAL)) {
+ String timeInterval = params.getAsString(QP_TIME_INTERVAL);
+ if (!params.has(QP_GROUP_BY)) {
+ queryParameters.withGroupByTimeParams(GroupByTimeClauseParams.from(timeInterval));
+ } else {
+ params.update(QP_GROUP_BY, params.getAsString(QP_GROUP_BY) + ",time(" + timeInterval + ")");
+ }
+
+ queryParameters.withFillParams(FillClauseParams.from());
+ }
+
+ if (params.has(QP_GROUP_BY)) {
+ queryParameters.withGroupByTagsParams(GroupByTagsClauseParams.from(params.getAsString(QP_GROUP_BY)));
+ }
+
+
+ if (params.has(QP_ORDER)) {
+ String order = params.getAsString(QP_ORDER);
+ if (order.equals(ORDER_DESCENDING)) {
+ queryParameters.withOrderByParams(OrderByClauseParams.from(order));
+ }
+ }
+
+ if (params.has(QP_LIMIT)) {
+ queryParameters.withLimitParams(ItemClauseParams.from(params.getAsInt(QP_LIMIT)));
+ }
+
+ if (params.has(QP_OFFSET)) {
+ queryParameters.withOffsetParams(OffsetClauseParams.from(params.getAsInt(QP_OFFSET)));
+ } else if (params.has(QP_LIMIT) && params.has(QP_PAGE)) {
+ queryParameters.withOffsetParams(OffsetClauseParams.from(
+ params.getAsInt(QP_PAGE) * params.getAsInt(QP_LIMIT)));
+ }
+
+ return queryParameters;
+ }
+
+ public static DeleteQueryParams getDeleteQueryParams(String measurementId,
+ Long startTime,
+ Long endTime) {
+ if (startTime != null || endTime != null) {
+ return new DeleteQueryParams(measurementId, startTime, endTime);
+ } else {
+ return new DeleteQueryParams(measurementId);
+ }
+ }
+
+ private static boolean hasTimeParams(ProvidedRestQueryParams params) {
+ return params.has(QP_START_DATE)
+ || params.has(QP_END_DATE);
+ }
+
+ public static List<String[]> buildConditions(String queryPart) {
+ String[] conditions = queryPart.split(",");
+ List<String[]> result = new ArrayList<>();
+
+ Arrays.stream(conditions).forEach(condition -> {
+ String[] singleCondition = buildSingleCondition(condition);
+ result.add(singleCondition);
+ });
+ return result;
+ }
+
+ public static String[] buildSingleCondition(String queryPart) {
+ return queryPart
+ .replaceAll(BRACKET_OPEN, "")
+ .replaceAll(BRACKET_CLOSE, "")
+ .split(";");
+ }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/ProvidedQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/ProvidedRestQueryParams.java
similarity index 86%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/ProvidedQueryParams.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/ProvidedRestQueryParams.java
index e3c9535d6..c783e474d 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/ProvidedQueryParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/ProvidedRestQueryParams.java
@@ -15,26 +15,26 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.dataexplorer.v4;
+package org.apache.streampipes.dataexplorer.param;
import java.util.HashMap;
import java.util.Map;
-public class ProvidedQueryParams {
+public class ProvidedRestQueryParams {
private final String measurementId;
private final Map<String, String> providedParams;
- public ProvidedQueryParams(String measurementId,
- Map<String, String> providedParams) {
+ public ProvidedRestQueryParams(String measurementId,
+ Map<String, String> providedParams) {
this.measurementId = measurementId;
this.providedParams = providedParams;
}
- public ProvidedQueryParams(ProvidedQueryParams params) {
+ public ProvidedRestQueryParams(ProvidedRestQueryParams params) {
this.measurementId = params.getMeasurementId();
this.providedParams = new HashMap<>();
- params.getProvidedParams().forEach(providedParams::put);
+ providedParams.putAll(params.getProvidedParams());
}
public boolean has(String key) {
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/SelectQueryParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/SelectQueryParams.java
new file mode 100644
index 000000000..741365060
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/SelectQueryParams.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.dataexplorer.param;
+
+import org.apache.streampipes.dataexplorer.param.model.FillClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.GroupByTagsClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.GroupByTimeClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.ItemClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.OffsetClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.OrderByClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.SelectClauseParams;
+import org.apache.streampipes.dataexplorer.param.model.WhereClauseParams;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+
+import java.util.Objects;
+
+public class SelectQueryParams {
+
+ private SelectClauseParams selectParams;
+
+ private WhereClauseParams whereParams;
+
+ private GroupByTagsClauseParams groupByTagsClauseParams;
+ private GroupByTimeClauseParams groupByTimeClauseParams;
+
+ private OrderByClauseParams orderByClauseParams;
+ private ItemClauseParams limitParams;
+
+ private OffsetClauseParams offsetClauseParams;
+
+ private FillClauseParams fillClauseParams;
+
+ private final String index;
+
+ public SelectQueryParams(String index) {
+ this.index = index;
+ }
+
+ public String getIndex() {
+ return index;
+ }
+
+ public void withSelectParams(SelectClauseParams params) {
+ this.selectParams = params;
+ }
+
+ public void withLimitParams(ItemClauseParams params) {
+ this.limitParams = params;
+ }
+
+ public void withGroupByTagsParams(GroupByTagsClauseParams params) {
+ this.groupByTagsClauseParams = params;
+ }
+
+ public void withGroupByTimeParams(GroupByTimeClauseParams params) {
+ this.groupByTimeClauseParams = params;
+ }
+
+ public void withFillParams(FillClauseParams params) {
+ this.fillClauseParams = params;
+ }
+
+ public void withWhereParams(WhereClauseParams params) {
+ this.whereParams = params;
+ }
+
+ public void withOffsetParams(OffsetClauseParams params) {
+ this.offsetClauseParams = params;
+ }
+
+ public void withOrderByParams(OrderByClauseParams params) {
+ this.orderByClauseParams = params;
+ }
+
+ public <T> T toQuery(IDataLakeQueryBuilder<T> builder) {
+ this.selectParams.buildStatement(builder);
+ prepareBuilder(builder);
+ return builder.build();
+ }
+
+ public <T> T toCountQuery(IDataLakeQueryBuilder<T> builder) {
+ this.selectParams.buildCountStatement(builder);
+ prepareBuilder(builder);
+ return builder.build();
+ }
+
+ private <T> void prepareBuilder(IDataLakeQueryBuilder<T> builder) {
+ if (Objects.nonNull(this.whereParams)) {
+ this.whereParams.buildStatement(builder);
+ }
+
+ if (Objects.nonNull(this.groupByTimeClauseParams)) {
+ this.groupByTimeClauseParams.buildStatement(builder);
+ }
+
+ if (Objects.nonNull(this.groupByTagsClauseParams)) {
+ this.groupByTagsClauseParams.buildStatement(builder);
+ }
+
+ if (Objects.nonNull(this.orderByClauseParams)) {
+ this.orderByClauseParams.buildStatement(builder);
+ }
+
+ if (Objects.nonNull(this.limitParams)) {
+ this.limitParams.buildStatement(builder);
+ }
+
+ if (Objects.nonNull(this.offsetClauseParams)) {
+ this.offsetClauseParams.buildStatement(builder);
+ }
+
+ if (Objects.nonNull(this.fillClauseParams)) {
+ this.fillClauseParams.buildStatement(builder);
+ }
+ }
+
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/SupportedDataLakeQueryParameters.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/SupportedRestQueryParams.java
similarity index 96%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/SupportedDataLakeQueryParameters.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/SupportedRestQueryParams.java
index 059a4f4ce..7c3e3f829 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/SupportedDataLakeQueryParameters.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/SupportedRestQueryParams.java
@@ -15,12 +15,12 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.dataexplorer.v4;
+package org.apache.streampipes.dataexplorer.param;
import java.util.Arrays;
import java.util.List;
-public class SupportedDataLakeQueryParameters {
+public class SupportedRestQueryParams {
public static final String QP_COLUMNS = "columns";
public static final String QP_START_DATE = "startDate";
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ColumnFunction.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/AggregationFunction.java
similarity index 86%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ColumnFunction.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/AggregationFunction.java
index 1837af037..0d899c2b9 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ColumnFunction.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/AggregationFunction.java
@@ -15,9 +15,9 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param.model;
-public enum ColumnFunction {
+public enum AggregationFunction {
MEAN("MEAN"),
MIN("MIN"),
@@ -28,9 +28,9 @@ public enum ColumnFunction {
MODE("MODE"),
SUM("SUM");
- private String dbName;
+ private final String dbName;
- ColumnFunction(String dbName) {
+ AggregationFunction(String dbName) {
this.dbName = dbName;
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/FillStatement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/FillClauseParams.java
similarity index 62%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/FillStatement.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/FillClauseParams.java
index 15e93a7e8..5204ce5a0 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/FillStatement.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/FillClauseParams.java
@@ -16,17 +16,23 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.query.elements;
+package org.apache.streampipes.dataexplorer.param.model;
-import org.apache.streampipes.dataexplorer.v4.params.FillParams;
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
-public class FillStatement extends QueryElement<FillParams> {
- public FillStatement(FillParams fillParams) {
- super(fillParams);
+public class FillClauseParams implements IQueryStatement {
+ String fill = "none";
+
+ protected FillClauseParams() {
+ }
+
+ public static FillClauseParams from() {
+ return new FillClauseParams();
}
@Override
- protected String buildStatement(FillParams fillParams) {
- return fillParams.getFill();
+ public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+ builder.withFill(fill);
}
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTagsParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/GroupByTagsClauseParams.java
similarity index 56%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTagsParams.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/GroupByTagsClauseParams.java
index afdc591a6..833fb3363 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTagsParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/GroupByTagsClauseParams.java
@@ -16,27 +16,29 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param.model;
+
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
-public class GroupingByTagsParams extends QueryParamsV4 {
+public class GroupByTagsClauseParams implements IQueryStatement {
private final List<String> groupingTags;
- public GroupingByTagsParams(String measurementID, String groupingTagsSeparatedByComma) {
- super(measurementID);
+ public GroupByTagsClauseParams(String groupingTagsSeparatedByComma) {
this.groupingTags = new ArrayList<>();
- for (String tag : groupingTagsSeparatedByComma.split(",")) {
- this.groupingTags.add(tag);
- }
+ this.groupingTags.addAll(Arrays.asList(groupingTagsSeparatedByComma.split(",")));
}
- public static GroupingByTagsParams from(String measurementID, String groupingTagsSeparatedByComma) {
- return new GroupingByTagsParams(measurementID, groupingTagsSeparatedByComma);
+ public static GroupByTagsClauseParams from(String groupingTagsSeparatedByComma) {
+ return new GroupByTagsClauseParams(groupingTagsSeparatedByComma);
}
- public List<String> getGroupingTags() {
- return groupingTags;
+ @Override
+ public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+ groupingTags.forEach(builder::withGroupBy);
}
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTimeParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/GroupByTimeClauseParams.java
similarity index 61%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTimeParams.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/GroupByTimeClauseParams.java
index 27650dabb..0d83953ce 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/GroupingByTimeParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/GroupByTimeClauseParams.java
@@ -16,21 +16,24 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param.model;
-public class GroupingByTimeParams extends QueryParamsV4 {
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+
+public class GroupByTimeClauseParams implements IQueryStatement {
private final String timeInterval;
- public GroupingByTimeParams(String measurementID, String timeInterval) {
- super(measurementID);
+ public GroupByTimeClauseParams(String timeInterval) {
this.timeInterval = timeInterval;
}
- public static GroupingByTimeParams from(String measurementID, String timeInterval) {
- return new GroupingByTimeParams(measurementID, timeInterval);
+ public static GroupByTimeClauseParams from(String timeInterval) {
+ return new GroupByTimeClauseParams(timeInterval);
}
- public String getTimeInterval() {
- return this.timeInterval;
+ @Override
+ public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+ builder.withGroupByTime(timeInterval);
}
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ItemLimitationParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/ItemClauseParams.java
similarity index 64%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ItemLimitationParams.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/ItemClauseParams.java
index 0bd9dd122..ea447176c 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/ItemLimitationParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/ItemClauseParams.java
@@ -16,22 +16,29 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param.model;
-public class ItemLimitationParams extends QueryParamsV4 {
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+
+public class ItemClauseParams implements IQueryStatement {
private final Integer limit;
- public ItemLimitationParams(String measurementID, Integer limit) {
- super(measurementID);
+ public ItemClauseParams(Integer limit) {
this.limit = limit;
}
- public static ItemLimitationParams from(String measurementID, Integer limit) {
- return new ItemLimitationParams(measurementID, limit);
+ public static ItemClauseParams from(Integer limit) {
+ return new ItemClauseParams(limit);
}
public Integer getLimit() {
return limit;
}
+
+ @Override
+ public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+ builder.withLimit(limit);
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OffsetParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/OffsetClauseParams.java
similarity index 64%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OffsetParams.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/OffsetClauseParams.java
index 334983af1..b8dbc94ae 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OffsetParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/OffsetClauseParams.java
@@ -16,22 +16,29 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param.model;
-public class OffsetParams extends QueryParamsV4 {
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+
+public class OffsetClauseParams implements IQueryStatement {
private final Integer offset;
- public OffsetParams(String measurementID, Integer offset) {
- super(measurementID);
+ public OffsetClauseParams(Integer offset) {
this.offset = offset;
}
- public static OffsetParams from(String measurementID, Integer offset) {
- return new OffsetParams(measurementID, offset);
+ public static OffsetClauseParams from(Integer offset) {
+ return new OffsetClauseParams(offset);
}
public Integer getOffset() {
return offset;
}
+
+ @Override
+ public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+ builder.withOffset(offset);
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OrderingByTimeParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/OrderByClauseParams.java
similarity index 58%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OrderingByTimeParams.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/OrderByClauseParams.java
index 2115e2f44..612a91674 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/OrderingByTimeParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/OrderByClauseParams.java
@@ -16,21 +16,25 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param.model;
-public class OrderingByTimeParams extends QueryParamsV4 {
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.querybuilder.DataLakeQueryOrdering;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+
+public class OrderByClauseParams implements IQueryStatement {
private final String ordering;
- public OrderingByTimeParams(String measurementID, String ordering) {
- super(measurementID);
+ public OrderByClauseParams(String ordering) {
this.ordering = ordering;
}
- public static OrderingByTimeParams from(String measurementID, String ordering) {
- return new OrderingByTimeParams(measurementID, ordering);
+ public static OrderByClauseParams from(String ordering) {
+ return new OrderByClauseParams(ordering);
}
- public String getOrdering() {
- return this.ordering;
+ @Override
+ public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+ builder.withOrderBy(DataLakeQueryOrdering.valueOf(ordering));
}
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectClauseParams.java
similarity index 50%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectClauseParams.java
index ad8666296..6861b4915 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectFromStatementParams.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectClauseParams.java
@@ -16,69 +16,54 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param.model;
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
-public class SelectFromStatementParams extends QueryParamsV4 {
+public class SelectClauseParams implements IQueryStatement {
private List<SelectColumn> selectedColumns;
+ private List<SelectColumn> selectedColumnsCountOnly;
private boolean selectWildcard = false;
- public SelectFromStatementParams(String measurementID) {
- super(measurementID);
+ public SelectClauseParams() {
this.selectWildcard = true;
- //this.selectedColumns = "*";
}
- public SelectFromStatementParams(String measurementId,
- String columns,
- boolean countOnly) {
- this(measurementId);
+ public SelectClauseParams(String columns,
+ boolean countOnly) {
this.selectWildcard = false;
- this.selectedColumns = countOnly ? buildColumns(columns, ColumnFunction.COUNT.name()) : buildColumns(columns);
+ this.selectedColumns = countOnly ? buildColumns(columns, AggregationFunction.COUNT.name()) : buildColumns(columns);
+ this.selectedColumnsCountOnly = buildColumns(columns, AggregationFunction.COUNT.name());
}
- public SelectFromStatementParams(String measurementID,
- String columns,
- String aggregationFunction) {
- super(measurementID);
-
+ public SelectClauseParams(String columns,
+ String aggregationFunction) {
if (columns != null) {
this.selectedColumns =
aggregationFunction != null ? buildColumns(columns, aggregationFunction) : buildColumns(columns);
+ this.selectedColumnsCountOnly = buildColumns(columns, AggregationFunction.COUNT.name());
} else {
this.selectWildcard = true;
}
}
- public static SelectFromStatementParams from(String measurementID,
- @Nullable String columns,
- @Nullable String aggregationFunction) {
- return new SelectFromStatementParams(measurementID, columns, aggregationFunction);
- }
-
- public static SelectFromStatementParams from(String measurementId,
- String columns,
- boolean countOnly) {
- return new SelectFromStatementParams(measurementId, columns, countOnly);
+ public static SelectClauseParams from(@Nullable String columns,
+ @Nullable String aggregationFunction) {
+ return new SelectClauseParams(columns, aggregationFunction);
}
- public List<SelectColumn> getSelectedColumns() {
- return selectedColumns;
- }
-
- //public String getAggregationFunction() {
- //return aggregationFunction;
- //}
-
- public boolean isSelectWildcard() {
- return selectWildcard;
+ public static SelectClauseParams from(String columns,
+ boolean countOnly) {
+ return new SelectClauseParams(columns, countOnly);
}
private List<SelectColumn> buildColumns(String rawQuery) {
@@ -89,4 +74,22 @@ public class SelectFromStatementParams extends QueryParamsV4 {
return Arrays.stream(rawQuery.split(",")).map(qp -> SelectColumn.fromApiQueryString(qp, globalAggregationFunction))
.collect(Collectors.toList());
}
+
+ @Override
+ public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+ buildStatement(builder, selectedColumns);
+ }
+
+ public void buildCountStatement(IDataLakeQueryBuilder<?> builder) {
+ buildStatement(builder, selectedColumnsCountOnly);
+ }
+
+ private void buildStatement(IDataLakeQueryBuilder<?> builder,
+ List<SelectColumn> columns) {
+ if (selectWildcard) {
+ builder.withAllColumns();
+ } else {
+ columns.forEach(c -> c.buildStatement(builder));
+ }
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectColumn.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectColumn.java
similarity index 53%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectColumn.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectColumn.java
index 667bf943d..fe7be2da4 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/SelectColumn.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/SelectColumn.java
@@ -15,14 +15,16 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param.model;
-import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParamConverter;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
-public class SelectColumn {
+public class SelectColumn implements IQueryStatement {
private final String originalField;
- private ColumnFunction columnFunction;
+ private AggregationFunction aggregationFunction;
private String targetField;
private boolean simpleField = true;
@@ -33,30 +35,30 @@ public class SelectColumn {
}
public SelectColumn(String originalField,
- ColumnFunction columnFunction) {
+ AggregationFunction aggregationFunction) {
this(originalField);
- this.columnFunction = columnFunction;
+ this.aggregationFunction = aggregationFunction;
this.simpleField = false;
}
public SelectColumn(String originalField,
- ColumnFunction columnFunction,
+ AggregationFunction aggregationFunction,
String targetField) {
- this(originalField, columnFunction);
+ this(originalField, aggregationFunction);
this.targetField = targetField;
this.rename = true;
}
public static SelectColumn fromApiQueryString(String queryString) {
if (queryString.contains(";")) {
- String[] queryParts = DataLakeManagementUtils.buildSingleCondition(queryString);
+ String[] queryParts = ProvidedRestQueryParamConverter.buildSingleCondition(queryString);
if (queryParts.length < 2) {
throw new IllegalArgumentException("Wrong query format for query part " + queryString);
} else {
- ColumnFunction columnFunction = ColumnFunction.valueOf(queryParts[1]);
+ AggregationFunction aggregationFunction = AggregationFunction.valueOf(queryParts[1]);
String targetField =
- queryParts.length == 3 ? queryParts[2] : columnFunction.name().toLowerCase() + "_" + queryParts[0];
- return new SelectColumn(queryParts[0], columnFunction, targetField);
+ queryParts.length == 3 ? queryParts[2] : aggregationFunction.name().toLowerCase() + "_" + queryParts[0];
+ return new SelectColumn(queryParts[0], aggregationFunction, targetField);
}
} else {
return new SelectColumn(queryString);
@@ -65,29 +67,25 @@ public class SelectColumn {
public static SelectColumn fromApiQueryString(String queryString,
String globalAggregationFunction) {
- ColumnFunction columnFunction = ColumnFunction.valueOf(globalAggregationFunction);
- String targetField = columnFunction.name().toLowerCase() + "_" + queryString;
- return new SelectColumn(queryString, ColumnFunction.valueOf(globalAggregationFunction), targetField);
+ AggregationFunction aggregationFunction = AggregationFunction.valueOf(globalAggregationFunction);
+ String targetField = aggregationFunction.name().toLowerCase() + "_" + queryString;
+ return new SelectColumn(queryString, AggregationFunction.valueOf(globalAggregationFunction), targetField);
}
- private String makeField() {
- if (this.simpleField) {
- return "\"" + this.originalField + "\"";
- } else {
- return this.columnFunction.toDbName() + "(\"" + this.originalField + "\")";
- }
+ public String getOriginalField() {
+ return originalField;
}
- public String toQueryString() {
- String field = makeField();
- if (this.rename) {
- return field + " AS \"" + this.targetField + "\"";
+ @Override
+ public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+ if (this.simpleField) {
+ builder.withSimpleColumn(this.originalField);
} else {
- return field;
+ if (this.rename) {
+ builder.withAggregatedColumn(this.originalField, this.aggregationFunction, this.targetField);
+ } else {
+ builder.withAggregatedColumn(this.originalField, this.aggregationFunction);
+ }
}
}
-
- public String getOriginalField() {
- return originalField;
- }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/WhereClauseParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/WhereClauseParams.java
new file mode 100644
index 000000000..ab5a6712b
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/param/model/WhereClauseParams.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.dataexplorer.param.model;
+
+import org.apache.streampipes.dataexplorer.api.IQueryStatement;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParamConverter;
+import org.apache.streampipes.dataexplorer.querybuilder.FilterCondition;
+import org.apache.streampipes.dataexplorer.querybuilder.IDataLakeQueryBuilder;
+
+import org.apache.commons.lang3.math.NumberUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class WhereClauseParams implements IQueryStatement {
+
+ private static final String GT = ">";
+ private static final String LT = "<";
+
+ private final List<FilterCondition> filterConditions;
+
+ private WhereClauseParams(Long startTime,
+ Long endTime,
+ String whereConditions) {
+ this(startTime, endTime);
+ if (whereConditions != null) {
+ buildConditions(whereConditions);
+ }
+ }
+
+ private WhereClauseParams(Long startTime,
+ Long endTime) {
+ this.filterConditions = new ArrayList<>();
+ this.buildTimeConditions(startTime, endTime);
+ }
+
+ private WhereClauseParams(String whereConditions) {
+ this.filterConditions = new ArrayList<>();
+ if (whereConditions != null) {
+ buildConditions(whereConditions);
+ }
+ }
+
+ public static WhereClauseParams from(Long startTime,
+ Long endTime) {
+ return new WhereClauseParams(startTime, endTime);
+ }
+
+ public static WhereClauseParams from(String whereConditions) {
+ return new WhereClauseParams(whereConditions);
+ }
+
+ public static WhereClauseParams from(Long startTime,
+ Long endTime,
+ String whereConditions) {
+ return new WhereClauseParams(startTime, endTime, whereConditions);
+ }
+
+ private void buildTimeConditions(Long startTime,
+ Long endTime) {
+ if (startTime == null) {
+ this.filterConditions.add(buildTimeBoundary(endTime, LT));
+ } else if (endTime == null) {
+ this.filterConditions.add(buildTimeBoundary(startTime, GT));
+ } else {
+ this.filterConditions.add(buildTimeBoundary(endTime, LT));
+ this.filterConditions.add(buildTimeBoundary(startTime, GT));
+ }
+ }
+
+ private FilterCondition buildTimeBoundary(Long time, String operator) {
+ return new FilterCondition("time", operator, time * 1000000);
+ }
+
+ private void buildConditions(String whereConditions) {
+ List<String[]> whereParts = ProvidedRestQueryParamConverter.buildConditions(whereConditions);
+ // Add single quotes to strings except for true and false
+ whereParts.forEach(singleCondition -> {
+
+ this.filterConditions.add(
+ new FilterCondition(singleCondition[0], singleCondition[1], this.returnCondition(singleCondition[2])));
+ });
+ }
+
+ private Object returnCondition(String inputCondition) {
+ if (NumberUtils.isCreatable(inputCondition)) {
+ return Double.parseDouble(inputCondition);
+ } else if (isBoolean(inputCondition)) {
+ return Boolean.parseBoolean(inputCondition);
+ } else {
+ return inputCondition;
+ }
+ }
+
+ private boolean isBoolean(String input) {
+ return "true".equalsIgnoreCase(input) || "false".equalsIgnoreCase(input);
+ }
+
+ public List<FilterCondition> getWhereConditions() {
+ return filterConditions;
+ }
+
+ @Override
+ public void buildStatement(IDataLakeQueryBuilder<?> builder) {
+ builder.withInclusiveFilter(filterConditions);
+ }
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/AutoAggregationHandler.java
similarity index 68%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/AutoAggregationHandler.java
index a63c98077..1bff44a1d 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/AutoAggregationHandler.java
@@ -15,11 +15,14 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.dataexplorer.v4;
-
-import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
-import org.apache.streampipes.dataexplorer.sdk.DataLakeQueryOrdering;
-import org.apache.streampipes.dataexplorer.v4.params.SelectColumn;
+package org.apache.streampipes.dataexplorer.query;
+
+import org.apache.streampipes.dataexplorer.DataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.param.model.SelectColumn;
+import org.apache.streampipes.dataexplorer.querybuilder.DataLakeQueryOrdering;
import org.apache.streampipes.model.datalake.SpQueryResult;
import org.slf4j.Logger;
@@ -32,13 +35,13 @@ import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AGGREGATION_FUNCTION;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AUTO_AGGREGATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_COLUMNS;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_COUNT_ONLY;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_LIMIT;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_ORDER;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_TIME_INTERVAL;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_AGGREGATION_FUNCTION;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_AUTO_AGGREGATE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_COLUMNS;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_COUNT_ONLY;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_LIMIT;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_ORDER;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_TIME_INTERVAL;
public class AutoAggregationHandler {
@@ -50,15 +53,19 @@ public class AutoAggregationHandler {
private final SimpleDateFormat dateFormat1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
- private final DataLakeManagementV4 dataLakeManagement;
- private final ProvidedQueryParams queryParams;
+ private final IDataExplorerQueryManagement dataLakeQueryManagement;
+ private final ProvidedRestQueryParams queryParams;
- public AutoAggregationHandler(ProvidedQueryParams params) {
+ public AutoAggregationHandler(ProvidedRestQueryParams params) {
this.queryParams = params;
- this.dataLakeManagement = new DataLakeManagementV4();
+ this.dataLakeQueryManagement = getDataLakeQueryManagement();
+ }
+
+ private IDataExplorerQueryManagement getDataLakeQueryManagement() {
+ return new DataExplorerQueryManagement(new DataExplorerSchemaManagement());
}
- public ProvidedQueryParams makeAutoAggregationQueryParams() throws IllegalArgumentException {
+ public ProvidedRestQueryParams makeAutoAggregationQueryParams() throws IllegalArgumentException {
try {
SpQueryResult newest = getSingleRecord(DataLakeQueryOrdering.DESC);
SpQueryResult oldest = getSingleRecord(DataLakeQueryOrdering.ASC);
@@ -85,25 +92,25 @@ public class AutoAggregationHandler {
return null;
}
- private ProvidedQueryParams disableAutoAgg(ProvidedQueryParams params) {
+ private ProvidedRestQueryParams disableAutoAgg(ProvidedRestQueryParams params) {
params.remove(QP_AUTO_AGGREGATE);
return params;
}
public Integer getCount(String fieldName) {
- ProvidedQueryParams countParams = disableAutoAgg(new ProvidedQueryParams(queryParams));
+ ProvidedRestQueryParams countParams = disableAutoAgg(new ProvidedRestQueryParams(queryParams));
countParams.remove(QP_TIME_INTERVAL);
countParams.remove(QP_AGGREGATION_FUNCTION);
countParams.update(QP_COUNT_ONLY, true);
countParams.update(QP_COLUMNS, fieldName);
- SpQueryResult result = new DataLakeManagementV4().getData(countParams, true);
+ SpQueryResult result = dataLakeQueryManagement.getData(countParams, true);
return result.getTotal() > 0 ? ((Double) result.getAllDataSeries().get(0).getRows().get(0).get(1)).intValue() : 0;
}
- private SpQueryResult fireQuery(ProvidedQueryParams params) {
- return dataLakeManagement.getData(params, true);
+ private SpQueryResult fireQuery(ProvidedRestQueryParams params) {
+ return dataLakeQueryManagement.getData(params, true);
}
private int getAggregationValue(SpQueryResult newest, SpQueryResult oldest) throws ParseException {
@@ -113,7 +120,7 @@ public class AutoAggregationHandler {
}
private SpQueryResult getSingleRecord(DataLakeQueryOrdering order) throws ParseException {
- ProvidedQueryParams singleEvent = disableAutoAgg(new ProvidedQueryParams(queryParams));
+ ProvidedRestQueryParams singleEvent = disableAutoAgg(new ProvidedRestQueryParams(queryParams));
singleEvent.remove(QP_AGGREGATION_FUNCTION);
singleEvent.update(QP_LIMIT, 1);
singleEvent.update(QP_ORDER, order.name());
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryExecutor.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryExecutor.java
new file mode 100644
index 000000000..769fc042a
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/DataExplorerQueryExecutor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.query;
+
+import org.apache.streampipes.dataexplorer.influx.DataExplorerInfluxQueryExecutor;
+import org.apache.streampipes.dataexplorer.param.DeleteQueryParams;
+import org.apache.streampipes.dataexplorer.param.SelectQueryParams;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+import org.apache.streampipes.model.datalake.SpQueryStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class DataExplorerQueryExecutor<X, W> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DataExplorerInfluxQueryExecutor.class);
+ protected int maximumAmountOfEvents;
+
+ protected boolean appendId = false;
+ protected String forId;
+
+ public DataExplorerQueryExecutor() {
+ this.maximumAmountOfEvents = -1;
+ }
+
+ public DataExplorerQueryExecutor(String forId) {
+ this();
+ this.appendId = true;
+ this.forId = forId;
+ }
+
+ public DataExplorerQueryExecutor(int maximumAmountOfEvents) {
+ this();
+ this.maximumAmountOfEvents = maximumAmountOfEvents;
+ }
+
+ public SpQueryResult executeQuery(SelectQueryParams params,
+ boolean ignoreMissingValues) throws RuntimeException {
+
+ if (this.maximumAmountOfEvents != -1) {
+ X countQuery = makeCountQuery(params);
+ W countQueryResult = executeQuery(countQuery);
+ Double amountOfQueryResults = getAmountOfResults(countQueryResult);
+
+ if (amountOfQueryResults > this.maximumAmountOfEvents) {
+ SpQueryResult tooMuchData = new SpQueryResult();
+ tooMuchData.setSpQueryStatus(SpQueryStatus.TOO_MUCH_DATA);
+ tooMuchData.setTotal(amountOfQueryResults.intValue());
+ return tooMuchData;
+ }
+ }
+
+ X query = makeSelectQuery(params);
+ return executeQuery(query, ignoreMissingValues);
+ }
+
+ public SpQueryResult executeQuery(DeleteQueryParams params) {
+ return executeQuery(makeDeleteQuery(params), true);
+ }
+
+ public SpQueryResult executeQuery(X query,
+ boolean ignoreMissingValues) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Data Lake Query " + asQueryString(query));
+ }
+
+ W result = executeQuery(query);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Data Lake Query Result: " + result.toString());
+ }
+
+ return postQuery(result, ignoreMissingValues);
+ }
+
+ protected abstract double getAmountOfResults(W countQueryResult);
+
+ protected abstract SpQueryResult postQuery(W queryResult,
+ boolean ignoreMissingValues);
+
+ protected abstract W executeQuery(X query);
+
+ protected abstract String asQueryString(X query);
+
+ protected abstract X makeDeleteQuery(DeleteQueryParams params);
+
+ protected abstract X makeCountQuery(SelectQueryParams params);
+
+ protected abstract X makeSelectQuery(SelectQueryParams params);
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/QueryResultProvider.java
similarity index 61%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/QueryResultProvider.java
index d6a7b9d18..ce88fd1b5 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryResultProvider.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/QueryResultProvider.java
@@ -16,26 +16,24 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.query;
+package org.apache.streampipes.dataexplorer.query;
-import org.apache.streampipes.dataexplorer.v4.AutoAggregationHandler;
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
-import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
+import org.apache.streampipes.dataexplorer.influx.DataExplorerInfluxQueryExecutor;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParamConverter;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.param.SelectQueryParams;
import org.apache.streampipes.model.datalake.SpQueryResult;
-import java.util.Map;
-
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AUTO_AGGREGATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_MAXIMUM_AMOUNT_OF_EVENTS;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_AUTO_AGGREGATE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_MAXIMUM_AMOUNT_OF_EVENTS;
public class QueryResultProvider {
public static final String FOR_ID_KEY = "forId";
protected final boolean ignoreMissingData;
- protected ProvidedQueryParams queryParams;
+ protected ProvidedRestQueryParams queryParams;
- public QueryResultProvider(ProvidedQueryParams queryParams,
+ public QueryResultProvider(ProvidedRestQueryParams queryParams,
boolean ignoreMissingData) {
this.queryParams = queryParams;
this.ignoreMissingData = ignoreMissingData;
@@ -45,18 +43,18 @@ public class QueryResultProvider {
if (queryParams.has(QP_AUTO_AGGREGATE)) {
queryParams = new AutoAggregationHandler(queryParams).makeAutoAggregationQueryParams();
}
- Map<String, QueryParamsV4> queryParts = DataLakeManagementUtils.getSelectQueryParams(queryParams);
+ SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(queryParams);
if (queryParams.getProvidedParams().containsKey(QP_MAXIMUM_AMOUNT_OF_EVENTS)) {
int maximumAmountOfEvents = Integer.parseInt(queryParams.getProvidedParams().get(QP_MAXIMUM_AMOUNT_OF_EVENTS));
- return new DataExplorerQueryV4(queryParts, maximumAmountOfEvents).executeQuery(ignoreMissingData);
+ return new DataExplorerInfluxQueryExecutor(maximumAmountOfEvents).executeQuery(qp, ignoreMissingData);
}
if (queryParams.getProvidedParams().containsKey(FOR_ID_KEY)) {
String forWidgetId = queryParams.getProvidedParams().get(FOR_ID_KEY);
- return new DataExplorerQueryV4(queryParts, forWidgetId).executeQuery(ignoreMissingData);
+ return new DataExplorerInfluxQueryExecutor(forWidgetId).executeQuery(qp, ignoreMissingData);
} else {
- return new DataExplorerQueryV4(queryParts).executeQuery(ignoreMissingData);
+ return new DataExplorerInfluxQueryExecutor().executeQuery(qp, ignoreMissingData);
}
}
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/StreamedQueryResultProvider.java
similarity index 84%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/StreamedQueryResultProvider.java
index 6e1efd3cb..ee356e936 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/StreamedQueryResultProvider.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/StreamedQueryResultProvider.java
@@ -16,13 +16,13 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.query;
+package org.apache.streampipes.dataexplorer.query;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams;
+import org.apache.streampipes.dataexplorer.query.writer.ConfiguredOutputWriter;
+import org.apache.streampipes.dataexplorer.query.writer.OutputFormat;
import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters;
-import org.apache.streampipes.dataexplorer.v4.query.writer.ConfiguredOutputWriter;
-import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.datalake.SpQueryResult;
@@ -31,8 +31,8 @@ import java.io.OutputStream;
import java.util.List;
import java.util.Optional;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_LIMIT;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_PAGE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_LIMIT;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_PAGE;
public class StreamedQueryResultProvider extends QueryResultProvider {
@@ -41,7 +41,7 @@ public class StreamedQueryResultProvider extends QueryResultProvider {
private final OutputFormat format;
- public StreamedQueryResultProvider(ProvidedQueryParams params,
+ public StreamedQueryResultProvider(ProvidedRestQueryParams params,
OutputFormat format,
boolean ignoreMissingValues) {
super(params, ignoreMissingValues);
@@ -69,7 +69,7 @@ public class StreamedQueryResultProvider extends QueryResultProvider {
boolean isFirstDataItem = true;
configuredWriter.beforeFirstItem(outputStream);
do {
- queryParams.update(SupportedDataLakeQueryParameters.QP_PAGE, String.valueOf(page));
+ queryParams.update(SupportedRestQueryParams.QP_PAGE, String.valueOf(page));
dataResult = getData();
if (dataResult.getTotal() > 0) {
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredCsvOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredCsvOutputWriter.java
similarity index 86%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredCsvOutputWriter.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredCsvOutputWriter.java
index 8d92feb61..03ca75530 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredCsvOutputWriter.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredCsvOutputWriter.java
@@ -16,17 +16,17 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.query.writer;
+package org.apache.streampipes.dataexplorer.query.writer;
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.query.writer.item.CsvItemWriter;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.query.writer.item.CsvItemWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.StringJoiner;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_CSV_DELIMITER;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_CSV_DELIMITER;
public class ConfiguredCsvOutputWriter extends ConfiguredOutputWriter {
@@ -38,7 +38,7 @@ public class ConfiguredCsvOutputWriter extends ConfiguredOutputWriter {
private String delimiter = COMMA;
@Override
- public void configure(ProvidedQueryParams params,
+ public void configure(ProvidedRestQueryParams params,
boolean ignoreMissingValues) {
if (params.has(QP_CSV_DELIMITER)) {
delimiter = params.getAsString(QP_CSV_DELIMITER).equals("comma") ? COMMA : SEMICOLON;
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredJsonOutputWriter.java
similarity index 85%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredJsonOutputWriter.java
index b2070e2c1..0781c7334 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredJsonOutputWriter.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredJsonOutputWriter.java
@@ -16,11 +16,11 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.query.writer;
+package org.apache.streampipes.dataexplorer.query.writer;
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.query.writer.item.ItemGenerator;
-import org.apache.streampipes.dataexplorer.v4.query.writer.item.JsonItemWriter;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.query.writer.item.ItemGenerator;
+import org.apache.streampipes.dataexplorer.query.writer.item.JsonItemWriter;
import com.google.gson.Gson;
@@ -41,7 +41,7 @@ public class ConfiguredJsonOutputWriter extends ConfiguredOutputWriter {
}
@Override
- public void configure(ProvidedQueryParams params,
+ public void configure(ProvidedRestQueryParams params,
boolean ignoreMissingValues) {
// do nothing
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredOutputWriter.java
similarity index 89%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredOutputWriter.java
index 6ef855c41..ed60687ae 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/ConfiguredOutputWriter.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/ConfiguredOutputWriter.java
@@ -16,9 +16,9 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.query.writer;
+package org.apache.streampipes.dataexplorer.query.writer;
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
import java.io.IOException;
import java.io.OutputStream;
@@ -27,7 +27,7 @@ import java.util.List;
public abstract class ConfiguredOutputWriter {
public static ConfiguredOutputWriter getConfiguredWriter(OutputFormat format,
- ProvidedQueryParams params,
+ ProvidedRestQueryParams params,
boolean ignoreMissingValues) {
var writer = format.getWriter();
writer.configure(params, ignoreMissingValues);
@@ -35,7 +35,7 @@ public abstract class ConfiguredOutputWriter {
return writer;
}
- public abstract void configure(ProvidedQueryParams params,
+ public abstract void configure(ProvidedRestQueryParams params,
boolean ignoreMissingValues);
public abstract void beforeFirstItem(OutputStream outputStream) throws IOException;
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/OutputFormat.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/OutputFormat.java
similarity index 95%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/OutputFormat.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/OutputFormat.java
index 5a522b403..c855bf1c8 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/OutputFormat.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/OutputFormat.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.query.writer;
+package org.apache.streampipes.dataexplorer.query.writer;
import java.util.function.Supplier;
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/CsvItemWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/CsvItemWriter.java
similarity index 94%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/CsvItemWriter.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/CsvItemWriter.java
index 5fe06fb52..bd80f2d9a 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/CsvItemWriter.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/CsvItemWriter.java
@@ -17,7 +17,7 @@
*/
-package org.apache.streampipes.dataexplorer.v4.query.writer.item;
+package org.apache.streampipes.dataexplorer.query.writer.item;
public class CsvItemWriter extends ItemGenerator {
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/ItemGenerator.java
similarity index 92%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/ItemGenerator.java
index 056192518..7b3aa447f 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/ItemGenerator.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/ItemGenerator.java
@@ -17,9 +17,9 @@
*/
-package org.apache.streampipes.dataexplorer.v4.query.writer.item;
+package org.apache.streampipes.dataexplorer.query.writer.item;
-import org.apache.streampipes.dataexplorer.v4.utils.TimeParser;
+import org.apache.streampipes.dataexplorer.utils.TimeParser;
import java.util.List;
import java.util.StringJoiner;
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/JsonItemWriter.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/JsonItemWriter.java
similarity index 95%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/JsonItemWriter.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/JsonItemWriter.java
index c4b8d4525..fde43721d 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/writer/item/JsonItemWriter.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/query/writer/item/JsonItemWriter.java
@@ -17,7 +17,7 @@
*/
-package org.apache.streampipes.dataexplorer.v4.query.writer.item;
+package org.apache.streampipes.dataexplorer.query.writer.item;
import com.google.gson.Gson;
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryOrdering.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/DataLakeQueryOrdering.java
similarity index 93%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryOrdering.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/DataLakeQueryOrdering.java
index e8734e78e..f8825c157 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryOrdering.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/DataLakeQueryOrdering.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.dataexplorer.sdk;
+package org.apache.streampipes.dataexplorer.querybuilder;
public enum DataLakeQueryOrdering {
ASC, DESC
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereCondition.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/FilterCondition.java
similarity index 70%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereCondition.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/FilterCondition.java
index b222eb335..396bed21d 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereCondition.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/FilterCondition.java
@@ -15,33 +15,20 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.querybuilder;
-import java.util.StringJoiner;
-
-public class WhereCondition {
+public class FilterCondition {
private String field;
private String operator;
- private String condition;
+ private Object condition;
- public WhereCondition(String field, String operator, String condition) {
+ public FilterCondition(String field, String operator, Object condition) {
this.field = field;
this.operator = operator;
this.condition = condition;
}
- @Override
- public String toString() {
- StringJoiner joiner = new StringJoiner(" ");
-
- return joiner
- .add(field)
- .add(operator)
- .add(condition)
- .toString();
- }
-
public String getField() {
return field;
}
@@ -50,7 +37,8 @@ public class WhereCondition {
return operator;
}
- public String getCondition() {
+ public Object getCondition() {
return condition;
}
+
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/IDataLakeQueryBuilder.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/IDataLakeQueryBuilder.java
new file mode 100644
index 000000000..065c18562
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/querybuilder/IDataLakeQueryBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.querybuilder;
+
+import org.apache.streampipes.dataexplorer.param.model.AggregationFunction;
+
+import org.influxdb.querybuilder.clauses.NestedClause;
+
+import java.util.List;
+
+public interface IDataLakeQueryBuilder<T> {
+
+ IDataLakeQueryBuilder<T> withAllColumns();
+
+ IDataLakeQueryBuilder<T> withSimpleColumn(String columnName);
+
+ IDataLakeQueryBuilder<T> withSimpleColumns(List<String> columnNames);
+
+ IDataLakeQueryBuilder<T> withAggregatedColumn(String columnName,
+ AggregationFunction aggregationFunction,
+ String targetName);
+
+ IDataLakeQueryBuilder<T> withAggregatedColumn(String columnName,
+ AggregationFunction aggregationFunction);
+
+ IDataLakeQueryBuilder<T> withStartTime(long startTime);
+
+ IDataLakeQueryBuilder<T> withEndTime(long endTime);
+
+ IDataLakeQueryBuilder<T> withEndTime(long endTime,
+ boolean includeEndTime);
+
+ IDataLakeQueryBuilder<T> withTimeBoundary(long startTime,
+ long endTime);
+
+ IDataLakeQueryBuilder<T> withFilter(String field,
+ String operator,
+ Object value);
+
+ IDataLakeQueryBuilder<T> withExclusiveFilter(String field,
+ String operator,
+ List<?> values);
+
+ IDataLakeQueryBuilder<T> withInclusiveFilter(String field,
+ String operator,
+ List<?> values);
+
+ IDataLakeQueryBuilder<T> withInclusiveFilter(List<FilterCondition> filterConditions);
+
+ IDataLakeQueryBuilder<T> withFilter(NestedClause clause);
+
+ IDataLakeQueryBuilder<T> withGroupByTime(String timeInterval);
+
+ IDataLakeQueryBuilder<T> withGroupByTime(String timeInterval,
+ String offsetInterval);
+
+ IDataLakeQueryBuilder<T> withGroupBy(String column);
+
+ IDataLakeQueryBuilder<T> withOrderBy(DataLakeQueryOrdering ordering);
+
+ IDataLakeQueryBuilder<T> withLimit(int limit);
+
+ IDataLakeQueryBuilder<T> withOffset(int offset);
+
+ IDataLakeQueryBuilder<T> withFill(Object fill);
+
+ T build();
+}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryConstants.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryConstants.java
deleted file mode 100644
index f6664f45a..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryConstants.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.sdk;
-
-public class DataLakeQueryConstants {
-
- public static final String GE = ">=";
- public static final String LE = "<=";
- public static final String LT = "<";
- public static final String GT = ">";
- public static final String EQ = "=";
- public static final String NEQ = "!=";
-
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/IDataLakeQueryBuilder.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/IDataLakeQueryBuilder.java
deleted file mode 100644
index 2bc70df16..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/sdk/IDataLakeQueryBuilder.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.sdk;
-
-import org.apache.streampipes.dataexplorer.v4.params.ColumnFunction;
-
-import org.influxdb.querybuilder.clauses.NestedClause;
-
-import java.util.List;
-
-public interface IDataLakeQueryBuilder<T> {
- IDataLakeQueryBuilder withSimpleColumn(String columnName);
-
- IDataLakeQueryBuilder withSimpleColumns(List<String> columnNames);
-
- IDataLakeQueryBuilder withAggregatedColumn(String columnName,
- ColumnFunction columnFunction,
- String targetName);
-
- IDataLakeQueryBuilder withStartTime(long startTime);
-
- IDataLakeQueryBuilder withEndTime(long endTime);
-
- IDataLakeQueryBuilder withEndTime(long endTime,
- boolean includeEndTime);
-
- IDataLakeQueryBuilder withTimeBoundary(long startTime,
- long endTime);
-
- IDataLakeQueryBuilder withFilter(String field,
- String operator,
- Object value);
-
- IDataLakeQueryBuilder withExclusiveFilter(String field,
- String operator,
- List<?> values);
-
- IDataLakeQueryBuilder withInclusiveFilter(String field,
- String operator,
- List<?> values);
-
- IDataLakeQueryBuilder withFilter(NestedClause clause);
-
- IDataLakeQueryBuilder withGroupByTime(String timeInterval);
-
- IDataLakeQueryBuilder withGroupByTime(String timeInterval,
- String offsetInterval);
-
- IDataLakeQueryBuilder withGroupBy(String column);
-
- IDataLakeQueryBuilder withOrderBy(DataLakeQueryOrdering ordering);
-
- IDataLakeQueryBuilder withLimit(int limit);
-
- IDataLakeQueryBuilder withOffset(int offset);
-
- T build();
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/utils/TimeParser.java
similarity index 97%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/utils/TimeParser.java
index 53412edb8..135a5d89d 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/TimeParser.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/utils/TimeParser.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.utils;
+package org.apache.streampipes.dataexplorer.utils;
import java.time.Instant;
import java.time.LocalDate;
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/DeleteFromStatementParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/DeleteFromStatementParams.java
deleted file mode 100644
index c3cc0392c..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/DeleteFromStatementParams.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.params;
-
-public class DeleteFromStatementParams extends QueryParamsV4 {
-
- public DeleteFromStatementParams(String measurementID) {
- super(measurementID);
- }
-
- public static DeleteFromStatementParams from(String measurementID) {
- return new DeleteFromStatementParams(measurementID);
- }
-
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/FillParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/FillParams.java
deleted file mode 100644
index 0831e6d5b..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/FillParams.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.params;
-
-public class FillParams extends QueryParamsV4 {
- String fill = "fill(none)";
-
- protected FillParams(String index) {
- super(index);
- }
-
- public static FillParams from(String measurementID) {
- return new FillParams(measurementID);
- }
-
- public String getFill() {
- return fill;
- }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereStatementParams.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereStatementParams.java
deleted file mode 100644
index 0c7f1829c..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/params/WhereStatementParams.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.v4.params;
-
-import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
-
-import org.apache.commons.lang3.math.NumberUtils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class WhereStatementParams extends QueryParamsV4 {
-
- private static final String GT = ">";
- private static final String LT = "<";
-
- private List<WhereCondition> whereConditions;
-
- private WhereStatementParams(String index,
- Long startTime,
- Long endTime,
- String whereConditions) {
- this(index, startTime, endTime);
- if (whereConditions != null) {
- buildConditions(whereConditions);
- }
- }
-
- private WhereStatementParams(String index,
- Long startTime,
- Long endTime) {
- super(index);
- this.whereConditions = new ArrayList<>();
- this.buildTimeConditions(startTime, endTime);
- }
-
- private WhereStatementParams(String index,
- String whereConditions) {
- super(index);
- this.whereConditions = new ArrayList<>();
- if (whereConditions != null) {
- buildConditions(whereConditions);
- }
- }
-
- public static WhereStatementParams from(String measurementId,
- Long startTime,
- Long endTime) {
- return new WhereStatementParams(measurementId, startTime, endTime);
- }
-
- public static WhereStatementParams from(String measurementId,
- String whereConditions) {
- return new WhereStatementParams(measurementId, whereConditions);
- }
-
- public static WhereStatementParams from(String measurementId,
- Long startTime,
- Long endTime,
- String whereConditions) {
- return new WhereStatementParams(measurementId, startTime, endTime, whereConditions);
- }
-
- private void buildTimeConditions(Long startTime,
- Long endTime) {
- if (startTime == null) {
- this.whereConditions.add(buildTimeBoundary(endTime, LT));
- } else if (endTime == null) {
- this.whereConditions.add(buildTimeBoundary(startTime, GT));
- } else {
- this.whereConditions.add(buildTimeBoundary(endTime, LT));
- this.whereConditions.add(buildTimeBoundary(startTime, GT));
- }
- }
-
- private WhereCondition buildTimeBoundary(Long time, String operator) {
- return new WhereCondition("time", operator, String.valueOf(time * 1000000));
- }
-
- private void buildConditions(String whereConditions) {
- List<String[]> whereParts = DataLakeManagementUtils.buildConditions(whereConditions);
- // Add single quotes to strings except for true and false
- whereParts.forEach(singleCondition -> {
-
- this.whereConditions.add(
- new WhereCondition(singleCondition[0], singleCondition[1], this.returnCondition(singleCondition[2])));
- });
- }
-
- private String returnCondition(String inputCondition) {
- if (NumberUtils.isCreatable(inputCondition) || isBoolean(inputCondition)) {
- return inputCondition;
- } else if (inputCondition.equals("\"\"")) {
- return inputCondition;
- } else {
- return "'" + inputCondition + "'";
- }
- }
-
- private boolean isBoolean(String input) {
- return "true".equalsIgnoreCase(input) || "false".equalsIgnoreCase(input);
- }
-
- public List<WhereCondition> getWhereConditions() {
- return whereConditions;
- }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java
deleted file mode 100644
index 22add8267..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/DataExplorerQueryV4.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query;
-
-import org.apache.streampipes.commons.environment.Environment;
-import org.apache.streampipes.commons.environment.Environments;
-import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
-import org.apache.streampipes.dataexplorer.v4.params.DeleteFromStatementParams;
-import org.apache.streampipes.dataexplorer.v4.params.FillParams;
-import org.apache.streampipes.dataexplorer.v4.params.GroupingByTagsParams;
-import org.apache.streampipes.dataexplorer.v4.params.GroupingByTimeParams;
-import org.apache.streampipes.dataexplorer.v4.params.ItemLimitationParams;
-import org.apache.streampipes.dataexplorer.v4.params.OffsetParams;
-import org.apache.streampipes.dataexplorer.v4.params.OrderingByTimeParams;
-import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
-import org.apache.streampipes.dataexplorer.v4.params.SelectFromStatementParams;
-import org.apache.streampipes.dataexplorer.v4.params.WhereStatementParams;
-import org.apache.streampipes.dataexplorer.v4.query.elements.DeleteFromStatement;
-import org.apache.streampipes.dataexplorer.v4.query.elements.FillStatement;
-import org.apache.streampipes.dataexplorer.v4.query.elements.GroupingByTags;
-import org.apache.streampipes.dataexplorer.v4.query.elements.GroupingByTime;
-import org.apache.streampipes.dataexplorer.v4.query.elements.ItemLimitation;
-import org.apache.streampipes.dataexplorer.v4.query.elements.Offset;
-import org.apache.streampipes.dataexplorer.v4.query.elements.OrderingByTime;
-import org.apache.streampipes.dataexplorer.v4.query.elements.QueryElement;
-import org.apache.streampipes.dataexplorer.v4.query.elements.SelectFromStatement;
-import org.apache.streampipes.dataexplorer.v4.query.elements.WhereStatement;
-import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
-import org.apache.streampipes.model.datalake.DataSeries;
-import org.apache.streampipes.model.datalake.SpQueryResult;
-import org.apache.streampipes.model.datalake.SpQueryStatus;
-
-import org.influxdb.InfluxDB;
-import org.influxdb.dto.Query;
-import org.influxdb.dto.QueryResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class DataExplorerQueryV4 {
-
- private static final Logger LOG = LoggerFactory.getLogger(DataExplorerQueryV4.class);
-
- protected Map<String, QueryParamsV4> params;
-
- protected int maximumAmountOfEvents;
-
- private boolean appendId = false;
- private String forId;
-
- private Environment env;
-
- public DataExplorerQueryV4() {
-
- }
-
- public DataExplorerQueryV4(Map<String, QueryParamsV4> params,
- String forId) {
- this(params);
- this.appendId = true;
- this.forId = forId;
- }
-
- public DataExplorerQueryV4(Map<String, QueryParamsV4> params) {
- this.params = params;
- this.env = Environments.getEnvironment();
- this.maximumAmountOfEvents = -1;
- }
-
- public DataExplorerQueryV4(Map<String, QueryParamsV4> params, int maximumAmountOfEvents) {
- this(params);
- this.maximumAmountOfEvents = maximumAmountOfEvents;
- }
-
- public SpQueryResult executeQuery(boolean ignoreMissingValues) throws RuntimeException {
- try (final InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient()) {
- List<QueryElement<?>> queryElements = getQueryElements();
-
- if (this.maximumAmountOfEvents != -1) {
- QueryBuilder countQueryBuilder = QueryBuilder.create(getDatabaseName());
- Query countQuery = countQueryBuilder.build(queryElements, true);
- QueryResult countQueryResult = influxDB.query(countQuery);
- Double amountOfQueryResults = getAmountOfResults(countQueryResult);
-
- if (amountOfQueryResults > this.maximumAmountOfEvents) {
- SpQueryResult tooMuchData = new SpQueryResult();
- tooMuchData.setSpQueryStatus(SpQueryStatus.TOO_MUCH_DATA);
- tooMuchData.setTotal(amountOfQueryResults.intValue());
- return tooMuchData;
- }
- }
-
- QueryBuilder queryBuilder = QueryBuilder.create(getDatabaseName());
- Query query = queryBuilder.build(queryElements, false);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Data Lake Query (database:" + query.getDatabase() + "): " + query.getCommand());
- }
-
- QueryResult result = influxDB.query(query);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Data Lake Query Result: " + result.toString());
- }
-
- return postQuery(result, ignoreMissingValues);
- }
- }
-
- private double getAmountOfResults(QueryResult countQueryResult) {
- if (countQueryResult.getResults().get(0).getSeries() != null
- && countQueryResult.getResults().get(0).getSeries().get(0).getValues() != null) {
- return (double) countQueryResult.getResults().get(0).getSeries().get(0).getValues().get(0).get(1);
- } else {
- return 0.0;
- }
- }
-
-
- protected DataSeries convertResult(QueryResult.Series series,
- boolean ignoreMissingValues) {
- List<String> columns = series.getColumns();
- List<List<Object>> values = series.getValues();
-
- List<List<Object>> resultingValues = new ArrayList<>();
-
- values.forEach(v -> {
- if (ignoreMissingValues) {
- if (!v.contains(null)) {
- resultingValues.add(v);
- }
- } else {
- resultingValues.add(v);
- }
-
- });
-
- return new DataSeries(values.size(), resultingValues, columns, series.getTags());
- }
-
- protected SpQueryResult postQuery(QueryResult queryResult,
- boolean ignoreMissingValues) throws RuntimeException {
- SpQueryResult result = new SpQueryResult();
-
- if (hasResult(queryResult)) {
- result.setTotal(queryResult.getResults().get(0).getSeries().size());
- queryResult.getResults().get(0).getSeries().forEach(rs -> {
- DataSeries series = convertResult(rs, ignoreMissingValues);
- result.setHeaders(series.getHeaders());
- result.addDataResult(series);
- });
- }
-
- if (this.appendId) {
- result.setForId(this.forId);
- }
-
- return result;
- }
-
- private boolean hasResult(QueryResult queryResult) {
- return queryResult.getResults() != null
- && queryResult.getResults().size() > 0
- && queryResult.getResults().get(0).getSeries() != null;
- }
-
- protected List<QueryElement<?>> getQueryElements() {
- List<QueryElement<?>> queryElements = new ArrayList<>();
-
- if (this.params.containsKey(DataLakeManagementUtils.SELECT_FROM)) {
- queryElements.add(
- new SelectFromStatement((SelectFromStatementParams) this.params.get(DataLakeManagementUtils.SELECT_FROM)));
- } else {
- queryElements.add(
- new DeleteFromStatement((DeleteFromStatementParams) this.params.get(DataLakeManagementUtils.DELETE_FROM)));
- }
-
- if (this.params.containsKey(DataLakeManagementUtils.WHERE)) {
- queryElements.add(new WhereStatement((WhereStatementParams) this.params.get(DataLakeManagementUtils.WHERE)));
- }
-
- if (this.params.containsKey(DataLakeManagementUtils.GROUP_BY_TIME)) {
- queryElements.add(
- new GroupingByTime((GroupingByTimeParams) this.params.get(DataLakeManagementUtils.GROUP_BY_TIME)));
-
- } else if (this.params.containsKey(DataLakeManagementUtils.GROUP_BY_TAGS)) {
- queryElements.add(
- new GroupingByTags((GroupingByTagsParams) this.params.get(DataLakeManagementUtils.GROUP_BY_TAGS)));
- }
-
- if (this.params.containsKey(DataLakeManagementUtils.FILL)) {
- queryElements.add(new FillStatement((FillParams) this.params.get(DataLakeManagementUtils.FILL)));
- }
-
- if (this.params.containsKey(DataLakeManagementUtils.ORDER_DESCENDING)) {
- queryElements.add(
- new OrderingByTime((OrderingByTimeParams) this.params.get(DataLakeManagementUtils.ORDER_DESCENDING)));
- } else if (this.params.containsKey(DataLakeManagementUtils.SELECT_FROM)) {
- queryElements.add(new OrderingByTime(
- OrderingByTimeParams.from(this.params.get(DataLakeManagementUtils.SELECT_FROM).getIndex(), "ASC")));
- }
-
- if (this.params.containsKey(DataLakeManagementUtils.LIMIT)) {
- queryElements.add(new ItemLimitation((ItemLimitationParams) this.params.get(DataLakeManagementUtils.LIMIT)));
- }
-
- if (this.params.containsKey(DataLakeManagementUtils.OFFSET)) {
- queryElements.add(new Offset((OffsetParams) this.params.get(DataLakeManagementUtils.OFFSET)));
- }
-
- return queryElements;
- }
-
- private String getDatabaseName() {
- return env.getTsStorageBucket().getValueOrDefault();
- }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryBuilder.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryBuilder.java
deleted file mode 100644
index a4a09d70e..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/QueryBuilder.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query;
-
-import org.apache.streampipes.dataexplorer.v4.query.elements.QueryElement;
-
-import org.influxdb.dto.Query;
-
-import java.util.List;
-import java.util.StringJoiner;
-
-public class QueryBuilder {
-
- private final StringJoiner queryParts;
- private final String databaseName;
-
- private QueryBuilder(String databaseName) {
- this.queryParts = new StringJoiner(" ");
- this.databaseName = databaseName;
- }
-
- public static QueryBuilder create(String databaseName) {
- return new QueryBuilder(databaseName);
- }
-
- public Query build(List<QueryElement<?>> queryElements, Boolean onlyCountResults) {
- for (QueryElement<?> queryPart : queryElements) {
- this.queryParts.add(queryPart.getStatement());
- }
- if (onlyCountResults) {
- return toCountResultsQuery();
- } else {
- return toQuery();
- }
- }
-
- public Query toQuery() {
- return new Query(this.queryParts.toString(), this.databaseName);
- }
-
- public Query toCountResultsQuery() {
- String q = "SELECT COUNT(*) FROM (" + this.queryParts.toString() + ")";
- return new Query(q, this.databaseName);
- }
-}
-
-
-
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/DeleteFromStatement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/DeleteFromStatement.java
deleted file mode 100644
index fc46398e8..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/DeleteFromStatement.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.DeleteFromStatementParams;
-import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
-
-public class DeleteFromStatement extends QueryElement<DeleteFromStatementParams> {
- public DeleteFromStatement(DeleteFromStatementParams deleteFromStatementParams) {
- super(deleteFromStatementParams);
- }
-
- @Override
- protected String buildStatement(DeleteFromStatementParams deleteFromStatementParams) {
- return QueryTemplatesV4.deleteFrom(deleteFromStatementParams.getIndex());
- }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTags.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTags.java
deleted file mode 100644
index 537119136..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTags.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.GroupingByTagsParams;
-import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
-
-public class GroupingByTags extends QueryElement<GroupingByTagsParams> {
-
- public GroupingByTags(GroupingByTagsParams groupingByTagsParams) {
- super(groupingByTagsParams);
- }
-
- @Override
- protected String buildStatement(GroupingByTagsParams groupingByTagsParams) {
- String tags = "";
- for (String tag : groupingByTagsParams.getGroupingTags()) {
- if (tags.equals("")) {
- tags = tag;
- } else {
- tags = tags + ", " + tag;
- }
- }
-
- return QueryTemplatesV4.groupByTags(tags);
- }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTime.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTime.java
deleted file mode 100644
index 3ceadb161..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/GroupingByTime.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.GroupingByTimeParams;
-import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
-
-public class GroupingByTime extends QueryElement<GroupingByTimeParams> {
-
- public GroupingByTime(GroupingByTimeParams groupingByTimeParams) {
- super(groupingByTimeParams);
- }
-
- @Override
- protected String buildStatement(GroupingByTimeParams groupingByTimeParams) {
- return QueryTemplatesV4.groupByTime(groupingByTimeParams.getTimeInterval());
- }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/ItemLimitation.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/ItemLimitation.java
deleted file mode 100644
index 3158552b7..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/ItemLimitation.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.ItemLimitationParams;
-import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
-
-public class ItemLimitation extends QueryElement<ItemLimitationParams> {
-
- public ItemLimitation(ItemLimitationParams itemLimitationParams) {
- super(itemLimitationParams);
- }
-
- @Override
- protected String buildStatement(ItemLimitationParams itemLimitationParams) {
- return QueryTemplatesV4.limitItems(itemLimitationParams.getLimit());
- }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/Offset.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/Offset.java
deleted file mode 100644
index f931f2f72..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/Offset.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.OffsetParams;
-import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
-
-public class Offset extends QueryElement<OffsetParams> {
-
- public Offset(OffsetParams offsetParams) {
- super(offsetParams);
- }
-
- @Override
- protected String buildStatement(OffsetParams offsetParams) {
- return QueryTemplatesV4.offset(offsetParams.getOffset());
- }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/OrderingByTime.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/OrderingByTime.java
deleted file mode 100644
index 58b55645b..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/OrderingByTime.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.OrderingByTimeParams;
-import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
-
-public class OrderingByTime extends QueryElement<OrderingByTimeParams> {
-
- public OrderingByTime(OrderingByTimeParams orderingByTimeParams) {
- super(orderingByTimeParams);
- }
-
- @Override
- protected String buildStatement(OrderingByTimeParams orderingByTimeParams) {
- return QueryTemplatesV4.orderByTime(orderingByTimeParams.getOrdering());
- }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java
deleted file mode 100644
index 09e93ffbb..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/SelectFromStatement.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.SelectFromStatementParams;
-
-import java.util.StringJoiner;
-
-public class SelectFromStatement extends QueryElement<SelectFromStatementParams> {
-
- public SelectFromStatement(SelectFromStatementParams selectFromStatementParams) {
- super(selectFromStatementParams);
- }
-
- @Override
- protected String buildStatement(SelectFromStatementParams params) {
- if (params.isSelectWildcard()) {
- return "SELECT * FROM " + escapeIndex(params.getIndex());
- } else {
- StringJoiner joiner = new StringJoiner(",");
- String queryPrefix = "SELECT ";
- String queryAppendix = " FROM " + escapeIndex(params.getIndex());
-
- params.getSelectedColumns().forEach(column -> {
- joiner.add(column.toQueryString());
- });
-
- return queryPrefix + joiner + queryAppendix;
- }
- }
-
- private String escapeIndex(String index) {
- return "\"" + index + "\"";
- }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/TimeBoundary.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/TimeBoundary.java
deleted file mode 100644
index 3bc23c221..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/TimeBoundary.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.TimeBoundaryParams;
-import org.apache.streampipes.dataexplorer.v4.template.QueryTemplatesV4;
-
-public class TimeBoundary extends QueryElement<TimeBoundaryParams> {
-
- public TimeBoundary(TimeBoundaryParams timeBoundaryParams) {
- super(timeBoundaryParams);
- }
-
- @Override
- protected String buildStatement(TimeBoundaryParams timeBoundaryParams) {
- if (timeBoundaryParams.getStartDate() == null) {
- return QueryTemplatesV4.whereTimeRightBound(timeBoundaryParams.getEndDate());
- } else if (timeBoundaryParams.getEndDate() == null) {
- return QueryTemplatesV4.whereTimeLeftBound(timeBoundaryParams.getStartDate());
- } else {
- return QueryTemplatesV4.whereTimeWithin(timeBoundaryParams.getStartDate(), timeBoundaryParams.getEndDate());
- }
- }
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/WhereStatement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/WhereStatement.java
deleted file mode 100644
index 88719d4b3..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/query/elements/WhereStatement.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.v4.query.elements;
-
-import org.apache.streampipes.dataexplorer.v4.params.WhereStatementParams;
-
-import java.util.StringJoiner;
-
-public class WhereStatement extends QueryElement<WhereStatementParams> {
-
- public WhereStatement(WhereStatementParams params) {
- super(params);
- }
-
- @Override
- protected String buildStatement(WhereStatementParams params) {
- StringJoiner joiner = new StringJoiner(" AND ");
-
- params.getWhereConditions().forEach(condition -> joiner.add(condition.toString()));
-
- return "WHERE " + joiner;
- }
-
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java
deleted file mode 100644
index 5ada24d77..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/template/QueryTemplatesV4.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.v4.template;
-
-public class QueryTemplatesV4 {
-
- public static String deleteFrom(String index) {
- return "DELETE FROM \"" + index + "\"";
- }
-
- public static String whereTimeWithin(long startDate, long endDate) {
- return "WHERE time > "
- + startDate * 1000000
- + " AND time < "
- + endDate * 1000000;
- }
-
- public static String whereTimeLeftBound(long startDate) {
- return "WHERE time > "
- + startDate * 1000000;
- }
-
- public static String whereTimeRightBound(long endDate) {
- return "WHERE time < "
- + endDate * 1000000;
- }
-
- public static String groupByTags(String tags) {
- return "GROUP BY " + tags;
- }
-
- public static String groupByTime(String timeInterval) {
- return "GROUP BY time(" + timeInterval + ")";
- }
-
- public static String orderByTime(String ordering) {
- return "ORDER BY time " + ordering.toUpperCase();
- }
-
- public static String limitItems(int limit) {
- return "LIMIT " + limit;
- }
-
- public static String offset(int offset) {
- return "OFFSET " + offset;
- }
-
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/DataLakeManagementUtils.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/DataLakeManagementUtils.java
deleted file mode 100644
index 850b14c62..000000000
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/utils/DataLakeManagementUtils.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.dataexplorer.v4.utils;
-
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.params.DeleteFromStatementParams;
-import org.apache.streampipes.dataexplorer.v4.params.FillParams;
-import org.apache.streampipes.dataexplorer.v4.params.GroupingByTagsParams;
-import org.apache.streampipes.dataexplorer.v4.params.GroupingByTimeParams;
-import org.apache.streampipes.dataexplorer.v4.params.ItemLimitationParams;
-import org.apache.streampipes.dataexplorer.v4.params.OffsetParams;
-import org.apache.streampipes.dataexplorer.v4.params.OrderingByTimeParams;
-import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
-import org.apache.streampipes.dataexplorer.v4.params.SelectFromStatementParams;
-import org.apache.streampipes.dataexplorer.v4.params.TimeBoundaryParams;
-import org.apache.streampipes.dataexplorer.v4.params.WhereStatementParams;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AGGREGATION_FUNCTION;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_COLUMNS;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_COUNT_ONLY;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_END_DATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_FILTER;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_GROUP_BY;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_LIMIT;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_OFFSET;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_ORDER;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_PAGE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_START_DATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_TIME_INTERVAL;
-
-
-public class DataLakeManagementUtils {
-
- public static final String BRACKET_OPEN = "\\[";
- public static final String BRACKET_CLOSE = "\\]";
-
- public static final String SELECT_FROM = "SELECT";
- public static final String WHERE = "WHERE";
- public static final String GROUP_BY_TAGS = "GROUPBY";
- public static final String GROUP_BY_TIME = "GROUPBYTIME";
- public static final String ORDER_DESCENDING = "DESC";
- public static final String LIMIT = "LIMIT";
- public static final String OFFSET = "OFFSET";
- public static final String FILL = "FILL";
- public static final String MAXIMUM_AMOUNT_OF_EVENTS = "MAXIMUM_AMOUNT_OF_EVENTS";
-
- public static final String DELETE_FROM = "DELETE";
-
- public static Map<String, QueryParamsV4> getSelectQueryParams(ProvidedQueryParams params) {
- Map<String, QueryParamsV4> queryParts = new HashMap<>();
- String measurementId = params.getMeasurementId();
-
- if (params.has(QP_COUNT_ONLY) && params.getAsBoolean(QP_COUNT_ONLY)) {
- queryParts.put(SELECT_FROM, SelectFromStatementParams.from(measurementId, params.getAsString(QP_COLUMNS), true));
- } else {
- queryParts.put(SELECT_FROM, SelectFromStatementParams.from(measurementId, params.getAsString(QP_COLUMNS),
- params.getAsString(QP_AGGREGATION_FUNCTION)));
- }
-
- String filterConditions = params.getAsString(QP_FILTER);
-
- if (hasTimeParams(params)) {
- queryParts.put(WHERE, WhereStatementParams.from(measurementId,
- params.getAsLong(QP_START_DATE),
- params.getAsLong(QP_END_DATE),
- filterConditions));
- } else if (filterConditions != null) {
- queryParts.put(WHERE, WhereStatementParams.from(measurementId, filterConditions));
- }
-
- if (params.has(QP_TIME_INTERVAL)) {
- String timeInterval = params.getAsString(QP_TIME_INTERVAL);
- if (!params.has(QP_GROUP_BY)) {
- queryParts.put(GROUP_BY_TIME, GroupingByTimeParams.from(measurementId, timeInterval));
- } else {
- params.update(QP_GROUP_BY, params.getAsString(QP_GROUP_BY) + ",time(" + timeInterval + ")");
- }
-
- queryParts.put(FILL, FillParams.from(measurementId));
- }
-
- if (params.has(QP_GROUP_BY)) {
- queryParts.put(GROUP_BY_TAGS, GroupingByTagsParams.from(measurementId, params.getAsString(QP_GROUP_BY)));
- }
-
-
- if (params.has(QP_ORDER)) {
- String order = params.getAsString(QP_ORDER);
- if (order.equals(ORDER_DESCENDING)) {
- queryParts.put(ORDER_DESCENDING, OrderingByTimeParams.from(measurementId, order));
- }
- }
-
- if (params.has(QP_LIMIT)) {
- queryParts.put(LIMIT, ItemLimitationParams.from(measurementId, params.getAsInt(QP_LIMIT)));
- }
-
- if (params.has(QP_OFFSET)) {
- queryParts.put(OFFSET, OffsetParams.from(measurementId, params.getAsInt(QP_OFFSET)));
- } else if (params.has(QP_LIMIT) && params.has(QP_PAGE)) {
- queryParts.put(OFFSET, OffsetParams.from(measurementId,
- params.getAsInt(QP_PAGE) * params.getAsInt(QP_LIMIT)));
- }
-
- return queryParts;
- }
-
- public static Map<String, QueryParamsV4> getDeleteQueryParams(String measurementID,
- Long startDate,
- Long endDate) {
- Map<String, QueryParamsV4> queryParts = new HashMap<>();
- queryParts.put(DELETE_FROM, DeleteFromStatementParams.from(measurementID));
- if (startDate != null || endDate != null) {
- queryParts.put(WHERE, TimeBoundaryParams.from(measurementID, startDate, endDate));
- }
- return queryParts;
- }
-
- private static boolean hasTimeParams(ProvidedQueryParams params) {
- return params.has(QP_START_DATE)
- || params.has(QP_END_DATE);
- }
-
- public static List<String[]> buildConditions(String queryPart) {
- String[] conditions = queryPart.split(",");
- List<String[]> result = new ArrayList<>();
-
- Arrays.stream(conditions).forEach(condition -> {
- String[] singleCondition = buildSingleCondition(condition);
- result.add(singleCondition);
- });
- return result;
- }
-
- public static String[] buildSingleCondition(String queryPart) {
- return queryPart
- .replaceAll(BRACKET_OPEN, "")
- .replaceAll(BRACKET_CLOSE, "")
- .split(";");
- }
-}
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/param/SelectQueryParamsTest.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/param/SelectQueryParamsTest.java
new file mode 100644
index 000000000..0ad559770
--- /dev/null
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/param/SelectQueryParamsTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.param;
+
+import org.apache.streampipes.dataexplorer.influx.DataLakeInfluxQueryBuilder;
+import org.apache.streampipes.dataexplorer.utils.ProvidedQueryParameterBuilder;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class SelectQueryParamsTest {
+
+ @Test
+ public void testWildcardTimeBoundQuery() {
+ var params = ProvidedQueryParameterBuilder.create("abc")
+ .withStartDate(1)
+ .withEndDate(2)
+ .build();
+
+ SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+ String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+ assertEquals("SELECT * FROM \"abc\" WHERE (time < 2000000 AND time > 1000000);", query);
+ }
+
+ @Test
+ public void testSimpleColumnQuery() {
+ var params = ProvidedQueryParameterBuilder.create("abc")
+ .withStartDate(1)
+ .withEndDate(2)
+ .withSimpleColumns(Arrays.asList("p1", "p2"))
+ .build();
+
+ SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+ String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+ assertEquals("SELECT p1,p2 FROM \"abc\" WHERE (time < 2000000 AND time > 1000000);", query);
+ }
+
+ @Test
+ public void testSimpleColumnQueryWithBooleanFilter() {
+ var params = ProvidedQueryParameterBuilder.create("abc")
+ .withStartDate(1)
+ .withEndDate(2)
+ .withSimpleColumns(Arrays.asList("p1", "p2"))
+ .withFilter("[p1;=;true]")
+ .build();
+
+ SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+ String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+ assertEquals("SELECT p1,p2 FROM \"abc\" WHERE (time < 2000000 AND time > 1000000 AND p1 = true);", query);
+ }
+
+ @Test
+ public void testSimpleColumnQueryWithStringFilter() {
+ var params = ProvidedQueryParameterBuilder.create("abc")
+ .withStartDate(1)
+ .withEndDate(2)
+ .withSimpleColumns(Arrays.asList("p1", "p2"))
+ .withFilter("[p1;=;def]")
+ .build();
+
+ SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+ String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+ assertEquals("SELECT p1,p2 FROM \"abc\" WHERE (time < 2000000 AND time > 1000000 AND p1 = 'def');", query);
+ }
+
+ @Test
+ public void testSimpleColumnQueryWithIntFilter() {
+ var params = ProvidedQueryParameterBuilder.create("abc")
+ .withStartDate(1)
+ .withEndDate(2)
+ .withSimpleColumns(Arrays.asList("p1", "p2"))
+ .withFilter("[p1;=;1]")
+ .build();
+
+ SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+ String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+ assertEquals("SELECT p1,p2 FROM \"abc\" WHERE (time < 2000000 AND time > 1000000 AND p1 = 1.0);", query);
+ }
+
+ @Test
+ public void testSimpleColumnQueryWithFloatFilter() {
+ var params = ProvidedQueryParameterBuilder.create("abc")
+ .withStartDate(1)
+ .withEndDate(2)
+ .withSimpleColumns(Arrays.asList("p1", "p2"))
+ .withFilter("[p1;>;1.0]")
+ .build();
+
+ SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+ String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+ assertEquals("SELECT p1,p2 FROM \"abc\" WHERE (time < 2000000 AND time > 1000000 AND p1 > 1.0);", query);
+ }
+
+ @Test
+ public void testSimpleColumnQueryWithTwoFilters() {
+ var params = ProvidedQueryParameterBuilder.create("abc")
+ .withStartDate(1)
+ .withEndDate(2)
+ .withSimpleColumns(Arrays.asList("p1", "p2"))
+ .withFilter("[p1;>;1.0],[p2;<;2]")
+ .build();
+
+ SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+ String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+ assertEquals("SELECT p1,p2 FROM \"abc\" WHERE (time < 2000000 AND time > 1000000 AND p1 > 1.0 AND"
+ + " p2 < 2.0);", query);
+ }
+
+ @Test
+ public void testAggregatedColumn() {
+ var params = ProvidedQueryParameterBuilder.create("abc")
+ .withStartDate(1)
+ .withEndDate(2)
+ .withQueryColumns(List.of("[p1;MEAN;p1_mean]"))
+ .build();
+
+ SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+ String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+ assertEquals("SELECT MEAN(p1) AS p1_mean FROM \"abc\" WHERE (time < 2000000 AND time > 1000000);", query);
+ }
+
+ @Test
+ public void testAggregatedColumns() {
+ var params = ProvidedQueryParameterBuilder.create("abc")
+ .withStartDate(1)
+ .withEndDate(2)
+ .withQueryColumns(Arrays.asList("[p1;MEAN;p1_mean]", "[p2;COUNT;p2_count]"))
+ .build();
+
+ SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+ String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+ assertEquals("SELECT MEAN(p1) AS p1_mean,COUNT(p2) AS p2_count FROM \"abc\" WHERE (time < 2000000 AND"
+ + " time > 1000000);", query);
+ }
+
+ @Test
+ public void testGroupByTag() {
+ var params = ProvidedQueryParameterBuilder.create("abc")
+ .withStartDate(1)
+ .withEndDate(2)
+ .withQueryColumns(Arrays.asList("[p1;MEAN;p1_mean]", "[p2;COUNT;p2_count]"))
+ .withGroupBy(List.of("sensorId"))
+ .build();
+
+ SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+ String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+ assertEquals("SELECT MEAN(p1) AS p1_mean,COUNT(p2) AS p2_count FROM \"abc\" WHERE (time < 2000000 AND"
+ + " time > 1000000) GROUP BY sensorId;", query);
+ }
+
+ @Test
+ public void testGroupByTags() {
+ var params = ProvidedQueryParameterBuilder.create("abc")
+ .withStartDate(1)
+ .withEndDate(2)
+ .withQueryColumns(Arrays.asList("[p1;MEAN;p1_mean]", "[p2;COUNT;p2_count]"))
+ .withGroupBy(Arrays.asList("sensorId", "sensorId2"))
+ .build();
+
+ SelectQueryParams qp = ProvidedRestQueryParamConverter.getSelectQueryParams(params);
+
+ String query = qp.toQuery(DataLakeInfluxQueryBuilder.create("abc")).getCommand();
+
+ assertEquals("SELECT MEAN(p1) AS p1_mean,COUNT(p2) AS p2_count FROM \"abc\" WHERE (time < 2000000 AND"
+ + " time > 1000000) GROUP BY sensorId,sensorId2;", query);
+ }
+
+}
diff --git a/streampipes-rest/src/test/java/org/apache/streampipes/dataexplorer/v4/params/WhereStatementParamsTest.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/param/WhereStatementParamsTest.java
similarity index 53%
rename from streampipes-rest/src/test/java/org/apache/streampipes/dataexplorer/v4/params/WhereStatementParamsTest.java
rename to streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/param/WhereStatementParamsTest.java
index 694af5369..e24d90b87 100644
--- a/streampipes-rest/src/test/java/org/apache/streampipes/dataexplorer/v4/params/WhereStatementParamsTest.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/param/WhereStatementParamsTest.java
@@ -16,7 +16,10 @@
*
*/
-package org.apache.streampipes.dataexplorer.v4.params;
+package org.apache.streampipes.dataexplorer.param;
+
+import org.apache.streampipes.dataexplorer.param.model.WhereClauseParams;
+import org.apache.streampipes.dataexplorer.querybuilder.FilterCondition;
import org.junit.Test;
@@ -25,34 +28,34 @@ import static org.junit.Assert.assertEquals;
public class WhereStatementParamsTest {
@Test
public void filterNumber() {
- WhereStatementParams result = WhereStatementParams.from("", "[fieldName;=;6]");
- WhereCondition expected = new WhereCondition("fieldName", "=", "6");
+ WhereClauseParams result = WhereClauseParams.from("[fieldName;=;6]");
+ FilterCondition expected = new FilterCondition("fieldName", "=", 6.0);
assertWhereCondition(result, expected);
}
@Test
public void filterBoolean() {
- WhereStatementParams result = WhereStatementParams.from("", "[fieldName;=;true]");
- WhereCondition expected = new WhereCondition("fieldName", "=", "true");
+ WhereClauseParams result = WhereClauseParams.from("[fieldName;=;true]");
+ FilterCondition expected = new FilterCondition("fieldName", "=", true);
assertWhereCondition(result, expected);
}
@Test
public void filterString() {
- WhereStatementParams result = WhereStatementParams.from("", "[fieldName;=;a]");
- WhereCondition expected = new WhereCondition("fieldName", "=", "'a'");
+ WhereClauseParams result = WhereClauseParams.from("[fieldName;=;a]");
+ FilterCondition expected = new FilterCondition("fieldName", "=", "a");
assertWhereCondition(result, expected);
}
- private void assertWhereCondition(WhereStatementParams result, WhereCondition expected) {
+ private void assertWhereCondition(WhereClauseParams result, FilterCondition expected) {
assertEquals(1, result.getWhereConditions().size());
- WhereCondition resultingWhereCondition = result.getWhereConditions().get(0);
- assertEquals(expected.getField(), resultingWhereCondition.getField());
- assertEquals(expected.getOperator(), resultingWhereCondition.getOperator());
- assertEquals(expected.getCondition(), resultingWhereCondition.getCondition());
+ FilterCondition resultingFilterCondition = result.getWhereConditions().get(0);
+ assertEquals(expected.getField(), resultingFilterCondition.getField());
+ assertEquals(expected.getOperator(), resultingFilterCondition.getOperator());
+ assertEquals(expected.getCondition(), resultingFilterCondition.getCondition());
}
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredCsvOutputWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredCsvOutputWriter.java
similarity index 85%
rename from streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredCsvOutputWriter.java
rename to streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredCsvOutputWriter.java
index c4679ffdf..5b64b4421 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredCsvOutputWriter.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredCsvOutputWriter.java
@@ -16,10 +16,9 @@
*
*/
-package org.apache.streampipesdataexplorer.v4.query.writer;
+package org.apache.streampipes.dataexplorer.query.writer;
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.query.writer.ConfiguredCsvOutputWriter;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
import com.google.common.base.Charsets;
import org.junit.Test;
@@ -37,7 +36,7 @@ public class TestConfiguredCsvOutputWriter extends TestConfiguredOutputWriter {
@Test
public void testCsvOutputWriter() throws IOException {
var writer = new ConfiguredCsvOutputWriter();
- writer.configure(new ProvidedQueryParams(null, new HashMap<>()), true);
+ writer.configure(new ProvidedRestQueryParams(null, new HashMap<>()), true);
try (var outputStream = new ByteArrayOutputStream()) {
writer.beforeFirstItem(outputStream);
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredJsonOutputWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredJsonOutputWriter.java
similarity index 85%
rename from streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredJsonOutputWriter.java
rename to streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredJsonOutputWriter.java
index 9072c6b49..2e0431cf7 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredJsonOutputWriter.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredJsonOutputWriter.java
@@ -16,10 +16,9 @@
*
*/
-package org.apache.streampipesdataexplorer.v4.query.writer;
+package org.apache.streampipes.dataexplorer.query.writer;
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.query.writer.ConfiguredJsonOutputWriter;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
import com.google.common.base.Charsets;
import org.junit.Test;
@@ -38,7 +37,7 @@ public class TestConfiguredJsonOutputWriter extends TestConfiguredOutputWriter {
@Test
public void testJsonOutputWriter() throws IOException {
var writer = new ConfiguredJsonOutputWriter();
- writer.configure(new ProvidedQueryParams(null, new HashMap<>()), true);
+ writer.configure(new ProvidedRestQueryParams(null, new HashMap<>()), true);
try (var outputStream = new ByteArrayOutputStream()) {
writer.beforeFirstItem(outputStream);
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredOutputWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredOutputWriter.java
similarity index 95%
rename from streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredOutputWriter.java
rename to streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredOutputWriter.java
index 78927a1c9..8a681f9a8 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/TestConfiguredOutputWriter.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/TestConfiguredOutputWriter.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipesdataexplorer.v4.query.writer;
+package org.apache.streampipes.dataexplorer.query.writer;
import org.junit.Before;
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestCsvItemWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestCsvItemWriter.java
similarity index 91%
rename from streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestCsvItemWriter.java
rename to streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestCsvItemWriter.java
index 37a25a4d2..c05937d28 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestCsvItemWriter.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestCsvItemWriter.java
@@ -16,9 +16,7 @@
*
*/
-package org.apache.streampipesdataexplorer.v4.query.writer.item;
-
-import org.apache.streampipes.dataexplorer.v4.query.writer.item.CsvItemWriter;
+package org.apache.streampipes.dataexplorer.query.writer.item;
import org.junit.Test;
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestItemWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestItemWriter.java
similarity index 94%
rename from streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestItemWriter.java
rename to streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestItemWriter.java
index 26092fd88..24b4d38bf 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestItemWriter.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestItemWriter.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipesdataexplorer.v4.query.writer.item;
+package org.apache.streampipes.dataexplorer.query.writer.item;
import org.junit.Before;
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestJsonItemWriter.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestJsonItemWriter.java
similarity index 89%
rename from streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestJsonItemWriter.java
rename to streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestJsonItemWriter.java
index c00962835..833062e08 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipesdataexplorer/v4/query/writer/item/TestJsonItemWriter.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/query/writer/item/TestJsonItemWriter.java
@@ -16,9 +16,7 @@
*
*/
-package org.apache.streampipesdataexplorer.v4.query.writer.item;
-
-import org.apache.streampipes.dataexplorer.v4.query.writer.item.JsonItemWriter;
+package org.apache.streampipes.dataexplorer.query.writer.item;
import com.google.gson.Gson;
import org.junit.Test;
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilderTest.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilderTest.java
index 82a999463..edce7e7f7 100644
--- a/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilderTest.java
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/sdk/DataLakeQueryBuilderTest.java
@@ -18,6 +18,8 @@
package org.apache.streampipes.dataexplorer.sdk;
+import org.apache.streampipes.dataexplorer.influx.DataLakeInfluxQueryBuilder;
+
import org.junit.Test;
import java.util.List;
@@ -29,7 +31,7 @@ public class DataLakeQueryBuilderTest {
private static final String MEASUREMENT = "measurement";
@Test
public void withSimpleColumnsTest() {
- var result = DataLakeQueryBuilder
+ var result = DataLakeInfluxQueryBuilder
.create(MEASUREMENT)
.withSimpleColumns(List.of("one", "two"))
.build();
@@ -37,4 +39,4 @@ public class DataLakeQueryBuilderTest {
var expected = String.format("SELECT one,two FROM \"%s\";", MEASUREMENT);
assertEquals(expected , result.getCommand());
}
-}
\ No newline at end of file
+}
diff --git a/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/utils/ProvidedQueryParameterBuilder.java b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/utils/ProvidedQueryParameterBuilder.java
new file mode 100644
index 000000000..c4cd73ceb
--- /dev/null
+++ b/streampipes-data-explorer/src/test/java/org/apache/streampipes/dataexplorer/utils/ProvidedQueryParameterBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.utils;
+
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ProvidedQueryParameterBuilder {
+
+ private final String measurementId;
+ private final Map<String, String> queryParams = new HashMap<>();
+
+ public static ProvidedQueryParameterBuilder create(String measurementId) {
+ return new ProvidedQueryParameterBuilder(measurementId);
+ }
+
+ public ProvidedQueryParameterBuilder(String measurementId) {
+ this.measurementId = measurementId;
+ }
+
+ public ProvidedQueryParameterBuilder withStartDate(long startDate) {
+ this.queryParams.put(SupportedRestQueryParams.QP_START_DATE, String.valueOf(startDate));
+
+ return this;
+ }
+
+ public ProvidedQueryParameterBuilder withEndDate(long endDate) {
+ this.queryParams.put(SupportedRestQueryParams.QP_END_DATE, String.valueOf(endDate));
+
+ return this;
+ }
+
+ public ProvidedQueryParameterBuilder withSimpleColumns(List<String> simpleColumns) {
+ this.queryParams.put(SupportedRestQueryParams.QP_COLUMNS, String.join(",", simpleColumns));
+
+ return this;
+ }
+
+ public ProvidedQueryParameterBuilder withQueryColumns(List<String> rawQueryColumns) {
+ this.queryParams.put(SupportedRestQueryParams.QP_COLUMNS, String.join(",", rawQueryColumns));
+
+ return this;
+ }
+
+ public ProvidedQueryParameterBuilder withGroupBy(List<String> groupBy) {
+ this.queryParams.put(SupportedRestQueryParams.QP_GROUP_BY, String.join(",", groupBy));
+
+ return this;
+ }
+
+ public ProvidedQueryParameterBuilder withFilter(String filter) {
+ this.queryParams.put(SupportedRestQueryParams.QP_FILTER, filter);
+
+ return this;
+ }
+
+ public ProvidedQueryParameterBuilder withPage(int page) {
+ this.queryParams.put(SupportedRestQueryParams.QP_PAGE, String.valueOf(page));
+
+ return this;
+ }
+
+ public ProvidedQueryParameterBuilder withLimit(int limit) {
+ this.queryParams.put(SupportedRestQueryParams.QP_LIMIT, String.valueOf(limit));
+
+ return this;
+ }
+
+ public ProvidedRestQueryParams build() {
+ return new ProvidedRestQueryParams(measurementId, queryParams);
+ }
+}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
index ae3a0c3ea..9b9641366 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
@@ -18,8 +18,6 @@
package org.apache.streampipes.sinks.internal.jvm;
-import org.apache.streampipes.dataexplorer.commons.configs.CouchDbConfigurations;
-import org.apache.streampipes.dataexplorer.commons.configs.DataExplorerConfigurations;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
@@ -53,8 +51,6 @@ public class SinksInternalJvmInit extends ExtensionsModelSubmitter {
new SpKafkaProtocolFactory(),
new SpJmsProtocolFactory(),
new SpMqttProtocolFactory())
- .addConfigs(DataExplorerConfigurations.getDefaults())
- .addConfigs(CouchDbConfigurations.getDefaults())
.build();
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java
deleted file mode 100644
index 590d30e27..000000000
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.ps;
-
-import org.apache.streampipes.dataexplorer.DataLakeNoUserManagementV3;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
-import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
-
-import jakarta.ws.rs.Consumes;
-import jakarta.ws.rs.POST;
-import jakarta.ws.rs.Path;
-import jakarta.ws.rs.PathParam;
-import jakarta.ws.rs.Produces;
-import jakarta.ws.rs.core.MediaType;
-import jakarta.ws.rs.core.Response;
-
-@Path("/v3/datalake/measure")
-@Deprecated
-public class DataLakeMeasureResourceV3 extends AbstractAuthGuardedRestResource {
-
- private DataLakeNoUserManagementV3 dataLakeManagement;
-
- public DataLakeMeasureResourceV3() {
- this.dataLakeManagement = new DataLakeNoUserManagementV3();
- }
-
- @POST
- @JacksonSerialized
- @Produces(MediaType.APPLICATION_JSON)
- @Consumes(MediaType.APPLICATION_JSON)
- @Path("/{measure}")
- public Response addDataLake(@PathParam("measure") String measure, EventSchema eventSchema) {
- if (this.dataLakeManagement.addDataLake(measure, eventSchema)) {
- return ok();
- } else {
- return Response.status(409).build();
- }
-
- }
-}
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
index a59ce31ac..4e57132ff 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.ps;
-import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
+import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -37,10 +38,10 @@ import jakarta.ws.rs.core.Response;
@Path("/v4/datalake/measure")
public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
- private DataLakeManagementV4 dataLakeManagement;
+ private final IDataExplorerSchemaManagement dataLakeMeasureManagement;
public DataLakeMeasureResourceV4() {
- this.dataLakeManagement = new DataLakeManagementV4();
+ this.dataLakeMeasureManagement = new DataExplorerSchemaManagement();
}
@POST
@@ -48,7 +49,7 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public Response addDataLake(DataLakeMeasure dataLakeMeasure) {
- DataLakeMeasure result = this.dataLakeManagement.addDataLake(dataLakeMeasure);
+ DataLakeMeasure result = this.dataLakeMeasureManagement.createMeasurement(dataLakeMeasure);
return ok(result);
}
@@ -56,8 +57,8 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
@JacksonSerialized
@Produces(MediaType.APPLICATION_JSON)
@Path("{id}")
- public Response getDataLakeMeasure(@PathParam("id") String measureId) {
- return ok(this.dataLakeManagement.getById(measureId));
+ public Response getDataLakeMeasure(@PathParam("id") String elementId) {
+ return ok(this.dataLakeMeasureManagement.getById(elementId));
}
@PUT
@@ -65,11 +66,11 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Path("{id}")
- public Response updateDataLakeMeasure(@PathParam("id") String measureId,
+ public Response updateDataLakeMeasure(@PathParam("id") String elementId,
DataLakeMeasure measure) {
- if (measureId.equals(measure.getElementId())) {
+ if (elementId.equals(measure.getElementId())) {
try {
- this.dataLakeManagement.updateDataLake(measure);
+ this.dataLakeMeasureManagement.updateMeasurement(measure);
return ok();
} catch (IllegalArgumentException e) {
return badRequest(e.getMessage());
@@ -81,9 +82,9 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
@DELETE
@JacksonSerialized
@Path("{id}")
- public Response deleteDataLakeMeasure(@PathParam("id") String measureId) {
+ public Response deleteDataLakeMeasure(@PathParam("id") String elementId) {
try {
- this.dataLakeManagement.deleteDataLakeMeasure(measureId);
+ this.dataLakeMeasureManagement.deleteMeasurement(elementId);
return ok();
} catch (IllegalArgumentException e) {
return badRequest(e.getMessage());
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index 4ed1a0bd7..b449e2dcf 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -18,9 +18,10 @@
package org.apache.streampipes.ps;
-import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
-import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
-import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
+import org.apache.streampipes.dataexplorer.DataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.param.ProvidedRestQueryParams;
+import org.apache.streampipes.dataexplorer.query.writer.OutputFormat;
import org.apache.streampipes.model.StreamPipesErrorMessage;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.datalake.DataSeries;
@@ -57,38 +58,41 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AGGREGATION_FUNCTION;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_AUTO_AGGREGATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_COLUMNS;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_COUNT_ONLY;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_CSV_DELIMITER;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_END_DATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_FILTER;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_FORMAT;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_GROUP_BY;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_LIMIT;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_MAXIMUM_AMOUNT_OF_EVENTS;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_MISSING_VALUE_BEHAVIOUR;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_OFFSET;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_ORDER;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_PAGE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_START_DATE;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.QP_TIME_INTERVAL;
-import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.SUPPORTED_PARAMS;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_AGGREGATION_FUNCTION;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_AUTO_AGGREGATE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_COLUMNS;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_COUNT_ONLY;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_CSV_DELIMITER;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_END_DATE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_FILTER;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_FORMAT;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_GROUP_BY;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_LIMIT;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_MAXIMUM_AMOUNT_OF_EVENTS;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_MISSING_VALUE_BEHAVIOUR;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_OFFSET;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_ORDER;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_PAGE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_START_DATE;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.QP_TIME_INTERVAL;
+import static org.apache.streampipes.dataexplorer.param.SupportedRestQueryParams.SUPPORTED_PARAMS;
@Path("v4/datalake")
public class DataLakeResourceV4 extends AbstractRestResource {
private static final Logger logger = LoggerFactory.getLogger(DataLakeResourceV4.class);
- private DataLakeManagementV4 dataLakeManagement;
+ private DataExplorerQueryManagement dataLakeManagement;
+ private final DataExplorerSchemaManagement dataExplorerSchemaManagement;
public DataLakeResourceV4() {
- this.dataLakeManagement = new DataLakeManagementV4();
+ this.dataExplorerSchemaManagement = new DataExplorerSchemaManagement();
+ this.dataLakeManagement = new DataExplorerQueryManagement(dataExplorerSchemaManagement);
}
- public DataLakeResourceV4(DataLakeManagementV4 dataLakeManagement) {
+ public DataLakeResourceV4(DataExplorerQueryManagement dataLakeManagement) {
this.dataLakeManagement = dataLakeManagement;
+ this.dataExplorerSchemaManagement = new DataExplorerSchemaManagement();
}
@@ -127,10 +131,10 @@ public class DataLakeResourceV4 extends AbstractRestResource {
@Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true)
@PathParam("measurementID") String measurementID) {
- boolean isSuccessDataLake = this.dataLakeManagement.removeMeasurement(measurementID);
+ boolean isSuccessDataLake = this.dataLakeManagement.deleteData(measurementID);
if (isSuccessDataLake) {
- boolean isSuccessEventProperty = this.dataLakeManagement.removeEventProperty(measurementID);
+ boolean isSuccessEventProperty = this.dataExplorerSchemaManagement.deleteMeasurementByName(measurementID);
if (isSuccessEventProperty) {
return ok();
} else {
@@ -152,7 +156,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
description = "array of stored measurement series",
content = @Content(array = @ArraySchema(schema = @Schema(implementation = DataLakeMeasure.class))))})
public Response getAll() {
- List<DataLakeMeasure> allMeasurements = this.dataLakeManagement.getAllMeasurements();
+ List<DataLakeMeasure> allMeasurements = this.dataExplorerSchemaManagement.getAllMeasurements();
return ok(allMeasurements);
}
@@ -231,7 +235,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
if (!(checkProvidedQueryParams(queryParams))) {
return badRequest();
} else {
- ProvidedQueryParams sanitizedParams = populate(measurementID, queryParams);
+ ProvidedRestQueryParams sanitizedParams = populate(measurementID, queryParams);
try {
SpQueryResult result =
this.dataLakeManagement.getData(sanitizedParams, isIgnoreMissingValues(missingValueBehaviour));
@@ -249,7 +253,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
public Response getData(List<Map<String, String>> queryParams) {
var results = queryParams
.stream()
- .map(qp -> new ProvidedQueryParams(qp.get("measureName"), qp))
+ .map(qp -> new ProvidedRestQueryParams(qp.get("measureName"), qp))
.map(params -> this.dataLakeManagement.getData(params, true))
.collect(Collectors.toList());
@@ -317,7 +321,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
if (!(checkProvidedQueryParams(queryParams))) {
return badRequest();
} else {
- ProvidedQueryParams sanitizedParams = populate(measurementID, queryParams);
+ ProvidedRestQueryParams sanitizedParams = populate(measurementID, queryParams);
if (format == null) {
format = "csv";
}
@@ -341,7 +345,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
responses = {
@ApiResponse(responseCode = "200", description = "All measurement series successfully removed")})
public Response removeAll() {
- boolean isSuccess = this.dataLakeManagement.removeAllMeasurements();
+ boolean isSuccess = this.dataLakeManagement.deleteAllData();
return Response.ok(isSuccess).build();
}
@@ -349,11 +353,11 @@ public class DataLakeResourceV4 extends AbstractRestResource {
return SUPPORTED_PARAMS.containsAll(providedParams.keySet());
}
- private ProvidedQueryParams populate(String measurementId, MultivaluedMap<String, String> rawParams) {
+ private ProvidedRestQueryParams populate(String measurementId, MultivaluedMap<String, String> rawParams) {
Map<String, String> queryParamMap = new HashMap<>();
rawParams.forEach((key, value) -> queryParamMap.put(key, String.join(",", value)));
- return new ProvidedQueryParams(measurementId, queryParamMap);
+ return new ProvidedRestQueryParams(measurementId, queryParamMap);
}
// Checks if the parameter for missing value behaviour is set
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
index fc7d07e5d..0a816ebcb 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
@@ -19,7 +19,9 @@
package org.apache.streampipes.rest;
import org.apache.streampipes.connect.management.management.AdapterMasterManagement;
-import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
+import org.apache.streampipes.dataexplorer.DataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
import org.apache.streampipes.extensions.api.connect.exception.AdapterException;
import org.apache.streampipes.manager.file.FileManager;
import org.apache.streampipes.manager.pipeline.PipelineCacheManager;
@@ -93,13 +95,15 @@ public class ResetManagement {
});
// Remove all data in data lake
- DataLakeManagementV4 dataLakeManagementV4 = new DataLakeManagementV4();
- List<DataLakeMeasure> allMeasurements = dataLakeManagementV4.getAllMeasurements();
+ IDataExplorerSchemaManagement dataLakeMeasureManagement = new DataExplorerSchemaManagement();
+ DataExplorerQueryManagement dataExplorerQueryManagement =
+ new DataExplorerQueryManagement(dataLakeMeasureManagement);
+ List<DataLakeMeasure> allMeasurements = dataLakeMeasureManagement.getAllMeasurements();
allMeasurements.forEach(measurement -> {
- boolean isSuccessDataLake = dataLakeManagementV4.removeMeasurement(measurement.getMeasureName());
+ boolean isSuccessDataLake = dataExplorerQueryManagement.deleteData(measurement.getMeasureName());
if (isSuccessDataLake) {
- dataLakeManagementV4.removeEventProperty(measurement.getMeasureName());
+ dataLakeMeasureManagement.deleteMeasurementByName(measurement.getMeasureName());
}
});
diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java
index 1a4f253b7..033458721 100644
--- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java
+++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java
@@ -19,7 +19,6 @@
package org.apache.streampipes.service.core;
import org.apache.streampipes.ps.DataLakeImageResource;
-import org.apache.streampipes.ps.DataLakeMeasureResourceV3;
import org.apache.streampipes.ps.DataLakeMeasureResourceV4;
import org.apache.streampipes.ps.DataLakeResourceV3;
import org.apache.streampipes.ps.DataLakeResourceV4;
@@ -120,7 +119,6 @@ public class StreamPipesResourceConfig extends BaseResourceConfig {
DataLakeWidgetResource.class,
DataLakeImageResource.class,
DataLakeResourceV3.class,
- DataLakeMeasureResourceV3.class,
DataLakeMeasureResourceV4.class,
DataStream.class,
EmailConfigurationResource.class,