You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2022/08/02 06:14:25 UTC
[incubator-streampipes] branch STREAMPIPES-563 updated: [STREAMPIPES-563] Minor refactoring
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch STREAMPIPES-563
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/STREAMPIPES-563 by this push:
new 4811a50f2 [STREAMPIPES-563] Minor refactoring
4811a50f2 is described below
commit 4811a50f2f5b799d9dd96bd504d7b2215e332faa
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Tue Aug 2 08:13:59 2022 +0200
[STREAMPIPES-563] Minor refactoring
---
.../dataexplorer/commons/DataExplorerUtils.java | 16 +++++-------
.../dataexplorer/commons/DataExplorerWriter.java | 8 +++---
.../CouchDbConfigurations.java | 2 +-
.../{couchdb => configs}/CouchDbEnvKeys.java | 2 +-
.../DataExplorerConfigurations.java | 16 ++++++------
.../{influx => configs}/DataExplorerEnvKeys.java | 2 +-
.../dataexplorer/commons/image/ImageStore.java | 2 +-
.../commons/influx/DataExplorerDefaults.java | 28 ---------------------
...Settings.java => InfluxConnectionSettings.java} | 29 +++++++---------------
.../dataexplorer/commons/influx/InfluxStore.java | 26 ++++++++-----------
.../sinks/internal/jvm/SinksInternalJvmInit.java | 4 +--
11 files changed, 44 insertions(+), 91 deletions(-)
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
index 8749b3cc0..12e53e2b0 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerUtils.java
@@ -30,13 +30,12 @@ public class DataExplorerUtils {
/**
* Sanitizes the event schema and stores the DataLakeMeasurement to the couchDB
*
- * @param client
- * @param measure
- * @throws SpRuntimeException
+ * @param client StreamPipes client to store measure
+ * @param measure DataLakeMeasurement
*/
public static DataLakeMeasure sanitizeAndRegisterAtDataLake(StreamPipesClient client,
DataLakeMeasure measure) throws SpRuntimeException {
- measure = sanitizeDataLakeMeasure(measure);
+ sanitizeDataLakeMeasure(measure);
registerAtDataLake(client, measure);
return measure;
@@ -50,10 +49,10 @@ public class DataExplorerUtils {
}
- private static DataLakeMeasure sanitizeDataLakeMeasure(DataLakeMeasure measure) throws SpRuntimeException {
+ private static void sanitizeDataLakeMeasure(DataLakeMeasure measure) throws SpRuntimeException {
// Removes selected timestamp from event schema
- measure = removeTimestampsFromEventSchema(measure);
+ removeTimestampsFromEventSchema(measure);
// Escapes all spaces with _
measure.setMeasureName(InfluxNameSanitizer.prepareString(measure.getMeasureName()));
@@ -63,17 +62,14 @@ public class DataExplorerUtils {
.getEventProperties()
.forEach(ep -> ep.setRuntimeName(InfluxNameSanitizer.sanitizePropertyRuntimeName(ep.getRuntimeName())));
- return measure;
}
- private static DataLakeMeasure removeTimestampsFromEventSchema(DataLakeMeasure measure) {
+ private static void removeTimestampsFromEventSchema(DataLakeMeasure measure) {
List<EventProperty> eventPropertiesWithoutTimestamp = measure.getEventSchema().getEventProperties()
.stream()
.filter(eventProperty -> !measure.getTimestampField().endsWith(eventProperty.getRuntimeName()))
.collect(Collectors.toList());
measure.getEventSchema().setEventProperties(eventPropertiesWithoutTimestamp);
-
- return measure;
}
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerWriter.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerWriter.java
index 6a7d994d0..925a08b3f 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerWriter.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/DataExplorerWriter.java
@@ -18,8 +18,8 @@
package org.apache.streampipes.dataexplorer.commons;
-import org.apache.streampipes.dataexplorer.commons.influx.DataExplorerConnectionSettings;
-import org.apache.streampipes.dataexplorer.commons.influx.DataExplorerDefaults;
+import org.apache.streampipes.dataexplorer.commons.configs.DataExplorerConfigurations;
+import org.apache.streampipes.dataexplorer.commons.influx.InfluxConnectionSettings;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Point;
@@ -32,10 +32,10 @@ public class DataExplorerWriter {
private InfluxDB influxDB;
// TODO return a connection here
- public void connect(DataExplorerConnectionSettings dataExplorerConnectionSettings) {
+ public void connect(InfluxConnectionSettings dataExplorerConnectionSettings) {
this.influxDB = InfluxDBFactory.connect(dataExplorerConnectionSettings.getInfluxDbHost() + ":" + dataExplorerConnectionSettings.getInfluxDbPort(),
dataExplorerConnectionSettings.getUser(), dataExplorerConnectionSettings.getPassword());
- this.influxDB.setDatabase(DataExplorerDefaults.DATA_LAKE_DATABASE_NAME);
+ this.influxDB.setDatabase(DataExplorerConfigurations.DATA_LAKE_DATABASE_NAME);
}
public void close() {
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbConfigurations.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java
similarity index 95%
rename from streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbConfigurations.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java
index adf9d8983..8079e9de1 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbConfigurations.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbConfigurations.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.dataexplorer.commons.couchdb;
+package org.apache.streampipes.dataexplorer.commons.configs;
import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbEnvKeys.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java
similarity index 94%
rename from streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbEnvKeys.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java
index 4dd6b4fb8..69a525764 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/couchdb/CouchDbEnvKeys.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/CouchDbEnvKeys.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.dataexplorer.commons.couchdb;
+package org.apache.streampipes.dataexplorer.commons.configs;
public class CouchDbEnvKeys {
public final static String COUCHDB_HOST = "SP_COUCHDB_HOST";
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConfigurations.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java
similarity index 68%
rename from streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConfigurations.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java
index 1baf5aa72..e0e483c71 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConfigurations.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerConfigurations.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.dataexplorer.commons.influx;
+package org.apache.streampipes.dataexplorer.commons.configs;
import org.apache.streampipes.svcdiscovery.api.model.ConfigItem;
@@ -24,15 +24,17 @@ import java.util.List;
public class DataExplorerConfigurations {
+ public final static String DATA_LAKE_DATABASE_NAME = "sp";
public static List<ConfigItem> getDefaults() {
+
return Arrays.asList(
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_HOST, DataExplorerDefaults.DATA_LAKE_HOST, "Hostname for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PROTOCOL, DataExplorerDefaults.DATA_LAKE_PROTOCOL, "Protocol for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PORT, DataExplorerDefaults.DATA_LAKE_PORT, "Port for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_USERNAME, DataExplorerDefaults.DATA_LAKE_USERNAME, "Username for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PASSWORD, DataExplorerDefaults.DATA_LAKE_PASSWORD, "Password for the StreamPipes data lake database"),
- ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_DATABASE_NAME, DataExplorerDefaults.DATA_LAKE_DATABASE_NAME, "Database name for the StreamPipes data lake database")
+ ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_HOST, "influxdb", "Hostname for the StreamPipes data lake database"),
+ ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PROTOCOL, "http", "Protocol for the StreamPipes data lake database"),
+ ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PORT, 8086, "Port for the StreamPipes data lake database"),
+ ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_USERNAME, "default", "Username for the StreamPipes data lake database"),
+ ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_PASSWORD, "default", "Password for the StreamPipes data lake database"),
+ ConfigItem.from(DataExplorerEnvKeys.DATA_LAKE_DATABASE_NAME, DATA_LAKE_DATABASE_NAME, "Database name for the StreamPipes data lake database")
);
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerEnvKeys.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java
similarity index 95%
rename from streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerEnvKeys.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java
index 84ad4557e..cd4d17c33 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerEnvKeys.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/configs/DataExplorerEnvKeys.java
@@ -15,7 +15,7 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.dataexplorer.commons.influx;
+package org.apache.streampipes.dataexplorer.commons.configs;
public class DataExplorerEnvKeys {
public final static String DATA_LAKE_HOST = "SP_DATA_LAKE_HOST";
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStore.java
index 1a3e2356f..21c51c266 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStore.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/image/ImageStore.java
@@ -20,7 +20,7 @@ package org.apache.streampipes.dataexplorer.commons.image;
import org.apache.commons.codec.binary.Base64;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.dataexplorer.commons.couchdb.CouchDbEnvKeys;
+import org.apache.streampipes.dataexplorer.commons.configs.CouchDbEnvKeys;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.EventProperty;
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerDefaults.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerDefaults.java
deleted file mode 100644
index c31c02c38..000000000
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerDefaults.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.streampipes.dataexplorer.commons.influx;
-
-public class DataExplorerDefaults {
- public final static String DATA_LAKE_HOST = "influxdb";
- public final static String DATA_LAKE_PROTOCOL = "http";
- public final static int DATA_LAKE_PORT = 8086;
- public final static String DATA_LAKE_USERNAME = "default";
- public final static String DATA_LAKE_PASSWORD = "default";
- public final static String DATA_LAKE_DATABASE_NAME = "sp";
-
-}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConnectionSettings.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxConnectionSettings.java
similarity index 71%
rename from streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConnectionSettings.java
rename to streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxConnectionSettings.java
index 63b943dec..a6c153843 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/DataExplorerConnectionSettings.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxConnectionSettings.java
@@ -18,44 +18,37 @@
package org.apache.streampipes.dataexplorer.commons.influx;
+import org.apache.streampipes.dataexplorer.commons.configs.DataExplorerEnvKeys;
import org.apache.streampipes.model.datalake.DataLakeMeasure;
import org.apache.streampipes.svcdiscovery.api.SpConfig;
-public class DataExplorerConnectionSettings {
+public class InfluxConnectionSettings {
private final Integer influxDbPort;
private final String influxDbHost;
private final String databaseName;
- private final String measureName;
private final String user;
private final String password;
- public static DataExplorerConnectionSettings from(SpConfig configStore) {
- return from(configStore, null);
- }
-
- public static DataExplorerConnectionSettings from(SpConfig configStore, DataLakeMeasure measure) {
+ public static InfluxConnectionSettings from(SpConfig configStore) {
- return new DataExplorerConnectionSettings(
+ return new InfluxConnectionSettings(
configStore.getString(DataExplorerEnvKeys.DATA_LAKE_PROTOCOL) + "://" + configStore.getString(DataExplorerEnvKeys.DATA_LAKE_HOST),
configStore.getInteger(DataExplorerEnvKeys.DATA_LAKE_PORT),
configStore.getString(DataExplorerEnvKeys.DATA_LAKE_DATABASE_NAME),
- measure.getMeasureName(),
configStore.getString(DataExplorerEnvKeys.DATA_LAKE_USERNAME),
configStore.getString(DataExplorerEnvKeys.DATA_LAKE_PASSWORD));
}
- private DataExplorerConnectionSettings(String influxDbHost,
- Integer influxDbPort,
- String databaseName,
- String measureName,
- String user,
- String password) {
+ private InfluxConnectionSettings(String influxDbHost,
+ Integer influxDbPort,
+ String databaseName,
+ String user,
+ String password) {
this.influxDbHost = influxDbHost;
this.influxDbPort = influxDbPort;
this.databaseName = databaseName;
- this.measureName = measureName;
this.user = user;
this.password = password;
}
@@ -72,10 +65,6 @@ public class DataExplorerConnectionSettings {
return databaseName;
}
- public String getMeasureName() {
- return measureName;
- }
-
public String getUser() {
return user;
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
index 8b42d940a..8724b8a15 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
@@ -45,11 +45,10 @@ public class InfluxStore {
private static final Logger LOG = LoggerFactory.getLogger(InfluxStore.class);
- private final Integer batchSize = 2000;
- private final Integer flushDuration = 500;
+ private final static Integer batchSize = 2000;
+ private final static Integer flushDuration = 500;
private InfluxDB influxDb = null;
DataLakeMeasure measure;
-// private final DataExplorerConnectionSettings settings;
Map<String, String> targetRuntimeNames = new HashMap<>();
@@ -57,29 +56,24 @@ public class InfluxStore {
SpConfig configStore) throws SpRuntimeException {
this.measure = measure;
- DataExplorerConnectionSettings settings = DataExplorerConnectionSettings.from(configStore, measure);
+ InfluxConnectionSettings settings = InfluxConnectionSettings.from(configStore);
- // TODO check if this works
+ // store sanitized target property runtime names in local variable
measure.getEventSchema()
.getEventProperties()
- .forEach(ep -> targetRuntimeNames.put(ep.getRuntimeName(), InfluxNameSanitizer.sanitizePropertyRuntimeName(ep.getRuntimeName())));
+ .forEach(ep -> targetRuntimeNames.put(ep.getRuntimeName(),
+ InfluxNameSanitizer.sanitizePropertyRuntimeName(ep.getRuntimeName())));
connect(settings);
}
- /**
- *
- * @param measure
- */
-
-
/**
* Connects to the InfluxDB Server, sets the database and initializes the batch-behaviour
*
* @throws SpRuntimeException If no connection can be established or if the database could not
* be found
*/
- private void connect(DataExplorerConnectionSettings settings) throws SpRuntimeException {
+ private void connect(InfluxConnectionSettings settings) throws SpRuntimeException {
// Connecting to the server
// "http://" must be in front
String urlAndPort = settings.getInfluxDbHost() + ":" + settings.getInfluxDbPort();
@@ -119,14 +113,14 @@ public class InfluxStore {
* @param dbName The name of the database which should be created
*/
private void createDatabase(String dbName) throws SpRuntimeException {
- if(!dbName.matches("^[a-zA-Z_][a-zA-Z0-9_]*$")) {
- throw new SpRuntimeException("Databasename '" + dbName + "' not allowed. Allowed names: ^[a-zA-Z_][a-zA-Z0-9_]*$");
+ if(!dbName.matches("^[a-zA-Z_]\\w*$")) {
+ throw new SpRuntimeException("Database name '" + dbName + "' not allowed. Allowed names: ^[a-zA-Z_][a-zA-Z0-9_]*$");
}
influxDb.query(new Query("CREATE DATABASE \"" + dbName + "\"", ""));
}
/**
- * Saves an event to the connnected InfluxDB database
+ * Saves an event to the connected InfluxDB database
*
* @param event The event which should be saved
* @throws SpRuntimeException If the column name (key-value of the event map) is not allowed
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
index 42e0dc80a..421ab74a4 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/SinksInternalJvmInit.java
@@ -21,8 +21,8 @@ package org.apache.streampipes.sinks.internal.jvm;
import org.apache.streampipes.container.model.SpServiceDefinition;
import org.apache.streampipes.container.model.SpServiceDefinitionBuilder;
import org.apache.streampipes.container.standalone.init.StandaloneModelSubmitter;
-import org.apache.streampipes.dataexplorer.commons.couchdb.CouchDbConfigurations;
-import org.apache.streampipes.dataexplorer.commons.influx.DataExplorerConfigurations;
+import org.apache.streampipes.dataexplorer.commons.configs.CouchDbConfigurations;
+import org.apache.streampipes.dataexplorer.commons.configs.DataExplorerConfigurations;
import org.apache.streampipes.dataformat.cbor.CborDataFormatFactory;
import org.apache.streampipes.dataformat.fst.FstDataFormatFactory;
import org.apache.streampipes.dataformat.json.JsonDataFormatFactory;