You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/08/01 13:59:24 UTC
[incubator-streampipes] branch STREAMPIPES-563 updated: [STREAMPIPES-563] Move data lake logic into package streampipes-data-explorer-commons
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch STREAMPIPES-563
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/STREAMPIPES-563 by this push:
new dd37cfaf5 [STREAMPIPES-563] Move data lake logic into package streampipes-data-explorer-commons
dd37cfaf5 is described below
commit dd37cfaf518f065baa4018e343f7b0b30e78dd6c
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Aug 1 15:58:43 2022 +0200
[STREAMPIPES-563] Move data lake logic into package
streampipes-data-explorer-commons
---
.../backend/StreamPipesResourceConfig.java | 8 +-
streampipes-data-explorer-commons/pom.xml | 9 +-
.../dataexplorer/commons/DataExplorerUtils.java | 55 ++++++--
.../dataexplorer/commons/DataExplorerWriter.java | 20 +++
.../dataexplorer/commons/TimeSeriesStore.java | 86 ++++++++++++
.../commons/couchdb/CouchDbConfigurations.java | 29 ++---
.../commons/couchdb/CouchDbEnvKeys.java | 5 +-
.../dataexplorer/commons/image/ImageStore.java | 85 ++++++++++++
.../commons/image/ImageStoreUtils.java | 34 ++---
.../{ => influx}/DataExplorerConfigurations.java | 2 +-
.../DataExplorerConnectionSettings.java | 9 +-
.../commons/{ => influx}/DataExplorerDefaults.java | 2 +-
.../commons/{ => influx}/DataExplorerEnvKeys.java | 2 +-
.../commons}/influx/InfluxDbReservedKeywords.java | 2 +-
.../commons/influx/InfluxNameSanitizer.java | 30 ++---
.../dataexplorer/commons/influx/InfluxStore.java | 72 +++++------
.../dataexplorer/DataLakeManagementV4.java | 69 +++++++++-
.../dataexplorer/DataLakeNoUserManagementV3.java | 3 +
.../streampipes-sinks-internal-jvm/pom.xml | 3 +
.../sinks/internal/jvm/SinksInternalJvmInit.java | 8 +-
.../sinks/internal/jvm/datalake/DataLakeSink.java | 144 ++++++---------------
.../internal/jvm/datalake/image/ImageStore.java | 54 --------
.../model/datalake/DataLakeMeasure.java | 26 ++++
.../streampipes/ps}/DataLakeMeasureResourceV3.java | 3 +-
.../streampipes/ps/DataLakeMeasureResourceV4.java | 26 ++--
.../apache/streampipes/ps/DataLakeResourceV3.java | 1 +
26 files changed, 484 insertions(+), 303 deletions(-)
diff --git a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
index 0a6efad7b..8972c5ef5 100644
--- a/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
+++ b/streampipes-backend/src/main/java/org/apache/streampipes/backend/StreamPipesResourceConfig.java
@@ -19,10 +19,7 @@
package org.apache.streampipes.backend;
import io.swagger.v3.jaxrs2.integration.resources.OpenApiResource;
-import org.apache.streampipes.ps.DataLakeImageResource;
-import org.apache.streampipes.ps.DataLakeResourceV3;
-import org.apache.streampipes.ps.DataLakeResourceV4;
-import org.apache.streampipes.ps.PipelineElementTemplateResource;
+import org.apache.streampipes.ps.*;
import org.apache.streampipes.rest.impl.*;
import org.apache.streampipes.rest.impl.admin.*;
import org.apache.streampipes.rest.impl.connect.*;
@@ -30,7 +27,6 @@ import org.apache.streampipes.rest.impl.dashboard.Dashboard;
import org.apache.streampipes.rest.impl.dashboard.DashboardWidget;
import org.apache.streampipes.rest.impl.dashboard.VisualizablePipelineResource;
import org.apache.streampipes.rest.impl.datalake.DataLakeDashboardResource;
-import org.apache.streampipes.rest.impl.datalake.DataLakeMeasureResourceV3;
import org.apache.streampipes.rest.impl.datalake.DataLakeWidgetResource;
import org.apache.streampipes.rest.impl.datalake.PersistedDataStreamResource;
import org.apache.streampipes.rest.impl.nouser.PipelineElementImportNoUser;
@@ -65,6 +61,7 @@ public class StreamPipesResourceConfig extends ResourceConfig {
register(DataLakeImageResource.class);
register(DataLakeResourceV3.class);
register(DataLakeMeasureResourceV3.class);
+ register(DataLakeMeasureResourceV4.class);
register(DataStream.class);
register(EmailConfigurationResource.class);
register(EmailResource.class);
@@ -104,7 +101,6 @@ public class StreamPipesResourceConfig extends ResourceConfig {
register(DataLakeDashboardResource.class);
register(DataLakeWidgetResource.class);
register(DataLakeResourceV3.class);
- register(DataLakeMeasureResourceV3.class);
register(PipelineElementFile.class);
register(DashboardWidget.class);
register(Dashboard.class);
diff --git a/streampipes-data-explorer-commons/pom.xml b/streampipes-data-explorer-commons/pom.xml
index 2d14cd7e1..5da55d089 100644
--- a/streampipes-data-explorer-commons/pom.xml
+++ b/streampipes-data-explorer-commons/pom.xml
@@ -35,15 +35,16 @@
<!-- Others -->
+ <dependency>
+ <groupId>org.lightcouch</groupId>
+ <artifactId>lightcouch</artifactId>
+ </dependency>
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
</dependency>
</dependencies>
- <properties>
- <maven.compiler.source>11</maven.compiler.source>
- <maven.compiler.target>11</maven.compiler.target>
- </properties>
+
</project>
\ No newline at end of file
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
index b739883be..8749b3cc0 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
@@ -19,22 +19,61 @@ package org.apache.streampipes.dataexplorer.commons;
import org.apache.streampipes.client.StreamPipesClient;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.dataexplorer.commons.influx.InfluxNameSanitizer;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.schema.EventProperty;
+
+import java.util.List;
+import java.util.stream.Collectors;
public class DataExplorerUtils {
/**
- * Adds a new measurement to the StreamPipes data lake
+ * Sanitizes the event schema and stores the DataLakeMeasure in CouchDB
+ *
+ * @param client
* @param measure
- * @param eventSchema
* @throws SpRuntimeException
*/
- public static void registerAtDataLake(String measure,
- EventSchema eventSchema,
- StreamPipesClient client) throws SpRuntimeException {
+ public static DataLakeMeasure sanitizeAndRegisterAtDataLake(StreamPipesClient client,
+ DataLakeMeasure measure) throws SpRuntimeException {
+ measure = sanitizeDataLakeMeasure(measure);
+ registerAtDataLake(client, measure);
+
+ return measure;
+ }
+
+ private static void registerAtDataLake(StreamPipesClient client,
+ DataLakeMeasure measure) throws SpRuntimeException {
client
- .customRequest()
- .sendPost("api/v3/datalake/measure/" + measure, eventSchema);
+ .customRequest()
+ .sendPost("api/v4/datalake/measure/", measure);
}
+  /**
+   * Prepares a measure for storage: takes the timestamp property out of the event schema,
+   * then sanitizes the measure name and the runtime name of every remaining property.
+   *
+   * @param measure the measure to sanitize
+   * @return the measure handed back by {@code removeTimestampsFromEventSchema}, with sanitized names
+   * @throws SpRuntimeException declared for callers; no statement in this method throws it directly
+   */
+  private static DataLakeMeasure sanitizeDataLakeMeasure(DataLakeMeasure measure) throws SpRuntimeException {
+    // Step 1: drop the selected timestamp property from the event schema
+    DataLakeMeasure sanitized = removeTimestampsFromEventSchema(measure);
+
+    // Step 2: replace all spaces in the measure name with _
+    String preparedMeasureName = InfluxNameSanitizer.prepareString(sanitized.getMeasureName());
+    sanitized.setMeasureName(preparedMeasureName);
+
+    // Step 3: replace all spaces in runtime names with _ and rename reserved keywords
+    for (EventProperty property : sanitized.getEventSchema().getEventProperties()) {
+      String sanitizedRuntimeName = InfluxNameSanitizer.sanitizePropertyRuntimeName(property.getRuntimeName());
+      property.setRuntimeName(sanitizedRuntimeName);
+    }
+
+    return sanitized;
+  }
+
+  /**
+   * Removes the timestamp property from the event schema of the given measure.
+   *
+   * A property counts as the timestamp when the measure's timestamp field is exactly its
+   * runtime name, or ends with "::" followed by its runtime name. A raw suffix match is
+   * deliberately avoided: it would also remove unrelated properties whose runtime name is
+   * merely a suffix of the timestamp field (e.g. "stamp" for "s0::timestamp").
+   * NOTE(review): assumes the timestamp field is a selector of the form "s0::runtimeName" - confirm.
+   *
+   * @param measure the measure whose event schema is filtered (modified in place)
+   * @return the same measure instance
+   */
+  private static DataLakeMeasure removeTimestampsFromEventSchema(DataLakeMeasure measure) {
+    String timestampField = measure.getTimestampField();
+    List<EventProperty> eventPropertiesWithoutTimestamp = measure.getEventSchema().getEventProperties()
+      .stream()
+      .filter(eventProperty -> !isTimestampProperty(timestampField, eventProperty.getRuntimeName()))
+      .collect(Collectors.toList());
+    measure.getEventSchema().setEventProperties(eventPropertiesWithoutTimestamp);
+
+    return measure;
+  }
+
+  /**
+   * Checks whether the runtime name denotes the timestamp field, matching either the bare
+   * name or the last "::"-separated segment of the field.
+   */
+  private static boolean isTimestampProperty(String timestampField, String runtimeName) {
+    return timestampField.equals(runtimeName) || timestampField.endsWith("::" + runtimeName);
+  }
+
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerWriter.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerWriter.java
index 3f3a1edc6..840035068 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerWriter.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerWriter.java
@@ -1,6 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
package org.apache.streampipes.dataexplorer.commons;
+import org.apache.streampipes.dataexplorer.commons.influx.DataExplorerConnectionSettings;
+import org.apache.streampipes.dataexplorer.commons.influx.DataExplorerDefaults;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
@@ -8,6 +27,7 @@ import org.influxdb.dto.Point;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+@Deprecated
public class DataExplorerWriter {
private InfluxDB influxDB;
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java
new file mode 100644
index 000000000..a81b6e327
--- /dev/null
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.commons;
+
+import org.apache.streampipes.client.StreamPipesClient;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataexplorer.commons.image.ImageStore;
+import org.apache.streampipes.dataexplorer.commons.influx.InfluxStore;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class TimeSeriesStore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TimeSeriesStore.class);
+
+ private ImageStore imageStore;
+ private InfluxStore influxStore;
+
+
+ public TimeSeriesStore(SpConfig config,
+ StreamPipesClient client,
+ DataLakeMeasure measure,
+ boolean enableImageStore) {
+
+ measure = DataExplorerUtils.sanitizeAndRegisterAtDataLake(client, measure);
+
+ if (enableImageStore) {
+ // TODO check if event properties are replaces correctly
+ this.imageStore = new ImageStore(measure, config);
+ }
+
+ this.influxStore = new InfluxStore(measure, config);
+
+ }
+
+ public boolean onEvent(Event event) throws SpRuntimeException {
+ // Store all images in image store and replace image with internal id
+ if (imageStore != null) {
+ this.imageStore.onEvent(event);
+ }
+
+ // Store event in time series database
+ this.influxStore.onEvent(event);
+
+ return true;
+ }
+
+
+ public boolean alterRetentionTime(DataLakeMeasure dataLakeMeasure) {
+ return true;
+ }
+
+ public void close() throws SpRuntimeException {
+ if (influxStore != null) {
+ try {
+ this.imageStore.close();
+ } catch (IOException e) {
+ LOG.error("Could not close couchDB connection");
+ throw new SpRuntimeException(e);
+ }
+ }
+
+ this.influxStore.close();
+ }
+}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbConfigurations.java
similarity index 55%
copy from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java
copy to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbConfigurations.java
index 723d29875..8dc7d7370 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbConfigurations.java
@@ -15,25 +15,22 @@
* limitations under the License.
*
*/
+package org.apache.streampipes.dataexplorer.commons.couchdb;
-package org.apache.streampipes.sinks.internal.jvm.datalake.influx;
+import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
-public class DataLakeUtils {
+import java.util.Arrays;
+import java.util.List;
- public static String prepareString(String s) {
- return s.toLowerCase().replaceAll(" ", "_");
- }
- private static String renameReservedKeywords(String runtimeName) {
- if (InfluxDbReservedKeywords.keywordList.stream().anyMatch(k -> k.equalsIgnoreCase(runtimeName))) {
- return runtimeName + "_";
- } else {
- return runtimeName;
+public class CouchDbConfigurations {
+
+ public static List<ConfigItem> getDefaults() {
+ return Arrays.asList(
+ ConfigItem.from(CouchDbEnvKeys.COUCHDB_HOST, "couchdb", "Hostname for CouchDB to store image blobs"),
+ ConfigItem.from(CouchDbEnvKeys.COUCHDB_PORT, 5984, ""),
+ ConfigItem.from(CouchDbEnvKeys.COUCHDB_PROTOCOL, "http", "")
+ );
}
- }
- public static String sanitizePropertyRuntimeName(String runtimeName) {
- String sanitizedRuntimeName = prepareString(runtimeName);
- return renameReservedKeywords(sanitizedRuntimeName);
- }
-}
+}
\ No newline at end of file
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbEnvKeys.java
similarity index 91%
rename from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbEnvKeys.java
index cf64457e4..4dd6b4fb8 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/config/ConfigKeys.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbEnvKeys.java
@@ -16,10 +16,9 @@
*
*/
-package org.apache.streampipes.sinks.internal.jvm.config;
-
-public class ConfigKeys {
+package org.apache.streampipes.dataexplorer.commons.couchdb;
+public class CouchDbEnvKeys {
public final static String COUCHDB_HOST = "SP_COUCHDB_HOST";
public final static String COUCHDB_PORT = "SP_COUCHDB_PORT";
public final static String COUCHDB_PROTOCOL = "SP_COUCHDB_PROTOCOL";
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStore.java
new file mode 100644
index 000000000..1a3e2356f
--- /dev/null
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStore.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.dataexplorer.commons.image;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.dataexplorer.commons.couchdb.CouchDbEnvKeys;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
+import org.lightcouch.CouchDbClient;
+import org.lightcouch.CouchDbProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Stores the image properties of events as attachments via a {@link CouchDbClient} and
+ * replaces the image content in the event with the id under which it was saved.
+ */
+public class ImageStore {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ImageStore.class);
+  private static final String DB_NAME = "images";
+
+  private List<EventProperty> imageProperties;
+  private CouchDbClient couchDbClient;
+
+  /**
+   * @param measure measure whose image properties are determined via {@link ImageStoreUtils}
+   * @param config  configuration providing CouchDB protocol, host and port
+   */
+  public ImageStore(DataLakeMeasure measure, SpConfig config) {
+    this.couchDbClient = new CouchDbClient(from(config));
+    this.imageProperties = ImageStoreUtils.getImageProperties(measure);
+  }
+
+  /**
+   * For every image property: decodes the Base64 field value, saves it under a fresh random
+   * UUID and writes that UUID back into the event field.
+   *
+   * @param event the event to process; its image fields are overwritten with document ids
+   */
+  public void onEvent(Event event) throws SpRuntimeException {
+    for (EventProperty imageProperty : this.imageProperties) {
+      String docId = UUID.randomUUID().toString();
+      String encodedImage = event
+          .getFieldByRuntimeName(imageProperty.getRuntimeName())
+          .getAsPrimitive()
+          .getAsString();
+
+      storeImage(Base64.decodeBase64(encodedImage), docId);
+      event.updateFieldBySelector("s0::" + imageProperty.getRuntimeName(), docId);
+    }
+  }
+
+  /**
+   * Saves the bytes as an "image/jpeg" attachment; the given id is passed as both the second
+   * and fourth argument of {@code saveAttachment}.
+   *
+   * @param imageBytes raw image content
+   * @param imageDocId id under which the image is saved
+   */
+  public void storeImage(byte[] imageBytes,
+                         String imageDocId) {
+    ByteArrayInputStream imageStream = new ByteArrayInputStream(imageBytes);
+    this.couchDbClient.saveAttachment(imageStream, imageDocId, "image/jpeg", imageDocId, null);
+  }
+
+  /**
+   * Closes the underlying CouchDB client.
+   */
+  public void close() throws IOException {
+    this.couchDbClient.close();
+  }
+
+  /**
+   * Builds the CouchDB connection properties for the "images" database from the config values
+   * for protocol, host and port; the last two constructor arguments are passed as null.
+   */
+  private static CouchDbProperties from(SpConfig config) {
+    // Argument order keeps the original read order: protocol, host, port
+    return new CouchDbProperties(
+        DB_NAME,
+        true,
+        config.getString(CouchDbEnvKeys.COUCHDB_PROTOCOL),
+        config.getString(CouchDbEnvKeys.COUCHDB_HOST),
+        config.getInteger(CouchDbEnvKeys.COUCHDB_PORT),
+        null,
+        null);
+  }
+}
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV3.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStoreUtils.java
similarity index 55%
copy from streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV3.java
copy to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStoreUtils.java
index 6463d9428..ba3ab6c0c 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV3.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStoreUtils.java
@@ -16,32 +16,22 @@
*
*/
-package org.apache.streampipes.ps;
+package org.apache.streampipes.dataexplorer.commons.image;
-import org.apache.streampipes.dataexplorer.utils.DataExplorerUtils;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
-import org.apache.streampipes.rest.core.base.impl.AbstractRestResource;
-import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.vocabulary.SPSensor;
-import javax.ws.rs.GET;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
import java.util.List;
+import java.util.stream.Collectors;
-@Path("/v3/datalake")
-public class DataLakeResourceV3 extends AbstractRestResource {
- public DataLakeResourceV3() {
- }
-
- @GET
- @JacksonSerialized
- @Produces(MediaType.APPLICATION_JSON)
- @Path("/info")
- public Response getAllInfos() {
- List<DataLakeMeasure> result = DataExplorerUtils.getInfos();
- return ok(result);
- }
+public class ImageStoreUtils {
+
+  /**
+   * Collects the event properties of the measure that are marked as images.
+   *
+   * @param measure measure whose event schema is inspected
+   * @return all properties whose first domain property equals {@code SPSensor.IMAGE}
+   */
+  public static List<EventProperty> getImageProperties(DataLakeMeasure measure) {
+    return measure.getEventSchema()
+      .getEventProperties()
+      .stream()
+      .filter(ImageStoreUtils::isImageProperty)
+      .collect(Collectors.toList());
+  }
+
+  /**
+   * A property is an image property when it has at least one domain property and the first
+   * one, rendered as string, equals {@code SPSensor.IMAGE}.
+   */
+  private static boolean isImageProperty(EventProperty eventProperty) {
+    if (eventProperty.getDomainProperties() == null) {
+      return false;
+    }
+    if (eventProperty.getDomainProperties().size() <= 0) {
+      return false;
+    }
+    return eventProperty.getDomainProperties().get(0).toString().equals(SPSensor.IMAGE);
+  }
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerConfigurations.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConfigurations.java
similarity index 97%
rename from streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerConfigurations.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConfigurations.java
index 595d75f84..1baf5aa72 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerConfigurations.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConfigurations.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.dataexplorer.commons;
+package org.apache.streampipes.dataexplorer.commons.influx;
import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerConnectionSettings.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConnectionSettings.java
similarity index 92%
rename from streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerConnectionSettings.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConnectionSettings.java
index 08c6fc312..63b943dec 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerConnectionSettings.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConnectionSettings.java
@@ -16,8 +16,9 @@
*
*/
-package org.apache.streampipes.dataexplorer.commons;
+package org.apache.streampipes.dataexplorer.commons.influx;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.svcdiscovery.api.SpConfig;
public class DataExplorerConnectionSettings {
@@ -30,16 +31,16 @@ public class DataExplorerConnectionSettings {
private final String password;
public static DataExplorerConnectionSettings from(SpConfig configStore) {
- return from(configStore, "");
+ return from(configStore, null);
}
- public static DataExplorerConnectionSettings from(SpConfig configStore, String measureName) {
+  /**
+   * Creates the connection settings from the given config store.
+   *
+   * @param configStore config store providing protocol, host, port, database name and credentials
+   * @param measure     measure whose name is used as measure name; may be {@code null} (the
+   *                    single-argument overload passes {@code null}), in which case an empty
+   *                    measure name is used
+   * @return the assembled connection settings
+   */
+  public static DataExplorerConnectionSettings from(SpConfig configStore, DataLakeMeasure measure) {
return new DataExplorerConnectionSettings(
configStore.getString(DataExplorerEnvKeys.DATA_LAKE_PROTOCOL) + "://" + configStore.getString(DataExplorerEnvKeys.DATA_LAKE_HOST),
configStore.getInteger(DataExplorerEnvKeys.DATA_LAKE_PORT),
configStore.getString(DataExplorerEnvKeys.DATA_LAKE_DATABASE_NAME),
- measureName,
+      measure != null ? measure.getMeasureName() : "",
configStore.getString(DataExplorerEnvKeys.DATA_LAKE_USERNAME),
configStore.getString(DataExplorerEnvKeys.DATA_LAKE_PASSWORD));
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerDefaults.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerDefaults.java
similarity index 95%
rename from streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerDefaults.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerDefaults.java
index bc6cc263b..c31c02c38 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerDefaults.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerDefaults.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.dataexplorer.commons;
+package org.apache.streampipes.dataexplorer.commons.influx;
public class DataExplorerDefaults {
public final static String DATA_LAKE_HOST = "influxdb";
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerEnvKeys.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerEnvKeys.java
similarity index 95%
rename from streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerEnvKeys.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerEnvKeys.java
index c93b5f9d8..84ad4557e 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerEnvKeys.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerEnvKeys.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.dataexplorer.commons;
+package org.apache.streampipes.dataexplorer.commons.influx;
public class DataExplorerEnvKeys {
public final static String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/InfluxDbReservedKeywords.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxDbReservedKeywords.java
similarity index 97%
rename from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/InfluxDbReservedKeywords.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxDbReservedKeywords.java
index 66911ef8c..754fc7c94 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/InfluxDbReservedKeywords.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxDbReservedKeywords.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.sinks.internal.jvm.datalake.influx;
+package org.apache.streampipes.dataexplorer.commons.influx;
import java.util.Arrays;
import java.util.List;
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxNameSanitizer.java
similarity index 54%
rename from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxNameSanitizer.java
index 723d29875..df7860a75 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeUtils.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxNameSanitizer.java
@@ -16,24 +16,24 @@
*
*/
-package org.apache.streampipes.sinks.internal.jvm.datalake.influx;
+package org.apache.streampipes.dataexplorer.commons.influx;
-public class DataLakeUtils {
+public class InfluxNameSanitizer {
- public static String prepareString(String s) {
- return s.toLowerCase().replaceAll(" ", "_");
- }
+ public static String prepareString(String s) {
+ return s.replaceAll(" ", "_");
+ }
- private static String renameReservedKeywords(String runtimeName) {
- if (InfluxDbReservedKeywords.keywordList.stream().anyMatch(k -> k.equalsIgnoreCase(runtimeName))) {
- return runtimeName + "_";
- } else {
- return runtimeName;
+ private static String renameReservedKeywords(String runtimeName) {
+ if (InfluxDbReservedKeywords.keywordList.stream().anyMatch(k -> k.equalsIgnoreCase(runtimeName))) {
+ return runtimeName + "_";
+ } else {
+ return runtimeName;
+ }
}
- }
- public static String sanitizePropertyRuntimeName(String runtimeName) {
- String sanitizedRuntimeName = prepareString(runtimeName);
- return renameReservedKeywords(sanitizedRuntimeName);
- }
+ public static String sanitizePropertyRuntimeName(String runtimeName) {
+ String sanitizedRuntimeName = prepareString(runtimeName);
+ return renameReservedKeywords(sanitizedRuntimeName);
+ }
}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeInfluxDbClient.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
similarity index 76%
rename from streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeInfluxDbClient.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
index 5b8aa2f01..8b42d940a 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/influx/DataLakeInfluxDbClient.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
@@ -16,18 +16,19 @@
*
*/
-package org.apache.streampipes.sinks.internal.jvm.datalake.influx;
+package org.apache.streampipes.dataexplorer.commons.influx;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataexplorer.commons.DataExplorerConnectionSettings;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.field.PrimitiveField;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
-import org.apache.streampipes.model.schema.EventSchema;
+import org.apache.streampipes.svcdiscovery.api.SpConfig;
import org.apache.streampipes.vocabulary.XSD;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
+
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
@@ -40,48 +41,45 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-/**
- * Code is the same as InfluxDB (org.apache.streampipes.sinks.databases.jvm.influxdb) sink. Changes applied here should also be applied in the InfluxDB sink
- */
-public class DataLakeInfluxDbClient {
+public class InfluxStore {
- private static final Logger LOG = LoggerFactory.getLogger(DataLakeInfluxDbClient.class);
+ private static final Logger LOG = LoggerFactory.getLogger(InfluxStore.class);
- private final String measureName;
- private final String timestampField;
private final Integer batchSize = 2000;
private final Integer flushDuration = 500;
private InfluxDB influxDb = null;
- private final DataExplorerConnectionSettings settings;
- private final EventSchema originalEventSchema;
+ DataLakeMeasure measure;
+// private final DataExplorerConnectionSettings settings;
Map<String, String> targetRuntimeNames = new HashMap<>();
- public DataLakeInfluxDbClient(DataExplorerConnectionSettings settings,
- String timestampField,
- EventSchema originalEventSchema) throws SpRuntimeException {
- this.settings = settings;
- this.originalEventSchema = originalEventSchema;
- this.timestampField = timestampField;
- this.measureName = settings.getMeasureName();
+ public InfluxStore(DataLakeMeasure measure,
+ SpConfig configStore) throws SpRuntimeException {
- prepareSchema();
- connect();
- }
+ this.measure = measure;
+ DataExplorerConnectionSettings settings = DataExplorerConnectionSettings.from(configStore, measure);
- private void prepareSchema() {
- originalEventSchema
+ // TODO check if this works
+ measure.getEventSchema()
.getEventProperties()
- .forEach(ep -> targetRuntimeNames.put(ep.getRuntimeName(), DataLakeUtils.sanitizePropertyRuntimeName(ep.getRuntimeName())));
+ .forEach(ep -> targetRuntimeNames.put(ep.getRuntimeName(), InfluxNameSanitizer.sanitizePropertyRuntimeName(ep.getRuntimeName())));
+
+ connect(settings);
}
+ /**
+ *
+ * @param measure
+ */
+
+
/**
* Connects to the InfluxDB Server, sets the database and initializes the batch-behaviour
*
* @throws SpRuntimeException If no connection can be established or if the database could not
* be found
*/
- private void connect() throws SpRuntimeException {
+ private void connect(DataExplorerConnectionSettings settings) throws SpRuntimeException {
// Connecting to the server
// "http://" must be in front
String urlAndPort = settings.getInfluxDbHost() + ":" + settings.getInfluxDbPort();
@@ -105,13 +103,6 @@ public class DataLakeInfluxDbClient {
influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS);
}
- /**
- * Checks whether the given database exists. Needs a working connection to an InfluxDB Server
- * ({@link DataLakeInfluxDbClient#influxDb} needs to be initialized)
- *
- * @param dbName The name of the database, the method should look for
- * @return True if the database exists, false otherwise
- */
private boolean databaseExists(String dbName) {
QueryResult queryResult = influxDb.query(new Query("SHOW DATABASES", ""));
for(List<Object> a : queryResult.getResults().get(0).getSeries().get(0).getValues()) {
@@ -140,19 +131,20 @@ public class DataLakeInfluxDbClient {
* @param event The event which should be saved
* @throws SpRuntimeException If the column name (key-value of the event map) is not allowed
*/
- public void save(Event event, EventSchema schema) throws SpRuntimeException {
+ public void onEvent(Event event) throws SpRuntimeException {
if (event == null) {
throw new SpRuntimeException("event is null");
}
- Long timestampValue = event.getFieldBySelector(timestampField).getAsPrimitive().getAsLong();
- Point.Builder p = Point.measurement(measureName).time(timestampValue, TimeUnit.MILLISECONDS);
+ Long timestampValue = event.getFieldBySelector(measure.getTimestampField()).getAsPrimitive().getAsLong();
+ Point.Builder p = Point.measurement(measure.getMeasureName()).time((long) timestampValue, TimeUnit.MILLISECONDS);
- for (EventProperty ep : schema.getEventProperties()) {
+ for (EventProperty ep : measure.getEventSchema().getEventProperties()) {
if (ep instanceof EventPropertyPrimitive) {
String runtimeName = ep.getRuntimeName();
- if (!timestampField.endsWith(runtimeName)) {
+ // timestamp should not be added as a field
+ if (!measure.getTimestampField().endsWith(runtimeName)) {
String preparedRuntimeName = targetRuntimeNames.get(runtimeName);
PrimitiveField eventPropertyPrimitiveField = event.getFieldByRuntimeName(runtimeName).getAsPrimitive();
@@ -190,12 +182,12 @@ public class DataLakeInfluxDbClient {
/**
* Shuts down the connection to the InfluxDB server
*/
- public void stop() {
+ public void close() throws SpRuntimeException {
influxDb.flush();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- e.printStackTrace();
+ throw new SpRuntimeException(e);
}
influxDb.close();
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
index 1ccb4742d..523cbe8aa 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeManagementV4.java
@@ -36,12 +36,16 @@ import org.apache.streampipes.model.datalake.DataLakeConfiguration;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.datalake.DataLakeRetentionPolicy;
import org.apache.streampipes.model.datalake.SpQueryResult;
+import org.apache.streampipes.model.schema.*;
+import org.apache.streampipes.storage.api.IDataLakeStorage;
import org.apache.streampipes.storage.couchdb.utils.Utils;
+import org.apache.streampipes.storage.management.StorageDispatcher;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.lightcouch.CouchDbClient;
+import javax.xml.crypto.Data;
import java.io.IOException;
import java.io.OutputStream;
import java.time.Instant;
@@ -53,10 +57,7 @@ import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAccessor;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.stream.Collectors;
import static org.apache.streampipes.dataexplorer.v4.SupportedDataLakeQueryParameters.*;
@@ -327,4 +328,64 @@ public class DataLakeManagementV4 {
return tags;
}
+
+
+ // TODO validate method
+ public DataLakeMeasure addDataLake(DataLakeMeasure measure) {
+ List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
+ Optional<DataLakeMeasure> optional = dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure.getMeasureName())).findFirst();
+
+ if (optional.isPresent()) {
+ DataLakeMeasure oldEntry = optional.get();
+ if (!compareEventProperties(oldEntry.getEventSchema().getEventProperties(), measure.getEventSchema().getEventProperties())) {
+ return oldEntry;
+ }
+ } else {
+ measure.setSchemaVersion(DataLakeMeasure.CURRENT_SCHEMA_VERSION);
+ getDataLakeStorage().storeDataLakeMeasure(measure);
+ return measure;
+ }
+
+ return measure;
+ }
+
+ private boolean compareEventProperties(List<EventProperty> prop1, List<EventProperty> prop2) {
+ if (prop1.size() != prop2.size()) {
+ return false;
+ }
+
+ return prop1.stream().allMatch(prop -> {
+
+ for (EventProperty property : prop2) {
+ if (prop.getRuntimeName().equals(property.getRuntimeName())) {
+
+ //primitive
+ if (prop instanceof EventPropertyPrimitive && property instanceof EventPropertyPrimitive) {
+ if (((EventPropertyPrimitive) prop)
+ .getRuntimeType()
+ .equals(((EventPropertyPrimitive) property).getRuntimeType())) {
+ return true;
+ }
+
+ //list
+ } else if (prop instanceof EventPropertyList && property instanceof EventPropertyList) {
+ return compareEventProperties(Collections.singletonList(((EventPropertyList) prop).getEventProperty()),
+ Collections.singletonList(((EventPropertyList) property).getEventProperty()));
+
+ //nested
+ } else if (prop instanceof EventPropertyNested && property instanceof EventPropertyNested) {
+ return compareEventProperties(((EventPropertyNested) prop).getEventProperties(),
+ ((EventPropertyNested) property).getEventProperties());
+ }
+ }
+ }
+ return false;
+
+ });
+ }
+
+
+ private IDataLakeStorage getDataLakeStorage() {
+ return StorageDispatcher.INSTANCE.getNoSqlStore().getDataLakeStorage();
+ }
}
diff --git a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
index 2ae5c3dea..5b411a0cf 100644
--- a/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
+++ b/streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataLakeNoUserManagementV3.java
@@ -28,8 +28,11 @@ import java.util.List;
import java.util.Optional;
+@Deprecated
public class DataLakeNoUserManagementV3 {
+
+ @Deprecated
public boolean addDataLake(String measure, EventSchema eventSchema) {
List<DataLakeMeasure> dataLakeMeasureList = getDataLakeStorage().getAllDataLakeMeasures();
Optional<DataLakeMeasure> optional = dataLakeMeasureList.stream().filter(entry -> entry.getMeasureName().equals(measure)).findFirst();
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/pom.xml b/streampipes-extensions/streampipes-sinks-internal-jvm/pom.xml
index 62babfd51..3e7159e3a 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/pom.xml
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/pom.xml
@@ -56,6 +56,7 @@
</dependency>
<!-- External dependencies -->
+ <!-- TODO remove after refactoring -->
<dependency>
<groupId>org.lightcouch</groupId>
<artifactId>lightcouch</artifactId>
@@ -64,6 +65,8 @@
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
</dependency>
+ <!-- TODO remove after refactoring -->
+
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
index 7aae0127f..42e0dc80a 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
@@ -21,7 +21,8 @@ package org.apache.streampipes.sinks.internal.jvm;
import org.apache.streampipes.container.model.SpServiceDefinition;
import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
-import org.apache.streampipes.dataexplorer.commons.DataExplorerConfigurations;
+import org.apache.streampipes.dataexplorer.commons.couchdb.CouchDbConfigurations;
+import org.apache.streampipes.dataexplorer.commons.influx.DataExplorerConfigurations;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;
@@ -29,7 +30,6 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
-import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
import org.apache.streampipes.sinks.internal.jvm.datalake.DataLakeSink;
import org.apache.streampipes.sinks.internal.jvm.notification.NotificationProducer;
@@ -53,10 +53,8 @@ public class SinksInternalJvmInit extends StandaloneModelSubmitter {
new SpKafkaProtocolFactory(),
new SpJmsProtocolFactory(),
new SpMqttProtocolFactory())
- .addConfig(ConfigKeys.COUCHDB_HOST, "couchdb", "Hostname for CouchDB to store image blobs")
- .addConfig(ConfigKeys.COUCHDB_PORT, 5984, "")
- .addConfig(ConfigKeys.COUCHDB_PROTOCOL, "http", "")
.addConfigs(DataExplorerConfigurations.getDefaults())
+ .addConfigs(CouchDbConfigurations.getDefaults())
.build();
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
index 571d027d1..c268573ae 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeSink.java
@@ -1,14 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
package org.apache.streampipes.sinks.internal.jvm.datalake;
-import org.apache.commons.codec.binary.Base64;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataexplorer.commons.DataExplorerConnectionSettings;
-import org.apache.streampipes.dataexplorer.commons.DataExplorerUtils;
+import org.apache.streampipes.dataexplorer.commons.TimeSeriesStore;
import org.apache.streampipes.logging.api.Logger;
import org.apache.streampipes.model.DataSinkType;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.graph.DataSinkDescription;
import org.apache.streampipes.model.runtime.Event;
-import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.sdk.builder.DataSinkBuilder;
@@ -17,138 +33,60 @@ import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.utils.Assets;
-import org.apache.streampipes.sinks.internal.jvm.config.ConfigKeys;
-import org.apache.streampipes.sinks.internal.jvm.datalake.image.ImageStore;
-import org.apache.streampipes.sinks.internal.jvm.datalake.influx.DataLakeInfluxDbClient;
-import org.apache.streampipes.sinks.internal.jvm.datalake.influx.DataLakeUtils;
-import org.apache.streampipes.svcdiscovery.api.SpConfig;
-import org.apache.streampipes.vocabulary.SPSensor;
import org.apache.streampipes.wrapper.context.EventSinkRuntimeContext;
import org.apache.streampipes.wrapper.standalone.SinkParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataSink;
-import java.util.List;
-import java.util.UUID;
-import java.util.stream.Collectors;
public class DataLakeSink extends StreamPipesDataSink {
-
- private DataLakeInfluxDbClient influxDbClient;
+ private static Logger LOG;
private static final String DATABASE_MEASUREMENT_KEY = "db_measurement";
private static final String TIMESTAMP_MAPPING_KEY = "timestamp_mapping";
- private static Logger LOG;
-
- private String timestampField;
- private List<EventProperty> imageProperties;
+ private TimeSeriesStore timeSeriesStore;
- private EventSchema eventSchema;
- private ImageStore imageStore;
@Override
public DataSinkDescription declareModel() {
return DataSinkBuilder.create("org.apache.streampipes.sinks.internal.jvm.datalake")
- .withLocales(Locales.EN)
- .withAssets(Assets.DOCUMENTATION, Assets.ICON)
- .category(DataSinkType.INTERNAL)
- .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
- EpRequirements.timestampReq(),
- Labels.withId(TIMESTAMP_MAPPING_KEY),
- PropertyScope.NONE).build())
- .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
- .build();
+ .withLocales(Locales.EN)
+ .withAssets(Assets.DOCUMENTATION, Assets.ICON)
+ .category(DataSinkType.INTERNAL)
+ .requiredStream(StreamRequirementsBuilder.create().requiredPropertyWithUnaryMapping(
+ EpRequirements.timestampReq(),
+ Labels.withId(TIMESTAMP_MAPPING_KEY),
+ PropertyScope.NONE).build())
+ .requiredTextParameter(Labels.withId(DATABASE_MEASUREMENT_KEY))
+ .build();
}
@Override
public void onInvocation(SinkParams parameters, EventSinkRuntimeContext runtimeContext) throws SpRuntimeException {
+
LOG = parameters.getGraph().getLogger(DataLakeSink.class);
- this.timestampField = parameters.extractor().mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
+ String timestampField = parameters.extractor().mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
String measureName = parameters.extractor().singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class);
- // TODO check if still needed
- measureName = DataLakeUtils.prepareString(measureName);
-
-
-
- EventSchema schema = runtimeContext.getInputSchemaInfo().get(0).getEventSchema();
- // Remove the timestamp field from the event schema
- List<EventProperty> eventPropertiesWithoutTimestamp = schema.getEventProperties()
- .stream()
- .filter(eventProperty -> !this.timestampField.endsWith(eventProperty.getRuntimeName()))
- .collect(Collectors.toList());
- schema.setEventProperties(eventPropertiesWithoutTimestamp);
-
- // deep copy of event schema. Event property runtime name is changed to lower case for the schema registration
- this.eventSchema = new EventSchema(schema);
-
- schema.getEventProperties().forEach(eventProperty ->
- eventProperty.setRuntimeName(DataLakeUtils.sanitizePropertyRuntimeName(eventProperty.getRuntimeName())));
- DataExplorerUtils.registerAtDataLake(measureName, schema, runtimeContext.getStreamPipesClient());
+ EventSchema eventSchema = runtimeContext.getInputSchemaInfo().get(0).getEventSchema();
+ DataLakeMeasure measure = new DataLakeMeasure(measureName, timestampField, eventSchema);
- // Get schema from couchdb
+ this.timeSeriesStore = new TimeSeriesStore(runtimeContext.getConfigStore().getConfig(),
+ runtimeContext.getStreamPipesClient(),
+ measure,
+ true);
- // if schema version null -> rename event schema
-
- //
-
- imageProperties = schema.getEventProperties().stream()
- .filter(eventProperty -> eventProperty.getDomainProperties() != null &&
- eventProperty.getDomainProperties().size() > 0 &&
- eventProperty.getDomainProperties().get(0).toString().equals(SPSensor.IMAGE))
- .collect(Collectors.toList());
-
-
-
- SpConfig configStore = runtimeContext.getConfigStore().getConfig();
- String couchDbProtocol = configStore.getString(ConfigKeys.COUCHDB_PROTOCOL);
- String couchDbHost = configStore.getString(ConfigKeys.COUCHDB_HOST);
- int couchDbPort = configStore.getInteger(ConfigKeys.COUCHDB_PORT);
- this.imageStore = new ImageStore(couchDbProtocol, couchDbHost, couchDbPort);
-
- DataExplorerConnectionSettings settings = DataExplorerConnectionSettings.from(
- configStore,
- measureName);
-
-
-
- this.influxDbClient = new DataLakeInfluxDbClient(
- settings,
- this.timestampField,
- this.eventSchema
- );
}
@Override
public void onEvent(Event event) throws SpRuntimeException {
- // handles image properties by storing the image in couchdb and replacing the values with the document id
- try {
- this.imageProperties.forEach(eventProperty -> {
- String imageDocId = UUID.randomUUID().toString();
- String image = event.getFieldByRuntimeName(eventProperty.getRuntimeName()).getAsPrimitive().getAsString();
-
- byte[] data = Base64.decodeBase64(image);
- this.imageStore.storeImage(data, imageDocId);
- event.updateFieldBySelector("s0::" + eventProperty.getRuntimeName(), imageDocId);
- });
- } catch (SpRuntimeException e) {
- LOG.error(e.getMessage());
- }
-
-
- // store event in time series database
- try {
- influxDbClient.save(event, this.eventSchema);
- } catch (SpRuntimeException e) {
- LOG.error(e.getMessage());
- }
-
+ this.timeSeriesStore.onEvent(event);
}
@Override
public void onDetach() throws SpRuntimeException {
- influxDbClient.stop();
+ this.timeSeriesStore.close();
}
}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/image/ImageStore.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/image/ImageStore.java
deleted file mode 100644
index 8bec59972..000000000
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/image/ImageStore.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.streampipes.sinks.internal.jvm.datalake.image;
-
-import org.lightcouch.CouchDbClient;
-import org.lightcouch.CouchDbProperties;
-
-import java.io.ByteArrayInputStream;
-
-public class ImageStore {
-
- private static final String DB_NAME = "images";
-
- private CouchDbClient couchDbClient;
-
- public ImageStore(String couchDbProtocol,
- String couchDbHost,
- int couchDbPort) {
- this.couchDbClient = new CouchDbClient(props(couchDbProtocol, couchDbHost, couchDbPort));
- }
-
- public void storeImage(byte[] imageBytes,
- String imageDocId) {
- this.couchDbClient.saveAttachment(
- new ByteArrayInputStream(imageBytes),
- imageDocId,
- "image/jpeg",
- imageDocId,
- null);
- }
-
- private static CouchDbProperties props(String couchDbProtocol,
- String couchDbHost,
- int couchDbPort) {
- return new CouchDbProperties(DB_NAME, true, couchDbProtocol,
- couchDbHost, couchDbPort, null, null);
- }
-}
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java
index 7f1df47a9..f1bdc8f82 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java
@@ -25,12 +25,17 @@ import org.apache.streampipes.model.shared.annotation.TsModel;
@TsModel
public class DataLakeMeasure extends UnnamedStreamPipesEntity {
+ public final static String CURRENT_SCHEMA_VERSION = "1.1";
private String measureName;
+
+ private String timestampField;
private EventSchema eventSchema;
private String pipelineId;
private String pipelineName;
private boolean pipelineIsRunning;
+ private String schemaVersion;
+
public DataLakeMeasure() {
super();
}
@@ -47,6 +52,12 @@ public class DataLakeMeasure extends UnnamedStreamPipesEntity {
this.eventSchema = eventSchema;
}
+ public DataLakeMeasure(String measureName, String timestampField, EventSchema eventSchema) {
+ this.measureName = measureName;
+ this.eventSchema = eventSchema;
+ this.timestampField = timestampField;
+ }
+
public String getMeasureName() {
return measureName;
}
@@ -87,4 +98,19 @@ public class DataLakeMeasure extends UnnamedStreamPipesEntity {
this.pipelineIsRunning = pipelineIsRunning;
}
+ public String getSchemaVersion() {
+ return schemaVersion;
+ }
+
+ public void setSchemaVersion(String schemaVersion) {
+ this.schemaVersion = schemaVersion;
+ }
+
+ public String getTimestampField() {
+ return timestampField;
+ }
+
+ public void setTimestampField(String timestampField) {
+ this.timestampField = timestampField;
+ }
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeMeasureResourceV3.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java
similarity index 97%
copy from streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeMeasureResourceV3.java
copy to streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java
index 57e3b935c..877b88451 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeMeasureResourceV3.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV3.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.rest.impl.datalake;
+package org.apache.streampipes.ps;
import org.apache.streampipes.dataexplorer.DataLakeNoUserManagementV3;
import org.apache.streampipes.model.schema.EventSchema;
@@ -28,6 +28,7 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@Path("/v3/datalake/measure")
+@Deprecated
public class DataLakeMeasureResourceV3 extends AbstractAuthGuardedRestResource {
private DataLakeNoUserManagementV3 dataLakeManagement;
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeMeasureResourceV3.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
similarity index 68%
rename from streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeMeasureResourceV3.java
rename to streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
index 57e3b935c..4319cf84c 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeMeasureResourceV3.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeMeasureResourceV4.java
@@ -16,9 +16,11 @@
*
*/
-package org.apache.streampipes.rest.impl.datalake;
+package org.apache.streampipes.ps;
+import org.apache.streampipes.dataexplorer.DataLakeManagementV4;
import org.apache.streampipes.dataexplorer.DataLakeNoUserManagementV3;
+import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
import org.apache.streampipes.rest.shared.annotation.JacksonSerialized;
@@ -27,26 +29,22 @@ import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
-@Path("/v3/datalake/measure")
-public class DataLakeMeasureResourceV3 extends AbstractAuthGuardedRestResource {
+@Path("/v4/datalake/measure")
+public class DataLakeMeasureResourceV4 extends AbstractAuthGuardedRestResource {
- private DataLakeNoUserManagementV3 dataLakeManagement;
+ private DataLakeManagementV4 dataLakeManagement;
- public DataLakeMeasureResourceV3() {
- this.dataLakeManagement = new DataLakeNoUserManagementV3();
+ public DataLakeMeasureResourceV4() {
+ this.dataLakeManagement = new DataLakeManagementV4();
}
@POST
@JacksonSerialized
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
- @Path("/{measure}")
- public Response addDataLake(@PathParam("measure") String measure, EventSchema eventSchema) {
- if (this.dataLakeManagement.addDataLake(measure, eventSchema)) {
- return ok();
- } else {
- return Response.status(409).build();
- }
-
+ @Path("/")
+ public Response addDataLake(DataLakeMeasure dataLakeMeasure) {
+ DataLakeMeasure result = this.dataLakeManagement.addDataLake(dataLakeMeasure);
+ return ok(result);
}
}
diff --git a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV3.java b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV3.java
index 6463d9428..00fd8c9b7 100644
--- a/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV3.java
+++ b/streampipes-platform-services/src/main/java/org/apache/streampipes/ps/DataLakeResourceV3.java
@@ -31,6 +31,7 @@ import javax.ws.rs.core.Response;
import java.util.List;
@Path("/v3/datalake")
+@Deprecated
public class DataLakeResourceV3 extends AbstractRestResource {
public DataLakeResourceV3() {
}