You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/06 16:59:28 UTC
[incubator-streampipes] 03/03: [STREAMPIPES-502] Avoid reserved keywords in data explorer field names
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit 44d8e15bef9c1a0583277406f677e639c6e12cd9
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 6 17:59:10 2022 +0100
[STREAMPIPES-502] Avoid reserved keywords in data explorer field names
---
.../sinks/internal/jvm/datalake/DataLake.java | 40 +++-----
.../internal/jvm/datalake/DataLakeController.java | 8 +-
.../jvm/datalake/DataLakeInfluxDbClient.java | 75 ++++++---------
.../sinks/internal/jvm/datalake/DataLakeUtils.java | 39 ++++++++
.../jvm/datalake/InfluxDbConnectionSettings.java | 64 +++++++++++++
.../jvm/datalake/InfluxDbReservedKeywords.java | 102 +++++++++++++++++++++
6 files changed, 249 insertions(+), 79 deletions(-)
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
index 07c146f..aa2ae39 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
@@ -67,19 +67,6 @@ public class DataLake implements EventSink<DataLakeParameters> {
String user = configStore.getString(ConfigKeys.DATA_LAKE_USERNAME);
String password = configStore.getString(ConfigKeys.DATA_LAKE_PASSWORD);
- this.influxDbClient = new DataLakeInfluxDbClient(
- influxHost,
- influxPort,
- databaseName,
- parameters.getMeasurementName(),
- user,
- password,
- parameters.getTimestampField(),
- parameters.getBatchSize(),
- parameters.getFlushDuration(),
- LOG
- );
-
EventSchema schema = runtimeContext.getInputSchemaInfo().get(0).getEventSchema();
// Remove the timestamp field from the event schema
List<EventProperty> eventPropertiesWithoutTimestamp = schema.getEventProperties()
@@ -91,11 +78,8 @@ public class DataLake implements EventSink<DataLakeParameters> {
// deep copy of event schema. Event property runtime name is changed to lower case for the schema registration
this.eventSchema = new EventSchema(schema);
-
-
- schema.getEventProperties().stream().forEach(eventProperty -> {
- eventProperty.setRuntimeName(prepareString(eventProperty.getRuntimeName()));
- });
+ schema.getEventProperties().forEach(eventProperty ->
+ eventProperty.setRuntimeName(DataLakeUtils.sanitizePropertyRuntimeName(eventProperty.getRuntimeName())));
registerAtDataLake(parameters.getMeasurementName(), schema, runtimeContext.getStreamPipesClient());
imageProperties = schema.getEventProperties().stream()
@@ -106,13 +90,24 @@ public class DataLake implements EventSink<DataLakeParameters> {
imageDirectory = configStore.getString(ConfigKeys.IMAGE_STORAGE_LOCATION) + parameters.getMeasurementName() + "/";
+ InfluxDbConnectionSettings settings = InfluxDbConnectionSettings.from(
+ influxHost, influxPort, databaseName, parameters.getMeasurementName(), user, password);
+
+ this.influxDbClient = new DataLakeInfluxDbClient(
+ settings,
+ parameters.getTimestampField(),
+ parameters.getBatchSize(),
+ parameters.getFlushDuration(),
+ this.eventSchema,
+ LOG
+ );
}
@Override
public void onEvent(Event event) {
try {
- this.imageProperties.stream().forEach(eventProperty -> {
+ this.imageProperties.forEach(eventProperty -> {
String eventTimestamp = Long.toString(event.getFieldBySelector(this.timestampField).getAsPrimitive().getAsLong());
String fileRoute = this.imageDirectory + eventProperty.getRuntimeName() + "/" + eventTimestamp + ".png";
String image = event.getFieldByRuntimeName(eventProperty.getRuntimeName()).getAsPrimitive().getAsString();
@@ -141,12 +136,9 @@ public class DataLake implements EventSink<DataLakeParameters> {
file.getParentFile().mkdirs();
OutputStream stream = new FileOutputStream(file, false);
stream.write(data);
- } catch (FileNotFoundException e) {
- e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
-
}
/**
@@ -163,7 +155,5 @@ public class DataLake implements EventSink<DataLakeParameters> {
.sendPost("api/v3/datalake/measure/" + measure, eventSchema);
}
- public static String prepareString(String s) {
- return s.toLowerCase().replaceAll(" ", "_");
- }
+
}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
index ffb2670..0ee7427 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
@@ -56,16 +56,10 @@ public class DataLakeController extends StandaloneEventSinkDeclarer<DataLakePara
public ConfiguredEventSink<DataLakeParameters> onInvocation(DataSinkInvocation graph,
DataSinkParameterExtractor extractor) {
-
String measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class);
- measureName = DataLake.prepareString(measureName);
+ measureName = DataLakeUtils.prepareString(measureName);
String timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
-// String hostname = SinksInternalJvmConfig.INSTANCE.getDataLakeProtocol() + "://" + SinksInternalJvmConfig.INSTANCE.getDataLakeHost();
-// Integer port = SinksInternalJvmConfig.INSTANCE.getDataLakePort();
-// String dbName = SinksInternalJvmConfig.INSTANCE.getDataLakeDatabaseName();
-// String user = SinksInternalJvmConfig.INSTANCE.getDataLakeUsername();
-// String password = SinksInternalJvmConfig.INSTANCE.getDataLakePassword();
Integer batch_size = 2000;
Integer flush_duration = 500;
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
index d31d7c5..4f98039 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
@@ -33,71 +33,51 @@ import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Code is the same as InfluxDB (org.apache.streampipes.sinks.databases.jvm.influxdb) sink. Changes applied here should also be applied in the InfluxDB sink
*/
public class DataLakeInfluxDbClient {
- private Integer influxDbPort;
- private String influxDbHost;
- private String databaseName;
- private String measureName;
- private String user;
- private String password;
- private String timestampField;
- private Integer batchSize;
- private Integer flushDuration;
-
- private Logger logger;
+
+ private final String measureName;
+ private final String timestampField;
+ private final Integer batchSize;
+ private final Integer flushDuration;
+
+ private final Logger logger;
private InfluxDB influxDb = null;
+ private final InfluxDbConnectionSettings settings;
+ private final EventSchema originalEventSchema;
+
+ Map<String, String> targetRuntimeNames = new HashMap<>();
- DataLakeInfluxDbClient(String influxDbHost,
- Integer influxDbPort,
- String databaseName,
- String measureName,
- String user,
- String password,
+ DataLakeInfluxDbClient(InfluxDbConnectionSettings settings,
String timestampField,
Integer batchSize,
Integer flushDuration,
+ EventSchema originalEventSchema,
Logger logger) throws SpRuntimeException {
- this.influxDbHost = influxDbHost;
- this.influxDbPort = influxDbPort;
- this.databaseName = databaseName;
- this.measureName = measureName;
- this.user = user;
- this.password = password;
+ this.settings = settings;
+ this.originalEventSchema = originalEventSchema;
this.timestampField = timestampField;
this.batchSize = batchSize;
this.flushDuration = flushDuration;
this.logger = logger;
+ this.measureName = settings.getMeasureName();
- validate();
+ prepareSchema();
connect();
}
- /**
- * Checks whether the {@link DataLakeInfluxDbClient#influxDbHost} is valid
- *
- * @throws SpRuntimeException If the hostname is not valid
- */
- private void validate() throws SpRuntimeException {
- //TODO: replace regex with validation method (import org.apache.commons.validator.routines.InetAddressValidator;)
- // Validates the database name and the attributes
- // See following link for regular expressions:
- // https://stackoverflow.com/questions/106179/regular-expression-to-match-dns-hostname-or-ip-address
- /*String ipRegex = "^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|"
- + "[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$";
- String hostnameRegex = "^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]*[a-zA-Z0-9])\\.)*"
- + "([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\\-]*[A-Za-z0-9])$";*/
- // https://stackoverflow.com/questions/3114595/java-regex-for-accepting-a-valid-hostname-ipv4-or-ipv6-address)
- //if (!influxDbHost.matches(ipRegex) && !influxDbHost.matches(hostnameRegex)) {
- // throw new SpRuntimeException("Error: Hostname '" + influxDbHost
- // + "' not allowed");
- //}
+ private void prepareSchema() {
+ originalEventSchema
+ .getEventProperties()
+ .forEach(ep -> targetRuntimeNames.put(ep.getRuntimeName(), DataLakeUtils.sanitizePropertyRuntimeName(ep.getRuntimeName())));
}
/**
@@ -109,8 +89,8 @@ public class DataLakeInfluxDbClient {
private void connect() throws SpRuntimeException {
// Connecting to the server
// "http://" must be in front
- String urlAndPort = influxDbHost + ":" + influxDbPort;
- influxDb = InfluxDBFactory.connect(urlAndPort, user, password);
+ String urlAndPort = settings.getInfluxDbHost() + ":" + settings.getInfluxDbPort();
+ influxDb = InfluxDBFactory.connect(urlAndPort, settings.getUser(), settings.getPassword());
// Checking, if server is available
Pong response = influxDb.ping();
@@ -118,6 +98,7 @@ public class DataLakeInfluxDbClient {
throw new SpRuntimeException("Could not connect to InfluxDb Server: " + urlAndPort);
}
+ String databaseName = settings.getDatabaseName();
// Checking whether the database exists
if(!databaseExists(databaseName)) {
logger.info("Database '" + databaseName + "' not found. Gets created ...");
@@ -177,10 +158,10 @@ public class DataLakeInfluxDbClient {
String runtimeName = ep.getRuntimeName();
if (!timestampField.endsWith(runtimeName)) {
- String preparedRuntimeName = DataLake.prepareString(runtimeName);
+ String preparedRuntimeName = targetRuntimeNames.get(runtimeName);
PrimitiveField eventPropertyPrimitiveField = event.getFieldByRuntimeName(runtimeName).getAsPrimitive();
- // store property as tag when he field is a dimension property
+ // store property as tag when the field is a dimension property
if ("DIMENSION_PROPERTY".equals(ep.getPropertyScope())) {
p.tag(preparedRuntimeName, eventPropertyPrimitiveField.getAsString());
} else {
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeUtils.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeUtils.java
new file mode 100644
index 0000000..8fa1736
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.internal.jvm.datalake;
+
+public class DataLakeUtils {
+
+ public static String prepareString(String s) {
+ return s.toLowerCase().replaceAll(" ", "_");
+ }
+
+ private static String renameReservedKeywords(String runtimeName) {
+ if (InfluxDbReservedKeywords.keywordList.stream().anyMatch(k -> k.equalsIgnoreCase(runtimeName))) {
+ return runtimeName + "_";
+ } else {
+ return runtimeName;
+ }
+ }
+
+ public static String sanitizePropertyRuntimeName(String runtimeName) {
+ String sanitizedRuntimeName = prepareString(runtimeName);
+ return renameReservedKeywords(sanitizedRuntimeName);
+ }
+}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbConnectionSettings.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbConnectionSettings.java
new file mode 100644
index 0000000..e31848b
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbConnectionSettings.java
@@ -0,0 +1,64 @@
+package org.apache.streampipes.sinks.internal.jvm.datalake;
+
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/**
 * Immutable holder for the parameters needed to connect to an InfluxDB server and to address
 * one measurement inside one database.
 *
 * <p>Instances are obtained via
 * {@link #from(String, Integer, String, String, String, String)}. Values are stored exactly as
 * given; no validation is performed, and {@code null} values are accepted.
 */
public final class InfluxDbConnectionSettings {

  private final Integer influxDbPort;
  private final String influxDbHost;
  private final String databaseName;
  private final String measureName;
  private final String user;
  private final String password;

  /**
   * Creates a new settings object.
   *
   * @param influxDbHost host of the InfluxDB server; presumably including the scheme
   *                     (e.g. {@code http://}), since the client joins it as {@code host:port}
   * @param influxDbPort port of the InfluxDB server
   * @param databaseName name of the database to write to
   * @param measureName  name of the measurement to write to
   * @param user         user name used to authenticate
   * @param password     password used to authenticate
   * @return a settings object holding the given values
   */
  public static InfluxDbConnectionSettings from(String influxDbHost,
                                                Integer influxDbPort,
                                                String databaseName,
                                                String measureName,
                                                String user,
                                                String password) {
    return new InfluxDbConnectionSettings(
        influxDbHost,
        influxDbPort,
        databaseName,
        measureName,
        user,
        password);
  }

  private InfluxDbConnectionSettings(String influxDbHost,
                                     Integer influxDbPort,
                                     String databaseName,
                                     String measureName,
                                     String user,
                                     String password) {
    this.influxDbHost = influxDbHost;
    this.influxDbPort = influxDbPort;
    this.databaseName = databaseName;
    this.measureName = measureName;
    this.user = user;
    this.password = password;
  }

  /** @return the InfluxDB server port */
  public Integer getInfluxDbPort() {
    return influxDbPort;
  }

  /** @return the InfluxDB server host */
  public String getInfluxDbHost() {
    return influxDbHost;
  }

  /** @return the target database name */
  public String getDatabaseName() {
    return databaseName;
  }

  /** @return the target measurement name */
  public String getMeasureName() {
    return measureName;
  }

  /** @return the user name used to authenticate */
  public String getUser() {
    return user;
  }

  /** @return the password used to authenticate */
  public String getPassword() {
    return password;
  }
}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbReservedKeywords.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbReservedKeywords.java
new file mode 100644
index 0000000..26e7a14
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbReservedKeywords.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.internal.jvm.datalake;
+
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+
/**
 * Reserved keywords of InfluxQL.
 *
 * <p>Identifiers equal to one of these words, ignoring case, must not be used unquoted in
 * InfluxQL queries. The data lake sink therefore renames field names that match them.
 */
public class InfluxDbReservedKeywords {

  /**
   * The reserved keywords, in upper case.
   *
   * <p>The list is unmodifiable. It is shared global state, and a caller mutating it (which a
   * bare {@code Arrays.asList} view allows via {@code set}) would silently change keyword
   * detection for every sink.
   */
  public static final List<String> keywordList = Collections.unmodifiableList(Arrays.asList(
      "ALL",
      "ALTER",
      "ANALYZE",
      "ANY",
      "AS",
      "ASC",
      "BEGIN",
      "BY",
      "CREATE",
      "CONTINUOUS",
      "DATABASE",
      "DATABASES",
      "DEFAULT",
      "DELETE",
      "DESC",
      "DESTINATIONS",
      "DIAGNOSTICS",
      "DISTINCT",
      "DROP",
      "DURATION",
      "END",
      "EVERY",
      "EXPLAIN",
      "FIELD",
      "FOR",
      "FROM",
      "GRANT",
      "GRANTS",
      "GROUP",
      "GROUPS",
      "IN",
      "INF",
      "INSERT",
      "INTO",
      "KEY",
      "KEYS",
      "KILL",
      "LIMIT",
      "SHOW",
      "MEASUREMENT",
      "MEASUREMENTS",
      "NAME",
      "OFFSET",
      "ON",
      "ORDER",
      "PASSWORD",
      "POLICY",
      "POLICIES",
      "PRIVILEGES",
      "QUERIES",
      "QUERY",
      "READ",
      "REPLICATION",
      "RESAMPLE",
      "RETENTION",
      "REVOKE",
      "SELECT",
      "SERIES",
      "SET",
      "SHARD",
      "SHARDS",
      "SLIMIT",
      "SOFFSET",
      "STATS",
      "SUBSCRIPTION",
      "SUBSCRIPTIONS",
      "TAG",
      "TO",
      "USER",
      "USERS",
      "VALUES",
      "WHERE",
      "WITH",
      "WRITE"
  ));
}