You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2023/03/10 20:55:01 UTC
[streampipes] branch 1406-cleanup-data-explorer-query-management updated: Improve data explorer query management (#1406)
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch 1406-cleanup-data-explorer-query-management
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to refs/heads/1406-cleanup-data-explorer-query-management by this push:
new f0cb55986 Improve data explorer query management (#1406)
f0cb55986 is described below
commit f0cb55986c70ab74083862824bb07f9c21fefe92
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Fri Mar 10 21:54:46 2023 +0100
Improve data explorer query management (#1406)
---
.../commons/configs/CouchDbConfigurations.java | 35 ------
.../configs/DataExplorerConfigurations.java | 46 -------
.../commons/configs/DataExplorerEnvKeys.java | 28 -----
...entV4.java => DataExplorerQueryManagement.java} | 137 +++------------------
...ntV3.java => DataExplorerSchemaManagement.java} | 93 +++++++++++---
.../api/IDataExplorerQueryManagement.java | 47 +++++++
.../api/IDataExplorerSchemaManagement.java | 23 +++-
.../dataexplorer/v4/AutoAggregationHandler.java | 16 ++-
.../sinks/internal/jvm/SinksInternalJvmInit.java | 4 -
.../streampipes/ps/DataLakeMeasureResourceV3.java | 57 ---------
.../streampipes/ps/DataLakeMeasureResourceV4.java | 23 ++--
.../apache/streampipes/ps/DataLakeResourceV4.java | 20 +--
.../apache/streampipes/rest/ResetManagement.java | 13 +-
.../service/core/StreamPipesResourceConfig.java | 2 -
14 files changed, 201 insertions(+), 343 deletions(-)
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java
deleted file mode 100644
index 45678a9c1..000000000
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.commons.configs;
-
-import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
-
-import java.util.Arrays;
-import java.util.List;
-
-public class CouchDbConfigurations {
-
- public static List<ConfigItem> getDefaults() {
- return Arrays.asList(
- ConfigItem.from(CouchDbEnvKeys.COUCHDB_HOST, "couchdb", "Hostname for CouchDB to store image blobs"),
- ConfigItem.from(CouchDbEnvKeys.COUCHDB_PORT, 5984, ""),
- ConfigItem.from(CouchDbEnvKeys.COUCHDB_PROTOCOL, "http", "")
- );
- }
-
-}
\ No newline at end of file
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java
deleted file mode 100644
index feb7135ea..000000000
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.commons.configs;
-
-import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
-
-import java.util.Arrays;
-import java.util.List;
-
-
-public class DataExplorerConfigurations {
- public static final String DATA_LAKE_DATABASE_NAME = "sp";
-
- public static List<ConfigItem> getDefaults() {
-
- return Arrays.asList(
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_HOST, "influxdb",
- "Hostname for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PROTOCOL, "http",
- "Protocol for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PORT, 8086, "Port for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_USERNAME, "default",
- "Username for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PASSWORD, "default",
- "Password for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_DATABASE_NAME, DATA_LAKE_DATABASE_NAME,
- "Database name for the StreamPipes data lake database")
- );
- }
-
-}
\ No newline at end of file
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java
deleted file mode 100644
index 6eb840708..000000000
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.commons.configs;
-
-public class DataExplorerEnvKeys {
- public static final String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
- public static final String DATA_LAKE_PROTOCOL = "SP_DATA_LAKE_PROTOCOL";
- public static final String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT";
- public static final String DATA_LAKE_USERNAME = "SP_DATA_LAKE_USERNAME";
- public static final String DATA_LAKE_PASSWORD = "SP_DATA_LAKE_PASSWORD";
- public static final String DATA_LAKE_DATABASE_NAME = "SP_DATA_LAKE_DATABASE_NAME";
-
-}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java
similarity index 51%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java
index 7f397af7a..c2fd61edd 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerQueryManagement.java
@@ -20,9 +20,10 @@ package org.apache.streampipes.dataexplorer;
import org.apache.streampipes.commons.environment.Environment;
import org.apache.streampipes.commons.environment.Environments;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
import org.apache.streampipes.dataexplorer.commons.influx.InfluxClientProvider;
import org.apache.streampipes.dataexplorer.query.DeleteDataQuery;
-import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
import org.apache.streampipes.dataexplorer.v4.params.QueryParamsV4;
import org.apache.streampipes.dataexplorer.v4.query.DataExplorerQueryV4;
@@ -32,45 +33,34 @@ import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
import org.apache.streampipes.dataexplorer.v4.utils.DataLakeManagementUtils;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.datalake.SpQueryResult;
-import org.apache.streampipes.model.schema.EventProperty;
-import org.apache.streampipes.model.schema.EventPropertyList;
-import org.apache.streampipes.model.schema.EventPropertyNested;
-import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.storage.api.IDataLakeStorage;
-import org.apache.streampipes.storage.couchdb.utils.Utils;
-import org.apache.streampipes.storage.management.StorageDispatcher;
-
-import com.google.gson.JsonObject;
+
import org.influxdb.InfluxDB;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
-import org.lightcouch.CouchDbClient;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.stream.Collectors;
-public class DataLakeManagementV4 {
+public class DataExplorerQueryManagement implements IDataExplorerQueryManagement {
- public List<DataLakeMeasure> getAllMeasurements() {
- return DataExplorerUtils.getInfos();
- }
+ private final IDataExplorerSchemaManagement dataExplorerSchemaManagement;
- public DataLakeMeasure getById(String measureId) {
- return getDataLakeStorage().findOne(measureId);
+ public DataExplorerQueryManagement(IDataExplorerSchemaManagement dataExplorerSchemaManagement) {
+ this.dataExplorerSchemaManagement = dataExplorerSchemaManagement;
}
+ @Override
public SpQueryResult getData(ProvidedQueryParams queryParams,
boolean ignoreMissingData) throws IllegalArgumentException {
return new QueryResultProvider(queryParams, ignoreMissingData).getData();
}
+ @Override
public void getDataAsStream(ProvidedQueryParams params,
OutputFormat format,
boolean ignoreMissingValues,
@@ -79,7 +69,8 @@ public class DataLakeManagementV4 {
new StreamedQueryResultProvider(params, format, ignoreMissingValues).getDataAsStream(outputStream);
}
- public boolean removeAllMeasurements() {
+ @Override
+ public boolean deleteAllData() {
List<DataLakeMeasure> allMeasurements = getAllMeasurements();
for (DataLakeMeasure measure : allMeasurements) {
@@ -91,7 +82,8 @@ public class DataLakeManagementV4 {
return true;
}
- public boolean removeMeasurement(String measurementID) {
+ @Override
+ public boolean deleteData(String measurementID) {
List<DataLakeMeasure> allMeasurements = getAllMeasurements();
for (DataLakeMeasure measure : allMeasurements) {
if (measure.getMeasureName().equals(measurementID)) {
@@ -103,34 +95,14 @@ public class DataLakeManagementV4 {
return false;
}
+ @Override
public SpQueryResult deleteData(String measurementID, Long startDate, Long endDate) {
Map<String, QueryParamsV4> queryParts =
DataLakeManagementUtils.getDeleteQueryParams(measurementID, startDate, endDate);
return new DataExplorerQueryV4(queryParts).executeQuery(true);
}
- public boolean removeEventProperty(String measurementID) {
- boolean isSuccess = false;
- CouchDbClient couchDbClient = Utils.getCouchDbDataLakeClient();
- List<JsonObject> docs = couchDbClient.view("_all_docs").includeDocs(true).query(JsonObject.class);
-
- for (JsonObject document : docs) {
- if (document.get("measureName").toString().replace("\"", "").equals(measurementID)) {
- couchDbClient.remove(document.get("_id").toString().replace("\"", ""),
- document.get("_rev").toString().replace("\"", ""));
- isSuccess = true;
- break;
- }
- }
-
- try {
- couchDbClient.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return isSuccess;
- }
-
+ @Override
public Map<String, Object> getTagValues(String measurementId,
String fields) {
InfluxDB influxDB = InfluxClientProvider.getInfluxDBClient();
@@ -160,83 +132,8 @@ public class DataLakeManagementV4 {
return tags;
}
- public void updateDataLake(DataLakeMeasure measure) throws IllegalArgumentException {
- var existingMeasure = getDataLakeStorage().findOne(measure.getElementId());
- if (existingMeasure != null) {
- measure.setRev(existingMeasure.getRev());
- getDataLakeStorage().updateDataLakeMeasure(measure);
- } else {
- getDataLakeStorage().storeDataLakeMeasure(measure);
- }
- }
-
- public void deleteDataLakeMeasure(String elementId) throws IllegalArgumentException {
- if (getDataLakeStorage().findOne(elementId) != null) {
- getDataLakeStorage().deleteDataLakeMeasure(elementId);
- } else {
- throw new IllegalArgumentException("Could not find measure with this ID");
- }
- }
-
- public DataLakeMeasure addDataLake(DataLakeMeasure measure) {
- List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
- Optional<DataLakeMeasure> optional =
- dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName()))
- .findFirst();
-
- if (optional.isPresent()) {
- DataLakeMeasure oldEntry = optional.get();
- if (!compareEventProperties(oldEntry.getEventSchema().getEventProperties(),
- measure.getEventSchema().getEventProperties())) {
- return oldEntry;
- }
- } else {
- measure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
- getDataLakeStorage().storeDataLakeMeasure(measure);
- return measure;
- }
-
- return measure;
- }
-
- private boolean compareEventProperties(List<EventProperty> prop1, List<EventProperty> prop2) {
- if (prop1.size() != prop2.size()) {
- return false;
- }
-
- return prop1.stream().allMatch(prop -> {
-
- for (EventProperty property : prop2) {
- if (prop.getRuntimeName().equals(property.getRuntimeName())) {
-
- //primitive
- if (prop instanceof EventPropertyPrimitive && property instanceof EventPropertyPrimitive) {
- if (((EventPropertyPrimitive) prop)
- .getRuntimeType()
- .equals(((EventPropertyPrimitive) property).getRuntimeType())) {
- return true;
- }
-
- //list
- } else if (prop instanceof EventPropertyList && property instanceof EventPropertyList) {
- return compareEventProperties(Collections.singletonList(((EventPropertyList) prop).getEventProperty()),
- Collections.singletonList(((EventPropertyList) property).getEventProperty()));
-
- //nested
- } else if (prop instanceof EventPropertyNested && property instanceof EventPropertyNested) {
- return compareEventProperties(((EventPropertyNested) prop).getEventProperties(),
- ((EventPropertyNested) property).getEventProperties());
- }
- }
- }
- return false;
-
- });
- }
-
-
- private IDataLakeStorage getDataLakeStorage() {
- return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
+ private List<DataLakeMeasure> getAllMeasurements() {
+ return this.dataExplorerSchemaManagement.getAllMeasurements();
}
private Environment getEnvironment() {
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java
similarity index 54%
rename from streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java
index 6fd104ecd..f28cf6cbd 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java
@@ -18,41 +18,104 @@
package org.apache.streampipes.dataexplorer;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyList;
import org.apache.streampipes.model.schema.EventPropertyNested;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.storage.api.IDataLakeStorage;
+import org.apache.streampipes.storage.couchdb.utils.Utils;
import org.apache.streampipes.storage.management.StorageDispatcher;
+import com.google.gson.JsonObject;
+import org.lightcouch.CouchDbClient;
+
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
+public class DataExplorerSchemaManagement implements IDataExplorerSchemaManagement {
-@Deprecated
-public class DataLakeNoUserManagementV3 {
+ @Override
+ public List<DataLakeMeasure> getAllMeasurements() {
+ return DataExplorerUtils.getInfos();
+ }
+ @Override
+ public DataLakeMeasure getById(String elementId) {
+ return getDataLakeStorage().findOne(elementId);
+ }
- @Deprecated
- public boolean addDataLake(String measure, EventSchema eventSchema) {
+ @Override
+ public DataLakeMeasure createMeasurement(DataLakeMeasure measure) {
List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
Optional<DataLakeMeasure> optional =
- dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure)).findFirst();
+ dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName()))
+ .findFirst();
if (optional.isPresent()) {
- if (!compareEventProperties(optional.get().getEventSchema().getEventProperties(),
- eventSchema.getEventProperties())) {
- return false;
+ DataLakeMeasure oldEntry = optional.get();
+ if (!compareEventProperties(oldEntry.getEventSchema().getEventProperties(),
+ measure.getEventSchema().getEventProperties())) {
+ return oldEntry;
+ }
+ } else {
+ measure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
+ getDataLakeStorage().storeDataLakeMeasure(measure);
+ return measure;
+ }
+
+ return measure;
+ }
+
+ @Override
+ public void deleteMeasurement(String elementId) {
+ if (getDataLakeStorage().findOne(elementId) != null) {
+ getDataLakeStorage().deleteDataLakeMeasure(elementId);
+ } else {
+ throw new IllegalArgumentException("Could not find measure with this ID");
+ }
+ }
+
+ @Override
+ public boolean deleteMeasurementByName(String measureName) {
+ boolean isSuccess = false;
+ CouchDbClient couchDbClient = Utils.getCouchDbDataLakeClient();
+ List<JsonObject> docs = couchDbClient.view("_all_docs").includeDocs(true).query(JsonObject.class);
+
+ for (JsonObject document : docs) {
+ if (document.get("measureName").toString().replace("\"", "").equals(measureName)) {
+ couchDbClient.remove(document.get("_id").toString().replace("\"", ""),
+ document.get("_rev").toString().replace("\"", ""));
+ isSuccess = true;
+ break;
}
+ }
+
+ try {
+ couchDbClient.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return isSuccess;
+ }
+
+ @Override
+ public void updateMeasurement(DataLakeMeasure measure) {
+ var existingMeasure = getDataLakeStorage().findOne(measure.getElementId());
+ if (existingMeasure != null) {
+ measure.setRev(existingMeasure.getRev());
+ getDataLakeStorage().updateDataLakeMeasure(measure);
} else {
- DataLakeMeasure dataLakeMeasure = new DataLakeMeasure(measure, eventSchema);
- dataLakeMeasure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
- getDataLakeStorage().storeDataLakeMeasure(dataLakeMeasure);
+ getDataLakeStorage().storeDataLakeMeasure(measure);
}
- return true;
+ }
+
+ private IDataLakeStorage getDataLakeStorage() {
+ return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
}
private boolean compareEventProperties(List<EventProperty> prop1, List<EventProperty> prop2) {
@@ -89,8 +152,4 @@ public class DataLakeNoUserManagementV3 {
});
}
-
- private IDataLakeStorage getDataLakeStorage() {
- return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
- }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerQueryManagement.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerQueryManagement.java
new file mode 100644
index 000000000..0b895cefa
--- /dev/null
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerQueryManagement.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.api;
+
+import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
+import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
+import org.apache.streampipes.model.datalake.SpQueryResult;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+public interface IDataExplorerQueryManagement {
+
+ SpQueryResult getData(ProvidedQueryParams queryParams,
+ boolean ignoreMissingData) throws IllegalArgumentException;
+
+ void getDataAsStream(ProvidedQueryParams params,
+ OutputFormat format,
+ boolean ignoreMissingValues,
+ OutputStream outputStream) throws IOException;
+
+ boolean deleteData(String measurementID);
+
+ SpQueryResult deleteData(String measurementID, Long startDate, Long endDate);
+
+ boolean deleteAllData();
+
+ Map<String, Object> getTagValues(String measurementId,
+ String fields);
+}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java
similarity index 62%
rename from streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java
rename to streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java
index 23867c634..d8dc29648 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java
@@ -16,10 +16,23 @@
*
*/
-package org.apache.streampipes.dataexplorer.commons.configs;
+package org.apache.streampipes.dataexplorer.api;
-public class CouchDbEnvKeys {
- public static final String COUCHDB_HOST = "SP_COUCHDB_HOST";
- public static final String COUCHDB_PORT = "SP_COUCHDB_PORT";
- public static final String COUCHDB_PROTOCOL = "SP_COUCHDB_PROTOCOL";
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+
+import java.util.List;
+
+public interface IDataExplorerSchemaManagement {
+
+ List<DataLakeMeasure> getAllMeasurements();
+
+ DataLakeMeasure getById(String elementId);
+
+ DataLakeMeasure createMeasurement(DataLakeMeasure measure);
+
+ void deleteMeasurement(String elementId);
+
+ boolean deleteMeasurementByName(String measureName);
+
+ void updateMeasurement(DataLakeMeasure measure);
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java
index a63c98077..5085820c0 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/v4/AutoAggregationHandler.java
@@ -17,7 +17,9 @@
*/
package org.apache.streampipes.dataexplorer.v4;
-import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
+import org.apache.streampipes.dataexplorer.DataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
import org.apache.streampipes.dataexplorer.sdk.DataLakeQueryOrdering;
import org.apache.streampipes.dataexplorer.v4.params.SelectColumn;
import org.apache.streampipes.model.datalake.SpQueryResult;
@@ -50,12 +52,16 @@ public class AutoAggregationHandler {
private final SimpleDateFormat dateFormat1 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
private final SimpleDateFormat dateFormat2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
- private final DataLakeManagementV4 dataLakeManagement;
+ private final IDataExplorerQueryManagement dataLakeQueryManagement;
private final ProvidedQueryParams queryParams;
public AutoAggregationHandler(ProvidedQueryParams params) {
this.queryParams = params;
- this.dataLakeManagement = new DataLakeManagementV4();
+ this.dataLakeQueryManagement = getDataLakeQueryManagement();
+ }
+
+ private IDataExplorerQueryManagement getDataLakeQueryManagement() {
+ return new DataExplorerQueryManagement(new DataExplorerSchemaManagement());
}
public ProvidedQueryParams makeAutoAggregationQueryParams() throws IllegalArgumentException {
@@ -97,13 +103,13 @@ public class AutoAggregationHandler {
countParams.update(QP_COUNT_ONLY, true);
countParams.update(QP_COLUMNS, fieldName);
- SpQueryResult result = new DataLakeManagementV4().getData(countParams, true);
+ SpQueryResult result = dataLakeQueryManagement.getData(countParams, true);
return result.getTotal() > 0 ? ((Double) result.getAllDataSeries().get(0).getRows().get(0).get(1)).intValue() : 0;
}
private SpQueryResult fireQuery(ProvidedQueryParams params) {
- return dataLakeManagement.getData(params, true);
+ return dataLakeQueryManagement.getData(params, true);
}
private int getAggregationValue(SpQueryResult newest, SpQueryResult oldest) throws ParseException {
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
index ae3a0c3ea..9b9641366 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
@@ -18,8 +18,6 @@
package org.apache.streampipes.sinks.internal.jvm;
-import org.apache.streampipes.dataexplorer.commons.configs.CouchDbConfigurations;
-import org.apache.streampipes.dataexplorer.commons.configs.DataExplorerConfigurations;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
@@ -53,8 +51,6 @@ public class SinksInternalJvmInit extends ExtensionsModelSubmitter {
new SpKafkaProtocolFactory(),
new SpJmsProtocolFactory(),
new SpMqttProtocolFactory())
- .addConfigs(DataExplorerConfigurations.getDefaults())
- .addConfigs(CouchDbConfigurations.getDefaults())
.build();
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java
deleted file mode 100644
index 590d30e27..000000000
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.ps;
-
-import org.apache.streampipes.dataexplorer.DataLakeNoUserManagementV3;
-import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
-import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
-
-import jakarta.ws.rs.Consumes;
-import jakarta.ws.rs.POST;
-import jakarta.ws.rs.Path;
-import jakarta.ws.rs.PathParam;
-import jakarta.ws.rs.Produces;
-import jakarta.ws.rs.core.MediaType;
-import jakarta.ws.rs.core.Response;
-
-@Path("/v3/datalake/measure")
-@Deprecated
-public class DataLakeMeasureResourceV3 extends AbstractAuthGuardedRestResource {
-
- private DataLakeNoUserManagementV3 dataLakeManagement;
-
- public DataLakeMeasureResourceV3() {
- this.dataLakeManagement = new DataLakeNoUserManagementV3();
- }
-
- @POST
- @JacksonSerialized
- @Produces(MediaType.APPLICATION_JSON)
- @Consumes(MediaType.APPLICATION_JSON)
- @Path("/{measure}")
- public Response addDataLake(@PathParam("measure") String measure, EventSchema eventSchema) {
- if (this.dataLakeManagement.addDataLake(measure, eventSchema)) {
- return ok();
- } else {
- return Response.status(409).build();
- }
-
- }
-}
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
index a59ce31ac..4e57132ff 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.ps;
-import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
+import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -37,10 +38,10 @@ import jakarta.ws.rs.core.Response;
@Path("/v4/datalake/measure")
public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
- private DataLakeManagementV4 dataLakeManagement;
+ private final IDataExplorerSchemaManagement dataLakeMeasureManagement;
public DataLakeMeasureResourceV4() {
- this.dataLakeManagement = new DataLakeManagementV4();
+ this.dataLakeMeasureManagement = new DataExplorerSchemaManagement();
}
@POST
@@ -48,7 +49,7 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public Response addDataLake(DataLakeMeasure dataLakeMeasure) {
- DataLakeMeasure result = this.dataLakeManagement.addDataLake(dataLakeMeasure);
+ DataLakeMeasure result = this.dataLakeMeasureManagement.createMeasurement(dataLakeMeasure);
return ok(result);
}
@@ -56,8 +57,8 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
@JacksonSerialized
@Produces(MediaType.APPLICATION_JSON)
@Path("{id}")
- public Response getDataLakeMeasure(@PathParam("id") String measureId) {
- return ok(this.dataLakeManagement.getById(measureId));
+ public Response getDataLakeMeasure(@PathParam("id") String elementId) {
+ return ok(this.dataLakeMeasureManagement.getById(elementId));
}
@PUT
@@ -65,11 +66,11 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Path("{id}")
- public Response updateDataLakeMeasure(@PathParam("id") String measureId,
+ public Response updateDataLakeMeasure(@PathParam("id") String elementId,
DataLakeMeasure measure) {
- if (measureId.equals(measure.getElementId())) {
+ if (elementId.equals(measure.getElementId())) {
try {
- this.dataLakeManagement.updateDataLake(measure);
+ this.dataLakeMeasureManagement.updateMeasurement(measure);
return ok();
} catch (IllegalArgumentException e) {
return badRequest(e.getMessage());
@@ -81,9 +82,9 @@ public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
@DELETE
@JacksonSerialized
@Path("{id}")
- public Response deleteDataLakeMeasure(@PathParam("id") String measureId) {
+ public Response deleteDataLakeMeasure(@PathParam("id") String elementId) {
try {
- this.dataLakeManagement.deleteDataLakeMeasure(measureId);
+ this.dataLakeMeasureManagement.deleteMeasurement(elementId);
return ok();
} catch (IllegalArgumentException e) {
return badRequest(e.getMessage());
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
index 4ed1a0bd7..ef0971d89 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV4.java
@@ -18,7 +18,8 @@
package org.apache.streampipes.ps;
-import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
+import org.apache.streampipes.dataexplorer.DataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
import org.apache.streampipes.dataexplorer.v4.ProvidedQueryParams;
import org.apache.streampipes.dataexplorer.v4.query.writer.OutputFormat;
import org.apache.streampipes.model.StreamPipesErrorMessage;
@@ -81,14 +82,17 @@ public class DataLakeResourceV4 extends AbstractRestResource {
private static final Logger logger = LoggerFactory.getLogger(DataLakeResourceV4.class);
- private DataLakeManagementV4 dataLakeManagement;
+ private DataExplorerQueryManagement dataLakeManagement;
+ private final DataExplorerSchemaManagement dataExplorerSchemaManagement;
public DataLakeResourceV4() {
- this.dataLakeManagement = new DataLakeManagementV4();
+ this.dataExplorerSchemaManagement = new DataExplorerSchemaManagement();
+ this.dataLakeManagement = new DataExplorerQueryManagement(dataExplorerSchemaManagement);
}
- public DataLakeResourceV4(DataLakeManagementV4 dataLakeManagement) {
+ public DataLakeResourceV4(DataExplorerQueryManagement dataLakeManagement) {
this.dataLakeManagement = dataLakeManagement;
+ this.dataExplorerSchemaManagement = new DataExplorerSchemaManagement();
}
@@ -127,10 +131,10 @@ public class DataLakeResourceV4 extends AbstractRestResource {
@Parameter(in = ParameterIn.PATH, description = "the id of the measurement series", required = true)
@PathParam("measurementID") String measurementID) {
- boolean isSuccessDataLake = this.dataLakeManagement.removeMeasurement(measurementID);
+ boolean isSuccessDataLake = this.dataLakeManagement.deleteData(measurementID);
if (isSuccessDataLake) {
- boolean isSuccessEventProperty = this.dataLakeManagement.removeEventProperty(measurementID);
+ boolean isSuccessEventProperty = this.dataExplorerSchemaManagement.deleteMeasurementByName(measurementID);
if (isSuccessEventProperty) {
return ok();
} else {
@@ -152,7 +156,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
description = "array of stored measurement series",
content = @Content(array = @ArraySchema(schema = @Schema(implementation = DataLakeMeasure.class))))})
public Response getAll() {
- List<DataLakeMeasure> allMeasurements = this.dataLakeManagement.getAllMeasurements();
+ List<DataLakeMeasure> allMeasurements = this.dataExplorerSchemaManagement.getAllMeasurements();
return ok(allMeasurements);
}
@@ -341,7 +345,7 @@ public class DataLakeResourceV4 extends AbstractRestResource {
responses = {
@ApiResponse(responseCode = "200", description = "All measurement series successfully removed")})
public Response removeAll() {
- boolean isSuccess = this.dataLakeManagement.removeAllMeasurements();
+ boolean isSuccess = this.dataLakeManagement.deleteAllData();
return Response.ok(isSuccess).build();
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
index fc7d07e5d..bc8d01417 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
@@ -19,7 +19,9 @@
package org.apache.streampipes.rest;
import org.apache.streampipes.connect.management.management.AdapterMasterManagement;
-import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
+import org.apache.streampipes.dataexplorer.DataExplorerQueryManagement;
+import org.apache.streampipes.dataexplorer.DataExplorerSchemaManagement;
+import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
import org.apache.streampipes.extensions.api.connect.exception.AdapterException;
import org.apache.streampipes.manager.file.FileManager;
import org.apache.streampipes.manager.pipeline.PipelineCacheManager;
@@ -93,13 +95,14 @@ public class ResetManagement {
});
// Remove all data in data lake
- DataLakeManagementV4 dataLakeManagementV4 = new DataLakeManagementV4();
- List<DataLakeMeasure> allMeasurements = dataLakeManagementV4.getAllMeasurements();
+ IDataExplorerSchemaManagement dataLakeMeasureManagement = new DataExplorerSchemaManagement();
+ DataExplorerQueryManagement dataExplorerQueryManagement = new DataExplorerQueryManagement(dataLakeMeasureManagement);
+ List<DataLakeMeasure> allMeasurements = dataLakeMeasureManagement.getAllMeasurements();
allMeasurements.forEach(measurement -> {
- boolean isSuccessDataLake = dataLakeManagementV4.removeMeasurement(measurement.getMeasureName());
+ boolean isSuccessDataLake = dataExplorerQueryManagement.deleteData(measurement.getMeasureName());
if (isSuccessDataLake) {
- dataLakeManagementV4.removeEventProperty(measurement.getMeasureName());
+ dataLakeMeasureManagement.deleteMeasurementByName(measurement.getMeasureName());
}
});
diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java
index 1a4f253b7..033458721 100644
--- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java
+++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesResourceConfig.java
@@ -19,7 +19,6 @@
package org.apache.streampipes.service.core;
import org.apache.streampipes.ps.DataLakeImageResource;
-import org.apache.streampipes.ps.DataLakeMeasureResourceV3;
import org.apache.streampipes.ps.DataLakeMeasureResourceV4;
import org.apache.streampipes.ps.DataLakeResourceV3;
import org.apache.streampipes.ps.DataLakeResourceV4;
@@ -120,7 +119,6 @@ public class StreamPipesResourceConfig extends BaseResourceConfig {
DataLakeWidgetResource.class,
DataLakeImageResource.class,
DataLakeResourceV3.class,
- DataLakeMeasureResourceV3.class,
DataLakeMeasureResourceV4.class,
DataStream.class,
EmailConfigurationResource.class,