You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by bo...@apache.org on 2022/12/12 20:21:40 UTC
[streampipes] 03/03: add checkstyle to streampipes-data-explorer-commons
This is an automated email from the ASF dual-hosted git repository.
bossenti pushed a commit to branch add-checkstyle-configuration
in repository https://gitbox.apache.org/repos/asf/streampipes.git
commit bc7938cc104472dfc4e2da200df5e3c5078c33f6
Author: bossenti <bo...@posteo.de>
AuthorDate: Mon Dec 12 21:21:24 2022 +0100
add checkstyle to streampipes-data-explorer-commons
---
streampipes-data-explorer-commons/pom.xml | 14 +-
.../dataexplorer/commons/DataExplorerUtils.java | 82 +++++------
.../dataexplorer/commons/DataExplorerWriter.java | 54 ++++----
.../dataexplorer/commons/TimeSeriesStore.java | 72 +++++-----
.../commons/configs/CouchDbConfigurations.java | 14 +-
.../commons/configs/CouchDbEnvKeys.java | 6 +-
.../configs/DataExplorerConfigurations.java | 27 ++--
.../commons/configs/DataExplorerEnvKeys.java | 12 +-
.../dataexplorer/commons/image/ImageStore.java | 33 ++---
.../commons/image/ImageStoreUtils.java | 14 +-
.../commons/influx/InfluxConnectionSettings.java | 23 ++--
.../commons/influx/InfluxDbReservedKeywords.java | 150 ++++++++++-----------
.../commons/influx/InfluxNameSanitizer.java | 12 +-
.../dataexplorer/commons/influx/InfluxStore.java | 8 +-
14 files changed, 267 insertions(+), 254 deletions(-)
diff --git a/streampipes-data-explorer-commons/pom.xml b/streampipes-data-explorer-commons/pom.xml
index 66e8e6d54..d8631cbe9 100644
--- a/streampipes-data-explorer-commons/pom.xml
+++ b/streampipes-data-explorer-commons/pom.xml
@@ -16,7 +16,8 @@
~ limitations under the License.
~
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>streampipes-parent</artifactId>
<groupId>org.apache.streampipes</groupId>
@@ -59,7 +60,12 @@
<artifactId>influxdb-java</artifactId>
</dependency>
</dependencies>
-
-
-
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
index ba420d20d..a969aca7f 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
@@ -27,56 +27,56 @@ import java.util.List;
import java.util.stream.Collectors;
public class DataExplorerUtils {
- /**
- * Sanitizes the event schema and stores the DataLakeMeasurement to the couchDB
- *
- * @param client StreamPipes client to store measure
- * @param measure DataLakeMeasurement
- */
- public static DataLakeMeasure sanitizeAndRegisterAtDataLake(StreamPipesClient client,
- DataLakeMeasure measure) throws SpRuntimeException {
- sanitizeDataLakeMeasure(measure);
- registerAtDataLake(client, measure);
+ /**
+ * Sanitizes the event schema and stores the DataLakeMeasurement to the couchDB
+ *
+ * @param client StreamPipes client to store measure
+ * @param measure DataLakeMeasurement
+ */
+ public static DataLakeMeasure sanitizeAndRegisterAtDataLake(StreamPipesClient client,
+ DataLakeMeasure measure) throws SpRuntimeException {
+ sanitizeDataLakeMeasure(measure);
+ registerAtDataLake(client, measure);
- return measure;
- }
+ return measure;
+ }
- public static DataLakeMeasure sanitizeAndUpdateAtDataLake(StreamPipesClient client,
- DataLakeMeasure measure) throws SpRuntimeException {
- sanitizeDataLakeMeasure(measure);
- updateAtDataLake(client, measure);
- return measure;
- }
+ public static DataLakeMeasure sanitizeAndUpdateAtDataLake(StreamPipesClient client,
+ DataLakeMeasure measure) throws SpRuntimeException {
+ sanitizeDataLakeMeasure(measure);
+ updateAtDataLake(client, measure);
+ return measure;
+ }
- private static void registerAtDataLake(StreamPipesClient client,
- DataLakeMeasure measure) throws SpRuntimeException {
- client.dataLakeMeasureApi().create(measure);
- }
+ private static void registerAtDataLake(StreamPipesClient client,
+ DataLakeMeasure measure) throws SpRuntimeException {
+ client.dataLakeMeasureApi().create(measure);
+ }
- public static void updateAtDataLake(StreamPipesClient client,
- DataLakeMeasure measure) throws SpRuntimeException {
- client.dataLakeMeasureApi().update(measure);
- }
+ public static void updateAtDataLake(StreamPipesClient client,
+ DataLakeMeasure measure) throws SpRuntimeException {
+ client.dataLakeMeasureApi().update(measure);
+ }
- private static void sanitizeDataLakeMeasure(DataLakeMeasure measure) throws SpRuntimeException {
+ private static void sanitizeDataLakeMeasure(DataLakeMeasure measure) throws SpRuntimeException {
- // Removes selected timestamp from event schema
- removeTimestampsFromEventSchema(measure);
+ // Removes selected timestamp from event schema
+ removeTimestampsFromEventSchema(measure);
- // Removes all spaces with _ and validates that no special terms are used as runtime names
- measure.getEventSchema()
- .getEventProperties()
- .forEach(ep -> ep.setRuntimeName(InfluxNameSanitizer.renameReservedKeywords(ep.getRuntimeName())));
+ // Removes all spaces with _ and validates that no special terms are used as runtime names
+ measure.getEventSchema()
+ .getEventProperties()
+ .forEach(ep -> ep.setRuntimeName(InfluxNameSanitizer.renameReservedKeywords(ep.getRuntimeName())));
- }
+ }
- private static void removeTimestampsFromEventSchema(DataLakeMeasure measure) {
- List<EventProperty> eventPropertiesWithoutTimestamp = measure.getEventSchema().getEventProperties()
- .stream()
- .filter(eventProperty -> !measure.getTimestampField().endsWith(eventProperty.getRuntimeName()))
- .collect(Collectors.toList());
- measure.getEventSchema().setEventProperties(eventPropertiesWithoutTimestamp);
- }
+ private static void removeTimestampsFromEventSchema(DataLakeMeasure measure) {
+ List<EventProperty> eventPropertiesWithoutTimestamp = measure.getEventSchema().getEventProperties()
+ .stream()
+ .filter(eventProperty -> !measure.getTimestampField().endsWith(eventProperty.getRuntimeName()))
+ .collect(Collectors.toList());
+ measure.getEventSchema().setEventProperties(eventPropertiesWithoutTimestamp);
+ }
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerWriter.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerWriter.java
index 925a08b3f..022bc75d8 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerWriter.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerWriter.java
@@ -20,6 +20,7 @@ package org.apache.streampipes.dataexplorer.commons;
import org.apache.streampipes.dataexplorer.commons.configs.DataExplorerConfigurations;
import org.apache.streampipes.dataexplorer.commons.influx.InfluxConnectionSettings;
+
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
@@ -29,37 +30,38 @@ import java.util.concurrent.TimeUnit;
@Deprecated
public class DataExplorerWriter {
- private InfluxDB influxDB;
+ private InfluxDB influxDB;
- // TODO return a connection here
- public void connect(InfluxConnectionSettings dataExplorerConnectionSettings) {
- this.influxDB = InfluxDBFactory.connect(dataExplorerConnectionSettings.getInfluxDbHost() + ":" + dataExplorerConnectionSettings.getInfluxDbPort(),
- dataExplorerConnectionSettings.getUser(), dataExplorerConnectionSettings.getPassword());
- this.influxDB.setDatabase(DataExplorerConfigurations.DATA_LAKE_DATABASE_NAME);
- }
+ // TODO return a connection here
+ public void connect(InfluxConnectionSettings dataExplorerConnectionSettings) {
+ this.influxDB = InfluxDBFactory.connect(
+ dataExplorerConnectionSettings.getInfluxDbHost() + ":" + dataExplorerConnectionSettings.getInfluxDbPort(),
+ dataExplorerConnectionSettings.getUser(), dataExplorerConnectionSettings.getPassword());
+ this.influxDB.setDatabase(DataExplorerConfigurations.DATA_LAKE_DATABASE_NAME);
+ }
- public void close() {
- this.influxDB.close();
- }
+ public void close() {
+ this.influxDB.close();
+ }
- public void write(Map<String, Object> data,
- String measurement) {
- Point.Builder builder = Point.measurement(measurement)
- .time((Long) data.get("timestamp"), TimeUnit.MILLISECONDS);
+ public void write(Map<String, Object> data,
+ String measurement) {
+ Point.Builder builder = Point.measurement(measurement)
+ .time((Long) data.get("timestamp"), TimeUnit.MILLISECONDS);
- data.remove("timestamp");
+ data.remove("timestamp");
- for (String key : data.keySet()) {
- if (data.get(key) instanceof Double || data.get(key) == null) {
- builder.addField(key, (Double) data.get(key));
- } else if (data.get(key) instanceof Integer) {
- builder.addField(key, (Integer) data.get(key));
- } else {
- builder.tag(key, (String) data.get(key));
- }
- }
-
- this.influxDB.write(builder.build());
+ for (String key : data.keySet()) {
+ if (data.get(key) instanceof Double || data.get(key) == null) {
+ builder.addField(key, (Double) data.get(key));
+ } else if (data.get(key) instanceof Integer) {
+ builder.addField(key, (Integer) data.get(key));
+ } else {
+ builder.tag(key, (String) data.get(key));
+ }
}
+ this.influxDB.write(builder.build());
+ }
+
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java
index 27fdcb376..83b31be88 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/TimeSeriesStore.java
@@ -25,6 +25,7 @@ import org.apache.streampipes.dataexplorer.commons.influx.InfluxStore;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.svcdiscovery.api.SpConfig;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,55 +33,54 @@ import java.io.IOException;
public class TimeSeriesStore {
- private static final Logger LOG = LoggerFactory.getLogger(TimeSeriesStore.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TimeSeriesStore.class);
+ private final InfluxStore influxStore;
+ private ImageStore imageStore;
- private ImageStore imageStore;
- private final InfluxStore influxStore;
+ public TimeSeriesStore(SpConfig config,
+ StreamPipesClient client,
+ DataLakeMeasure measure,
+ boolean enableImageStore) {
- public TimeSeriesStore(SpConfig config,
- StreamPipesClient client,
- DataLakeMeasure measure,
- boolean enableImageStore) {
+ measure = DataExplorerUtils.sanitizeAndRegisterAtDataLake(client, measure);
- measure = DataExplorerUtils.sanitizeAndRegisterAtDataLake(client, measure);
+ if (enableImageStore) {
+ // TODO check if event properties are replaces correctly
+ this.imageStore = new ImageStore(measure, config);
+ }
- if (enableImageStore) {
- // TODO check if event properties are replaces correctly
- this.imageStore = new ImageStore(measure, config);
- }
+ this.influxStore = new InfluxStore(measure, config);
- this.influxStore = new InfluxStore(measure, config);
+ }
+ public boolean onEvent(Event event) throws SpRuntimeException {
+ // Store all images in image store and replace image with internal id
+ if (imageStore != null) {
+ this.imageStore.onEvent(event);
}
- public boolean onEvent(Event event) throws SpRuntimeException {
- // Store all images in image store and replace image with internal id
- if (imageStore != null) {
- this.imageStore.onEvent(event);
- }
+ // Store event in time series database
+ this.influxStore.onEvent(event);
- // Store event in time series database
- this.influxStore.onEvent(event);
+ return true;
+ }
- return true;
- }
+ public boolean alterRetentionTime(DataLakeMeasure dataLakeMeasure) {
+ return true;
+ }
- public boolean alterRetentionTime(DataLakeMeasure dataLakeMeasure) {
- return true;
+ public void close() throws SpRuntimeException {
+ if (imageStore != null) {
+ try {
+ this.imageStore.close();
+ } catch (IOException e) {
+ LOG.error("Could not close couchDB connection");
+ throw new SpRuntimeException(e);
+ }
}
- public void close() throws SpRuntimeException {
- if (imageStore != null) {
- try {
- this.imageStore.close();
- } catch (IOException e) {
- LOG.error("Could not close couchDB connection");
- throw new SpRuntimeException(e);
- }
- }
-
- this.influxStore.close();
- }
+ this.influxStore.close();
+ }
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java
index 8079e9de1..45678a9c1 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java
@@ -24,12 +24,12 @@ import java.util.List;
public class CouchDbConfigurations {
- public static List<ConfigItem> getDefaults() {
- return Arrays.asList(
- ConfigItem.from(CouchDbEnvKeys.COUCHDB_HOST, "couchdb", "Hostname for CouchDB to store image blobs"),
- ConfigItem.from(CouchDbEnvKeys.COUCHDB_PORT, 5984, ""),
- ConfigItem.from(CouchDbEnvKeys.COUCHDB_PROTOCOL, "http", "")
- );
- }
+ public static List<ConfigItem> getDefaults() {
+ return Arrays.asList(
+ ConfigItem.from(CouchDbEnvKeys.COUCHDB_HOST, "couchdb", "Hostname for CouchDB to store image blobs"),
+ ConfigItem.from(CouchDbEnvKeys.COUCHDB_PORT, 5984, ""),
+ ConfigItem.from(CouchDbEnvKeys.COUCHDB_PROTOCOL, "http", "")
+ );
+ }
}
\ No newline at end of file
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java
index 69a525764..23867c634 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java
@@ -19,7 +19,7 @@
package org.apache.streampipes.dataexplorer.commons.configs;
public class CouchDbEnvKeys {
- public final static String COUCHDB_HOST = "SP_COUCHDB_HOST";
- public final static String COUCHDB_PORT = "SP_COUCHDB_PORT";
- public final static String COUCHDB_PROTOCOL = "SP_COUCHDB_PROTOCOL";
+ public static final String COUCHDB_HOST = "SP_COUCHDB_HOST";
+ public static final String COUCHDB_PORT = "SP_COUCHDB_PORT";
+ public static final String COUCHDB_PROTOCOL = "SP_COUCHDB_PROTOCOL";
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java
index e0e483c71..feb7135ea 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java
@@ -24,18 +24,23 @@ import java.util.List;
public class DataExplorerConfigurations {
- public final static String DATA_LAKE_DATABASE_NAME = "sp";
+ public static final String DATA_LAKE_DATABASE_NAME = "sp";
- public static List<ConfigItem> getDefaults() {
+ public static List<ConfigItem> getDefaults() {
- return Arrays.asList(
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_HOST, "influxdb", "Hostname for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PROTOCOL, "http", "Protocol for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PORT, 8086, "Port for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_USERNAME, "default", "Username for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PASSWORD, "default", "Password for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_DATABASE_NAME, DATA_LAKE_DATABASE_NAME, "Database name for the StreamPipes data lake database")
- );
- }
+ return Arrays.asList(
+ ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_HOST, "influxdb",
+ "Hostname for the StreamPipes data lake database"),
+ ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PROTOCOL, "http",
+ "Protocol for the StreamPipes data lake database"),
+ ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PORT, 8086, "Port for the StreamPipes data lake database"),
+ ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_USERNAME, "default",
+ "Username for the StreamPipes data lake database"),
+ ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PASSWORD, "default",
+ "Password for the StreamPipes data lake database"),
+ ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_DATABASE_NAME, DATA_LAKE_DATABASE_NAME,
+ "Database name for the StreamPipes data lake database")
+ );
+ }
}
\ No newline at end of file
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java
index cd4d17c33..6eb840708 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java
@@ -18,11 +18,11 @@
package org.apache.streampipes.dataexplorer.commons.configs;
public class DataExplorerEnvKeys {
- public final static String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
- public final static String DATA_LAKE_PROTOCOL = "SP_DATA_LAKE_PROTOCOL";
- public final static String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT";
- public final static String DATA_LAKE_USERNAME = "SP_DATA_LAKE_USERNAME";
- public final static String DATA_LAKE_PASSWORD = "SP_DATA_LAKE_PASSWORD";
- public final static String DATA_LAKE_DATABASE_NAME = "SP_DATA_LAKE_DATABASE_NAME";
+ public static final String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
+ public static final String DATA_LAKE_PROTOCOL = "SP_DATA_LAKE_PROTOCOL";
+ public static final String DATA_LAKE_PORT = "SP_DATA_LAKE_PORT";
+ public static final String DATA_LAKE_USERNAME = "SP_DATA_LAKE_USERNAME";
+ public static final String DATA_LAKE_PASSWORD = "SP_DATA_LAKE_PASSWORD";
+ public static final String DATA_LAKE_DATABASE_NAME = "SP_DATA_LAKE_DATABASE_NAME";
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStore.java
index 21c51c266..66be08389 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStore.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStore.java
@@ -18,13 +18,14 @@
package org.apache.streampipes.dataexplorer.commons.image;
-import org.apache.commons.codec.binary.Base64;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.dataexplorer.commons.configs.CouchDbEnvKeys;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.svcdiscovery.api.SpConfig;
+
+import org.apache.commons.codec.binary.Base64;
import org.lightcouch.CouchDbClient;
import org.lightcouch.CouchDbProperties;
import org.slf4j.Logger;
@@ -48,7 +49,16 @@ public class ImageStore {
this.imageProperties = ImageStoreUtils.getImageProperties(measure);
}
- public void onEvent(Event event) throws SpRuntimeException{
+ private static CouchDbProperties from(SpConfig config) {
+ String couchDbProtocol = config.getString(CouchDbEnvKeys.COUCHDB_PROTOCOL);
+ String couchDbHost = config.getString(CouchDbEnvKeys.COUCHDB_HOST);
+ int couchDbPort = config.getInteger(CouchDbEnvKeys.COUCHDB_PORT);
+
+ return new CouchDbProperties(DB_NAME, true, couchDbProtocol,
+ couchDbHost, couchDbPort, null, null);
+ }
+
+ public void onEvent(Event event) throws SpRuntimeException {
this.imageProperties.forEach(eventProperty -> {
String imageDocId = UUID.randomUUID().toString();
String image = event.getFieldByRuntimeName(eventProperty.getRuntimeName()).getAsPrimitive().getAsString();
@@ -62,24 +72,15 @@ public class ImageStore {
public void storeImage(byte[] imageBytes,
String imageDocId) {
this.couchDbClient.saveAttachment(
- new ByteArrayInputStream(imageBytes),
- imageDocId,
- "image/jpeg",
- imageDocId,
- null);
+ new ByteArrayInputStream(imageBytes),
+ imageDocId,
+ "image/jpeg",
+ imageDocId,
+ null);
}
public void close() throws IOException {
this.couchDbClient.close();
}
-
- private static CouchDbProperties from(SpConfig config) {
- String couchDbProtocol = config.getString(CouchDbEnvKeys.COUCHDB_PROTOCOL);
- String couchDbHost = config.getString(CouchDbEnvKeys.COUCHDB_HOST);
- int couchDbPort = config.getInteger(CouchDbEnvKeys.COUCHDB_PORT);
-
- return new CouchDbProperties(DB_NAME, true, couchDbProtocol,
- couchDbHost, couchDbPort, null, null);
- }
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStoreUtils.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStoreUtils.java
index ba3ab6c0c..42594827e 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStoreUtils.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStoreUtils.java
@@ -27,11 +27,11 @@ import java.util.stream.Collectors;
public class ImageStoreUtils {
- public static List<EventProperty> getImageProperties(DataLakeMeasure measure) {
- return measure.getEventSchema().getEventProperties().stream()
- .filter(eventProperty -> eventProperty.getDomainProperties() != null &&
- eventProperty.getDomainProperties().size() > 0 &&
- eventProperty.getDomainProperties().get(0).toString().equals(SPSensor.IMAGE))
- .collect(Collectors.toList());
- }
+ public static List<EventProperty> getImageProperties(DataLakeMeasure measure) {
+ return measure.getEventSchema().getEventProperties().stream()
+ .filter(eventProperty -> eventProperty.getDomainProperties() != null
+ && eventProperty.getDomainProperties().size() > 0
+ && eventProperty.getDomainProperties().get(0).toString().equals(SPSensor.IMAGE))
+ .collect(Collectors.toList());
+ }
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxConnectionSettings.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxConnectionSettings.java
index a6c153843..0716a9bf7 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxConnectionSettings.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxConnectionSettings.java
@@ -19,7 +19,6 @@
package org.apache.streampipes.dataexplorer.commons.influx;
import org.apache.streampipes.dataexplorer.commons.configs.DataExplorerEnvKeys;
-import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.svcdiscovery.api.SpConfig;
public class InfluxConnectionSettings {
@@ -30,17 +29,6 @@ public class InfluxConnectionSettings {
private final String user;
private final String password;
- public static InfluxConnectionSettings from(SpConfig configStore) {
-
- return new InfluxConnectionSettings(
- configStore.getString(DataExplorerEnvKeys.DATA_LAKE_PROTOCOL) + "://" + configStore.getString(DataExplorerEnvKeys.DATA_LAKE_HOST),
- configStore.getInteger(DataExplorerEnvKeys.DATA_LAKE_PORT),
- configStore.getString(DataExplorerEnvKeys.DATA_LAKE_DATABASE_NAME),
- configStore.getString(DataExplorerEnvKeys.DATA_LAKE_USERNAME),
- configStore.getString(DataExplorerEnvKeys.DATA_LAKE_PASSWORD));
- }
-
-
private InfluxConnectionSettings(String influxDbHost,
Integer influxDbPort,
String databaseName,
@@ -53,6 +41,17 @@ public class InfluxConnectionSettings {
this.password = password;
}
+ public static InfluxConnectionSettings from(SpConfig configStore) {
+
+ return new InfluxConnectionSettings(
+ configStore.getString(DataExplorerEnvKeys.DATA_LAKE_PROTOCOL) + "://"
+ + configStore.getString(DataExplorerEnvKeys.DATA_LAKE_HOST),
+ configStore.getInteger(DataExplorerEnvKeys.DATA_LAKE_PORT),
+ configStore.getString(DataExplorerEnvKeys.DATA_LAKE_DATABASE_NAME),
+ configStore.getString(DataExplorerEnvKeys.DATA_LAKE_USERNAME),
+ configStore.getString(DataExplorerEnvKeys.DATA_LAKE_PASSWORD));
+ }
+
public Integer getInfluxDbPort() {
return influxDbPort;
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxDbReservedKeywords.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxDbReservedKeywords.java
index 754fc7c94..48d8c9e8d 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxDbReservedKeywords.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxDbReservedKeywords.java
@@ -23,80 +23,80 @@ import java.util.List;
public class InfluxDbReservedKeywords {
- public static final List<String> keywordList = Arrays.asList(
- "ALL",
- "ALTER",
- "ANALYZE",
- "ANY",
- "AS",
- "ASC",
- "BEGIN",
- "BY",
- "CREATE",
- "CONTINUOUS",
- "DATABASE",
- "DATABASES",
- "DEFAULT",
- "DELETE",
- "DESC",
- "DESTINATIONS",
- "DIAGNOSTICS",
- "DISTINCT",
- "DROP",
- "DURATION",
- "END",
- "EVERY",
- "EXPLAIN",
- "FIELD",
- "FOR",
- "FROM",
- "GRANT",
- "GRANTS",
- "GROUP",
- "GROUPS",
- "IN",
- "INF",
- "INSERT",
- "INTO",
- "KEY",
- "KEYS",
- "KILL",
- "LIMIT",
- "SHOW",
- "MEASUREMENT",
- "MEASUREMENTS",
- "NAME",
- "OFFSET",
- "ON",
- "ORDER",
- "PASSWORD",
- "POLICY",
- "POLICIES",
- "PRIVILEGES",
- "QUERIES",
- "QUERY",
- "READ",
- "REPLICATION",
- "RESAMPLE",
- "RETENTION",
- "REVOKE",
- "SELECT",
- "SERIES",
- "SET",
- "SHARD",
- "SHARDS",
- "SLIMIT",
- "SOFFSET",
- "STATS",
- "SUBSCRIPTION",
- "SUBSCRIPTIONS",
- "TAG",
- "TO",
- "USER",
- "USERS",
- "VALUES",
- "WHERE",
- "WITH",
- "WRITE"
+ public static final List<String> KEYWORD_LIST = Arrays.asList(
+ "ALL",
+ "ALTER",
+ "ANALYZE",
+ "ANY",
+ "AS",
+ "ASC",
+ "BEGIN",
+ "BY",
+ "CREATE",
+ "CONTINUOUS",
+ "DATABASE",
+ "DATABASES",
+ "DEFAULT",
+ "DELETE",
+ "DESC",
+ "DESTINATIONS",
+ "DIAGNOSTICS",
+ "DISTINCT",
+ "DROP",
+ "DURATION",
+ "END",
+ "EVERY",
+ "EXPLAIN",
+ "FIELD",
+ "FOR",
+ "FROM",
+ "GRANT",
+ "GRANTS",
+ "GROUP",
+ "GROUPS",
+ "IN",
+ "INF",
+ "INSERT",
+ "INTO",
+ "KEY",
+ "KEYS",
+ "KILL",
+ "LIMIT",
+ "SHOW",
+ "MEASUREMENT",
+ "MEASUREMENTS",
+ "NAME",
+ "OFFSET",
+ "ON",
+ "ORDER",
+ "PASSWORD",
+ "POLICY",
+ "POLICIES",
+ "PRIVILEGES",
+ "QUERIES",
+ "QUERY",
+ "READ",
+ "REPLICATION",
+ "RESAMPLE",
+ "RETENTION",
+ "REVOKE",
+ "SELECT",
+ "SERIES",
+ "SET",
+ "SHARD",
+ "SHARDS",
+ "SLIMIT",
+ "SOFFSET",
+ "STATS",
+ "SUBSCRIPTION",
+ "SUBSCRIPTIONS",
+ "TAG",
+ "TO",
+ "USER",
+ "USERS",
+ "VALUES",
+ "WHERE",
+ "WITH",
+ "WRITE"
);
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxNameSanitizer.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxNameSanitizer.java
index 302ea4586..353782ba8 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxNameSanitizer.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxNameSanitizer.java
@@ -20,12 +20,12 @@ package org.apache.streampipes.dataexplorer.commons.influx;
public class InfluxNameSanitizer {
- public static String renameReservedKeywords(String runtimeName) {
- if (InfluxDbReservedKeywords.keywordList.stream().anyMatch(k -> k.equalsIgnoreCase(runtimeName))) {
- return runtimeName + "_";
- } else {
- return runtimeName;
- }
+ public static String renameReservedKeywords(String runtimeName) {
+ if (InfluxDbReservedKeywords.KEYWORD_LIST.stream().anyMatch(k -> k.equalsIgnoreCase(runtimeName))) {
+ return runtimeName + "_";
+ } else {
+ return runtimeName;
}
+ }
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
index 5440661e3..d58ba53ff 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
@@ -27,6 +27,7 @@ import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.svcdiscovery.api.SpConfig;
import org.apache.streampipes.vocabulary.XSD;
+
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
@@ -46,11 +47,9 @@ import java.util.concurrent.TimeUnit;
public class InfluxStore {
private static final Logger LOG = LoggerFactory.getLogger(InfluxStore.class);
-
- private InfluxDB influxDb = null;
DataLakeMeasure measure;
-
Map<String, String> sanitizedRuntimeNames = new HashMap<>();
+ private InfluxDB influxDb = null;
public InfluxStore(DataLakeMeasure measure,
InfluxConnectionSettings settings) {
@@ -172,7 +171,8 @@ public class InfluxStore {
missingFields.add(runtimeName);
}
} catch (SpRuntimeException iae) {
- LOG.warn("Runtime exception while extracting field value of field {} - this field will be ignored", runtimeName, iae);
+ LOG.warn("Runtime exception while extracting field value of field {} - this field will be ignored",
+ runtimeName, iae);
}
}
}