You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/06 16:59:28 UTC

[incubator-streampipes] 03/03: [STREAMPIPES-502] Avoid reserved keywords in data explorer field names

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 44d8e15bef9c1a0583277406f677e639c6e12cd9
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 6 17:59:10 2022 +0100

    [STREAMPIPES-502] Avoid reserved keywords in data explorer field names
---
 .../sinks/internal/jvm/datalake/DataLake.java      |  40 +++-----
 .../internal/jvm/datalake/DataLakeController.java  |   8 +-
 .../jvm/datalake/DataLakeInfluxDbClient.java       |  75 ++++++---------
 .../sinks/internal/jvm/datalake/DataLakeUtils.java |  39 ++++++++
 .../jvm/datalake/InfluxDbConnectionSettings.java   |  64 +++++++++++++
 .../jvm/datalake/InfluxDbReservedKeywords.java     | 102 +++++++++++++++++++++
 6 files changed, 249 insertions(+), 79 deletions(-)

diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
index 07c146f..aa2ae39 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
@@ -67,19 +67,6 @@ public class DataLake implements EventSink<DataLakeParameters> {
     String user = configStore.getString(ConfigKeys.DATA_LAKE_USERNAME);
     String password = configStore.getString(ConfigKeys.DATA_LAKE_PASSWORD);
 
-    this.influxDbClient = new DataLakeInfluxDbClient(
-            influxHost,
-            influxPort,
-            databaseName,
-            parameters.getMeasurementName(),
-            user,
-            password,
-            parameters.getTimestampField(),
-            parameters.getBatchSize(),
-            parameters.getFlushDuration(),
-            LOG
-    );
-
     EventSchema schema = runtimeContext.getInputSchemaInfo().get(0).getEventSchema();
     // Remove the timestamp field from the event schema
     List<EventProperty> eventPropertiesWithoutTimestamp = schema.getEventProperties()
@@ -91,11 +78,8 @@ public class DataLake implements EventSink<DataLakeParameters> {
     // deep copy of event schema. Event property runtime name is changed to lower case for the schema registration
     this.eventSchema = new EventSchema(schema);
 
-
-
-    schema.getEventProperties().stream().forEach(eventProperty -> {
-      eventProperty.setRuntimeName(prepareString(eventProperty.getRuntimeName()));
-    });
+    schema.getEventProperties().forEach(eventProperty ->
+            eventProperty.setRuntimeName(DataLakeUtils.sanitizePropertyRuntimeName(eventProperty.getRuntimeName())));
     registerAtDataLake(parameters.getMeasurementName(), schema, runtimeContext.getStreamPipesClient());
 
     imageProperties = schema.getEventProperties().stream()
@@ -106,13 +90,24 @@ public class DataLake implements EventSink<DataLakeParameters> {
 
     imageDirectory = configStore.getString(ConfigKeys.IMAGE_STORAGE_LOCATION) + parameters.getMeasurementName() + "/";
 
+    InfluxDbConnectionSettings settings = InfluxDbConnectionSettings.from(
+            influxHost, influxPort, databaseName, parameters.getMeasurementName(), user, password);
+
+    this.influxDbClient = new DataLakeInfluxDbClient(
+            settings,
+            parameters.getTimestampField(),
+            parameters.getBatchSize(),
+            parameters.getFlushDuration(),
+            this.eventSchema,
+            LOG
+    );
   }
 
   @Override
   public void onEvent(Event event) {
     try {
 
-      this.imageProperties.stream().forEach(eventProperty -> {
+      this.imageProperties.forEach(eventProperty -> {
         String eventTimestamp = Long.toString(event.getFieldBySelector(this.timestampField).getAsPrimitive().getAsLong());
         String fileRoute = this.imageDirectory + eventProperty.getRuntimeName() + "/" + eventTimestamp + ".png";
         String image = event.getFieldByRuntimeName(eventProperty.getRuntimeName()).getAsPrimitive().getAsString();
@@ -141,12 +136,9 @@ public class DataLake implements EventSink<DataLakeParameters> {
       file.getParentFile().mkdirs();
       OutputStream stream = new FileOutputStream(file, false);
       stream.write(data);
-    } catch (FileNotFoundException e) {
-      e.printStackTrace();
     } catch (IOException e) {
       e.printStackTrace();
     }
-
   }
 
   /**
@@ -163,7 +155,5 @@ public class DataLake implements EventSink<DataLakeParameters> {
         .sendPost("api/v3/datalake/measure/" + measure, eventSchema);
   }
 
-  public static String prepareString(String s) {
-    return s.toLowerCase().replaceAll(" ", "_");
-  }
+
 }
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
index ffb2670..0ee7427 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
@@ -56,16 +56,10 @@ public class DataLakeController extends StandaloneEventSinkDeclarer<DataLakePara
   public ConfiguredEventSink<DataLakeParameters> onInvocation(DataSinkInvocation graph,
                                                               DataSinkParameterExtractor extractor) {
 
-
     String measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class);
-    measureName = DataLake.prepareString(measureName);
+    measureName = DataLakeUtils.prepareString(measureName);
     String timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
 
-//    String hostname = SinksInternalJvmConfig.INSTANCE.getDataLakeProtocol() + "://" + SinksInternalJvmConfig.INSTANCE.getDataLakeHost();
-//    Integer port = SinksInternalJvmConfig.INSTANCE.getDataLakePort();
-//    String dbName = SinksInternalJvmConfig.INSTANCE.getDataLakeDatabaseName();
-//    String user = SinksInternalJvmConfig.INSTANCE.getDataLakeUsername();
-//    String password = SinksInternalJvmConfig.INSTANCE.getDataLakePassword();
     Integer batch_size = 2000;
     Integer flush_duration = 500;
 
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
index d31d7c5..4f98039 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
@@ -33,71 +33,51 @@ import org.influxdb.dto.Pong;
 import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Code is the same as InfluxDB (org.apache.streampipes.sinks.databases.jvm.influxdb) sink. Changes applied here should also be applied in the InfluxDB sink
  */
 public class DataLakeInfluxDbClient {
-    private Integer influxDbPort;
-    private String influxDbHost;
-    private String databaseName;
-    private String measureName;
-    private String user;
-    private String password;
-    private String timestampField;
-    private Integer batchSize;
-    private Integer flushDuration;
-
-    private Logger logger;
+
+    private final String measureName;
+    private final String timestampField;
+    private final Integer batchSize;
+    private final Integer flushDuration;
+
+    private final Logger logger;
 
     private InfluxDB influxDb = null;
+    private final InfluxDbConnectionSettings settings;
+    private final EventSchema originalEventSchema;
+
+    Map<String, String> targetRuntimeNames = new HashMap<>();
 
-    DataLakeInfluxDbClient(String influxDbHost,
-                           Integer influxDbPort,
-                           String databaseName,
-                           String measureName,
-                           String user,
-                           String password,
+    DataLakeInfluxDbClient(InfluxDbConnectionSettings settings,
                            String timestampField,
                            Integer batchSize,
                            Integer flushDuration,
+                           EventSchema originalEventSchema,
                            Logger logger) throws SpRuntimeException {
-        this.influxDbHost = influxDbHost;
-        this.influxDbPort = influxDbPort;
-        this.databaseName = databaseName;
-        this.measureName = measureName;
-        this.user = user;
-        this.password = password;
+        this.settings = settings;
+        this.originalEventSchema = originalEventSchema;
         this.timestampField = timestampField;
         this.batchSize = batchSize;
         this.flushDuration = flushDuration;
         this.logger = logger;
+        this.measureName = settings.getMeasureName();
 
-        validate();
+        prepareSchema();
         connect();
     }
 
-    /**
-     * Checks whether the {@link DataLakeInfluxDbClient#influxDbHost} is valid
-     *
-     * @throws SpRuntimeException If the hostname is not valid
-     */
-    private void validate() throws SpRuntimeException {
-        //TODO: replace regex with validation method (import org.apache.commons.validator.routines.InetAddressValidator;)
-        // Validates the database name and the attributes
-        // See following link for regular expressions:
-        // https://stackoverflow.com/questions/106179/regular-expression-to-match-dns-hostname-or-ip-address
-    /*String ipRegex = "^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|"
-        + "[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$";
-    String hostnameRegex = "^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]*[a-zA-Z0-9])\\.)*"
-        + "([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\\-]*[A-Za-z0-9])$";*/
-        // https://stackoverflow.com/questions/3114595/java-regex-for-accepting-a-valid-hostname-ipv4-or-ipv6-address)
-        //if (!influxDbHost.matches(ipRegex) && !influxDbHost.matches(hostnameRegex)) {
-        //  throw new SpRuntimeException("Error: Hostname '" + influxDbHost
-        //      + "' not allowed");
-        //}
+    private void prepareSchema() {
+        originalEventSchema
+                .getEventProperties()
+                .forEach(ep -> targetRuntimeNames.put(ep.getRuntimeName(), DataLakeUtils.sanitizePropertyRuntimeName(ep.getRuntimeName())));
     }
 
     /**
@@ -109,8 +89,8 @@ public class DataLakeInfluxDbClient {
     private void connect() throws SpRuntimeException {
         // Connecting to the server
         // "http://" must be in front
-        String urlAndPort = influxDbHost + ":" + influxDbPort;
-        influxDb = InfluxDBFactory.connect(urlAndPort, user, password);
+        String urlAndPort = settings.getInfluxDbHost() + ":" + settings.getInfluxDbPort();
+        influxDb = InfluxDBFactory.connect(urlAndPort, settings.getUser(), settings.getPassword());
 
         // Checking, if server is available
         Pong response = influxDb.ping();
@@ -118,6 +98,7 @@ public class DataLakeInfluxDbClient {
             throw new SpRuntimeException("Could not connect to InfluxDb Server: " + urlAndPort);
         }
 
+        String databaseName = settings.getDatabaseName();
         // Checking whether the database exists
         if(!databaseExists(databaseName)) {
             logger.info("Database '" + databaseName + "' not found. Gets created ...");
@@ -177,10 +158,10 @@ public class DataLakeInfluxDbClient {
                 String runtimeName = ep.getRuntimeName();
 
                 if (!timestampField.endsWith(runtimeName)) {
-                    String preparedRuntimeName = DataLake.prepareString(runtimeName);
+                    String preparedRuntimeName = targetRuntimeNames.get(runtimeName);
                     PrimitiveField eventPropertyPrimitiveField = event.getFieldByRuntimeName(runtimeName).getAsPrimitive();
 
-                    // store property as tag when he field is a dimension property
+                    // store property as tag when the field is a dimension property
                     if ("DIMENSION_PROPERTY".equals(ep.getPropertyScope())) {
                         p.tag(preparedRuntimeName, eventPropertyPrimitiveField.getAsString());
                     } else {
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeUtils.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeUtils.java
new file mode 100644
index 0000000..8fa1736
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeUtils.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.internal.jvm.datalake;
+
+import java.util.Locale;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Helpers for turning user-provided names into identifiers that can be stored in the data lake.
+ */
+public final class DataLakeUtils {
+
+  /**
+   * Reserved InfluxQL keywords, looked up case-insensitively (same semantics as
+   * {@link String#equalsIgnoreCase(String)}).
+   */
+  private static final Set<String> RESERVED_KEYWORDS = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
+
+  static {
+    RESERVED_KEYWORDS.addAll(InfluxDbReservedKeywords.keywordList);
+  }
+
+  private DataLakeUtils() {
+  }
+
+  /**
+   * Normalizes a name by lower-casing it and replacing every space with an underscore.
+   *
+   * <p>Lower-casing uses {@link Locale#ROOT} so the result does not depend on the JVM's default
+   * locale (with a Turkish default locale, for example, {@code "ID".toLowerCase()} returns a
+   * string starting with a dotless i, U+0131, instead of {@code "id"}).
+   *
+   * @param s the name to normalize; must not be {@code null}
+   * @return the lower-cased name with spaces replaced by underscores
+   */
+  public static String prepareString(String s) {
+    Objects.requireNonNull(s, "s");
+    return s.toLowerCase(Locale.ROOT).replace(' ', '_');
+  }
+
+  /**
+   * Appends an underscore to names that match a reserved InfluxQL keyword (ignoring case);
+   * all other names are returned unchanged.
+   */
+  private static String renameReservedKeywords(String runtimeName) {
+    return RESERVED_KEYWORDS.contains(runtimeName) ? runtimeName + "_" : runtimeName;
+  }
+
+  /**
+   * Converts an event property runtime name into the name stored in the data lake: the name is
+   * normalized with {@link #prepareString(String)} and suffixed with {@code _} if it is a
+   * reserved InfluxQL keyword.
+   *
+   * @param runtimeName the original runtime name; must not be {@code null}
+   * @return the sanitized runtime name
+   */
+  public static String sanitizePropertyRuntimeName(String runtimeName) {
+    return renameReservedKeywords(prepareString(runtimeName));
+  }
+}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbConnectionSettings.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbConnectionSettings.java
new file mode 100644
index 0000000..e31848b
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbConnectionSettings.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.internal.jvm.datalake;
+
+/**
+ * Immutable connection parameters for the InfluxDB instance backing the data lake, together with
+ * the name of the target measurement.
+ */
+public final class InfluxDbConnectionSettings {
+
+  private final Integer influxDbPort;
+  private final String influxDbHost;
+  private final String databaseName;
+  private final String measureName;
+  private final String user;
+  private final String password;
+
+  /**
+   * Creates a new settings instance. The values are stored as given; no validation is performed.
+   *
+   * @param influxDbHost host of the InfluxDB server, including the scheme (e.g. {@code http://})
+   * @param influxDbPort port of the InfluxDB server
+   * @param databaseName name of the InfluxDB database
+   * @param measureName  name of the target measurement
+   * @param user         user name passed to the InfluxDB connection
+   * @param password     password passed to the InfluxDB connection
+   * @return the new settings
+   */
+  public static InfluxDbConnectionSettings from(String influxDbHost,
+                                                Integer influxDbPort,
+                                                String databaseName,
+                                                String measureName,
+                                                String user,
+                                                String password) {
+    return new InfluxDbConnectionSettings(
+            influxDbHost,
+            influxDbPort,
+            databaseName,
+            measureName,
+            user,
+            password);
+  }
+
+  private InfluxDbConnectionSettings(String influxDbHost,
+                                     Integer influxDbPort,
+                                     String databaseName,
+                                     String measureName,
+                                     String user,
+                                     String password) {
+    this.influxDbHost = influxDbHost;
+    this.influxDbPort = influxDbPort;
+    this.databaseName = databaseName;
+    this.measureName = measureName;
+    this.user = user;
+    this.password = password;
+  }
+
+  public Integer getInfluxDbPort() {
+    return influxDbPort;
+  }
+
+  public String getInfluxDbHost() {
+    return influxDbHost;
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public String getMeasureName() {
+    return measureName;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbReservedKeywords.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbReservedKeywords.java
new file mode 100644
index 0000000..26e7a14
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbReservedKeywords.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.internal.jvm.datalake;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Reserved InfluxQL keywords. Names matching one of these (ignoring case) are suffixed with
+ * {@code _} by {@link DataLakeUtils} before they are used as data lake field names.
+ */
+public final class InfluxDbReservedKeywords {
+
+  /**
+   * The reserved keywords in upper case. The list is unmodifiable; attempts to change it throw
+   * {@link UnsupportedOperationException}.
+   */
+  public static final List<String> keywordList = Collections.unmodifiableList(Arrays.asList(
+          "ALL",
+          "ALTER",
+          "ANALYZE",
+          "ANY",
+          "AS",
+          "ASC",
+          "BEGIN",
+          "BY",
+          "CREATE",
+          "CONTINUOUS",
+          "DATABASE",
+          "DATABASES",
+          "DEFAULT",
+          "DELETE",
+          "DESC",
+          "DESTINATIONS",
+          "DIAGNOSTICS",
+          "DISTINCT",
+          "DROP",
+          "DURATION",
+          "END",
+          "EVERY",
+          "EXPLAIN",
+          "FIELD",
+          "FOR",
+          "FROM",
+          "GRANT",
+          "GRANTS",
+          "GROUP",
+          "GROUPS",
+          "IN",
+          "INF",
+          "INSERT",
+          "INTO",
+          "KEY",
+          "KEYS",
+          "KILL",
+          "LIMIT",
+          "SHOW",
+          "MEASUREMENT",
+          "MEASUREMENTS",
+          "NAME",
+          "OFFSET",
+          "ON",
+          "ORDER",
+          "PASSWORD",
+          "POLICY",
+          "POLICIES",
+          "PRIVILEGES",
+          "QUERIES",
+          "QUERY",
+          "READ",
+          "REPLICATION",
+          "RESAMPLE",
+          "RETENTION",
+          "REVOKE",
+          "SELECT",
+          "SERIES",
+          "SET",
+          "SHARD",
+          "SHARDS",
+          "SLIMIT",
+          "SOFFSET",
+          "STATS",
+          "SUBSCRIPTION",
+          "SUBSCRIPTIONS",
+          "TAG",
+          "TO",
+          "USER",
+          "USERS",
+          "VALUES",
+          "WHERE",
+          "WITH",
+          "WRITE"
+  ));
+
+  private InfluxDbReservedKeywords() {
+  }
+}