You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/06 16:59:25 UTC

[incubator-streampipes] branch dev updated (bbe3416 -> 44d8e15)

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a change to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git.


    from bbe3416  [hotfix] Improve icon handling of pipeline elements
     new 1083387  [hotfix] Fix progress bar size in pipeline delete dialog
     new a960103  [hotfix] Do not trigger field selection before component is rendered
     new 44d8e15  [STREAMPIPES-502] Avoid reserved keywords in data explorer field names

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../sinks/internal/jvm/datalake/DataLake.java      |  40 +++-----
 .../internal/jvm/datalake/DataLakeController.java  |   8 +-
 .../jvm/datalake/DataLakeInfluxDbClient.java       |  75 ++++++---------
 .../internal/jvm/datalake/DataLakeUtils.java}      |  24 +++--
 .../jvm/datalake/InfluxDbConnectionSettings.java   |  64 +++++++++++++
 .../jvm/datalake/InfluxDbReservedKeywords.java     | 102 +++++++++++++++++++++
 ...data-explorer-widget-data-settings.component.ts |  10 +-
 .../time-series-chart-widget-config.component.ts   |   1 -
 .../delete-pipeline-dialog.component.html          |   2 +-
 9 files changed, 231 insertions(+), 95 deletions(-)
 copy streampipes-extensions/{streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/count/TimeWindowConverter.java => streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeUtils.java} (57%)
 create mode 100644 streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbConnectionSettings.java
 create mode 100644 streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbReservedKeywords.java

[incubator-streampipes] 02/03: [hotfix] Do not trigger field selection before component is rendered

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit a9601039e9b5cf78ba8b5edffb65c35253edd70a
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 6 16:56:11 2022 +0100

    [hotfix] Do not trigger field selection before component is rendered
---
 .../data-explorer-widget-data-settings.component.ts            | 10 ++++++----
 .../config/time-series-chart-widget-config.component.ts        |  1 -
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/ui/src/app/data-explorer/components/designer-panel/data-settings/data-explorer-widget-data-settings.component.ts b/ui/src/app/data-explorer/components/designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
index dfae898..b959045 100644
--- a/ui/src/app/data-explorer/components/designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
+++ b/ui/src/app/data-explorer/components/designer-panel/data-settings/data-explorer-widget-data-settings.component.ts
@@ -16,7 +16,7 @@
  *
  */
 
-import {Component, ElementRef, EventEmitter, Input, OnInit, Output, ViewChild} from '@angular/core';
+import { Component, EventEmitter, Input, OnInit, Output, ViewChild } from '@angular/core';
 import { DataExplorerWidgetModel, DataLakeMeasure } from '../../../../core-model/gen/streampipes-model';
 import { DataViewDataExplorerService } from '../../../../platform-services/apis/data-view-data-explorer.service';
 import { MatSelectChange } from '@angular/material/select';
@@ -25,7 +25,7 @@ import { DatalakeRestService } from '../../../../platform-services/apis/datalake
 import { zip } from 'rxjs';
 import { DataExplorerDataConfig, SourceConfig } from '../../../models/dataview-dashboard.model';
 import { WidgetConfigurationService } from '../../../services/widget-configuration.service';
-import {FieldSelectionPanelComponent} from "./field-selection-panel/field-selection-panel.component";
+import { FieldSelectionPanelComponent } from './field-selection-panel/field-selection-panel.component';
 
 @Component({
   selector: 'sp-data-explorer-widget-data-settings',
@@ -44,7 +44,7 @@ export class DataExplorerWidgetDataSettingsComponent implements OnInit {
   @Output() dataLakeMeasureChange: EventEmitter<DataLakeMeasure> = new EventEmitter<DataLakeMeasure>();
   @Output() configureVisualizationEmitter: EventEmitter<void> = new EventEmitter<void>();
 
-  @ViewChild("fieldSelectionPanel")
+  @ViewChild('fieldSelectionPanel')
   fieldSelectionPanel: FieldSelectionPanelComponent;
 
   availablePipelines: DataLakeMeasure[];
@@ -84,7 +84,9 @@ export class DataExplorerWidgetDataSettingsComponent implements OnInit {
   updateMeasure(sourceConfig: SourceConfig, event: MatSelectChange) {
     sourceConfig.measure = this.findMeasure(event.value);
     sourceConfig.queryConfig.fields = [];
-    this.fieldSelectionPanel.applyDefaultFields();
+    if (this.fieldSelectionPanel) {
+      this.fieldSelectionPanel.applyDefaultFields();
+    }
 
   }
 
diff --git a/ui/src/app/data-explorer/components/widgets/time-series-chart/config/time-series-chart-widget-config.component.ts b/ui/src/app/data-explorer/components/widgets/time-series-chart/config/time-series-chart-widget-config.component.ts
index 9dfb812..fcb4834 100644
--- a/ui/src/app/data-explorer/components/widgets/time-series-chart/config/time-series-chart-widget-config.component.ts
+++ b/ui/src/app/data-explorer/components/widgets/time-series-chart/config/time-series-chart-widget-config.component.ts
@@ -46,7 +46,6 @@ export class TimeSeriesChartWidgetConfigComponent
 
   setSelectedProperties(selectedColumns: DataExplorerField[]) {
     this.currentlyConfiguredWidget.visualizationConfig.selectedTimeSeriesChartProperties = selectedColumns;
-    console.log(selectedColumns);
     // this.currentlyConfiguredWidget.dataConfig.yKeys = this.getRuntimeNames(selectedColumns);
     this.triggerDataRefresh();
   }

[incubator-streampipes] 01/03: [hotfix] Fix progress bar size in pipeline delete dialog

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 10833872d47dd105ac0ffac0cafbcf86c739652e
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 6 16:42:41 2022 +0100

    [hotfix] Fix progress bar size in pipeline delete dialog
---
 .../dialog/delete-pipeline/delete-pipeline-dialog.component.html        | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ui/src/app/pipelines/dialog/delete-pipeline/delete-pipeline-dialog.component.html b/ui/src/app/pipelines/dialog/delete-pipeline/delete-pipeline-dialog.component.html
index 85980e6..24f2721 100644
--- a/ui/src/app/pipelines/dialog/delete-pipeline/delete-pipeline-dialog.component.html
+++ b/ui/src/app/pipelines/dialog/delete-pipeline/delete-pipeline-dialog.component.html
@@ -40,7 +40,7 @@
         </div>
         <div fxFlex="100" fxLayoutAlign="center center" fxLayout="column" *ngIf="isInProgress">
             <div fxLayout="row" fxLayoutAlign="space-around">
-                <mat-spinner [mode]="'indeterminate'" color="accent"></mat-spinner>
+                <mat-spinner [diameter]="50" [mode]="'indeterminate'" color="accent"></mat-spinner>
             </div>
             <b><h4>{{currentStatus}}</h4></b>
         </div>

[incubator-streampipes] 03/03: [STREAMPIPES-502] Avoid reserved keywords in data explorer field names

Posted by ri...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit 44d8e15bef9c1a0583277406f677e639c6e12cd9
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Jan 6 17:59:10 2022 +0100

    [STREAMPIPES-502] Avoid reserved keywords in data explorer field names
---
 .../sinks/internal/jvm/datalake/DataLake.java      |  40 +++-----
 .../internal/jvm/datalake/DataLakeController.java  |   8 +-
 .../jvm/datalake/DataLakeInfluxDbClient.java       |  75 ++++++---------
 .../sinks/internal/jvm/datalake/DataLakeUtils.java |  39 ++++++++
 .../jvm/datalake/InfluxDbConnectionSettings.java   |  64 +++++++++++++
 .../jvm/datalake/InfluxDbReservedKeywords.java     | 102 +++++++++++++++++++++
 6 files changed, 249 insertions(+), 79 deletions(-)

diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
index 07c146f..aa2ae39 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
@@ -67,19 +67,6 @@ public class DataLake implements EventSink<DataLakeParameters> {
     String user = configStore.getString(ConfigKeys.DATA_LAKE_USERNAME);
     String password = configStore.getString(ConfigKeys.DATA_LAKE_PASSWORD);
 
-    this.influxDbClient = new DataLakeInfluxDbClient(
-            influxHost,
-            influxPort,
-            databaseName,
-            parameters.getMeasurementName(),
-            user,
-            password,
-            parameters.getTimestampField(),
-            parameters.getBatchSize(),
-            parameters.getFlushDuration(),
-            LOG
-    );
-
     EventSchema schema = runtimeContext.getInputSchemaInfo().get(0).getEventSchema();
     // Remove the timestamp field from the event schema
     List<EventProperty> eventPropertiesWithoutTimestamp = schema.getEventProperties()
@@ -91,11 +78,8 @@ public class DataLake implements EventSink<DataLakeParameters> {
     // deep copy of event schema. Event property runtime name is changed to lower case for the schema registration
     this.eventSchema = new EventSchema(schema);
 
-
-
-    schema.getEventProperties().stream().forEach(eventProperty -> {
-      eventProperty.setRuntimeName(prepareString(eventProperty.getRuntimeName()));
-    });
+    schema.getEventProperties().forEach(eventProperty ->
+            eventProperty.setRuntimeName(DataLakeUtils.sanitizePropertyRuntimeName(eventProperty.getRuntimeName())));
     registerAtDataLake(parameters.getMeasurementName(), schema, runtimeContext.getStreamPipesClient());
 
     imageProperties = schema.getEventProperties().stream()
@@ -106,13 +90,24 @@ public class DataLake implements EventSink<DataLakeParameters> {
 
     imageDirectory = configStore.getString(ConfigKeys.IMAGE_STORAGE_LOCATION) + parameters.getMeasurementName() + "/";
 
+    InfluxDbConnectionSettings settings = InfluxDbConnectionSettings.from(
+            influxHost, influxPort, databaseName, parameters.getMeasurementName(), user, password);
+
+    this.influxDbClient = new DataLakeInfluxDbClient(
+            settings,
+            parameters.getTimestampField(),
+            parameters.getBatchSize(),
+            parameters.getFlushDuration(),
+            this.eventSchema,
+            LOG
+    );
   }
 
   @Override
   public void onEvent(Event event) {
     try {
 
-      this.imageProperties.stream().forEach(eventProperty -> {
+      this.imageProperties.forEach(eventProperty -> {
         String eventTimestamp = Long.toString(event.getFieldBySelector(this.timestampField).getAsPrimitive().getAsLong());
         String fileRoute = this.imageDirectory + eventProperty.getRuntimeName() + "/" + eventTimestamp + ".png";
         String image = event.getFieldByRuntimeName(eventProperty.getRuntimeName()).getAsPrimitive().getAsString();
@@ -141,12 +136,9 @@ public class DataLake implements EventSink<DataLakeParameters> {
       file.getParentFile().mkdirs();
       OutputStream stream = new FileOutputStream(file, false);
       stream.write(data);
-    } catch (FileNotFoundException e) {
-      e.printStackTrace();
     } catch (IOException e) {
       e.printStackTrace();
     }
-
   }
 
   /**
@@ -163,7 +155,5 @@ public class DataLake implements EventSink<DataLakeParameters> {
         .sendPost("api/v3/datalake/measure/" + measure, eventSchema);
   }
 
-  public static String prepareString(String s) {
-    return s.toLowerCase().replaceAll(" ", "_");
-  }
+
 }
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
index ffb2670..0ee7427 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeController.java
@@ -56,16 +56,10 @@ public class DataLakeController extends StandaloneEventSinkDeclarer<DataLakePara
   public ConfiguredEventSink<DataLakeParameters> onInvocation(DataSinkInvocation graph,
                                                               DataSinkParameterExtractor extractor) {
 
-
     String measureName = extractor.singleValueParameter(DATABASE_MEASUREMENT_KEY, String.class);
-    measureName = DataLake.prepareString(measureName);
+    measureName = DataLakeUtils.prepareString(measureName);
     String timestampField = extractor.mappingPropertyValue(TIMESTAMP_MAPPING_KEY);
 
-//    String hostname = SinksInternalJvmConfig.INSTANCE.getDataLakeProtocol() + "://" + SinksInternalJvmConfig.INSTANCE.getDataLakeHost();
-//    Integer port = SinksInternalJvmConfig.INSTANCE.getDataLakePort();
-//    String dbName = SinksInternalJvmConfig.INSTANCE.getDataLakeDatabaseName();
-//    String user = SinksInternalJvmConfig.INSTANCE.getDataLakeUsername();
-//    String password = SinksInternalJvmConfig.INSTANCE.getDataLakePassword();
     Integer batch_size = 2000;
     Integer flush_duration = 500;
 
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
index d31d7c5..4f98039 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
@@ -33,71 +33,51 @@ import org.influxdb.dto.Pong;
 import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 /**
  * Code is the same as InfluxDB (org.apache.streampipes.sinks.databases.jvm.influxdb) sink. Changes applied here should also be applied in the InfluxDB sink
  */
 public class DataLakeInfluxDbClient {
-    private Integer influxDbPort;
-    private String influxDbHost;
-    private String databaseName;
-    private String measureName;
-    private String user;
-    private String password;
-    private String timestampField;
-    private Integer batchSize;
-    private Integer flushDuration;
-
-    private Logger logger;
+
+    private final String measureName;
+    private final String timestampField;
+    private final Integer batchSize;
+    private final Integer flushDuration;
+
+    private final Logger logger;
 
     private InfluxDB influxDb = null;
+    private final InfluxDbConnectionSettings settings;
+    private final EventSchema originalEventSchema;
+
+    Map<String, String> targetRuntimeNames = new HashMap<>();
 
-    DataLakeInfluxDbClient(String influxDbHost,
-                           Integer influxDbPort,
-                           String databaseName,
-                           String measureName,
-                           String user,
-                           String password,
+    DataLakeInfluxDbClient(InfluxDbConnectionSettings settings,
                            String timestampField,
                            Integer batchSize,
                            Integer flushDuration,
+                           EventSchema originalEventSchema,
                            Logger logger) throws SpRuntimeException {
-        this.influxDbHost = influxDbHost;
-        this.influxDbPort = influxDbPort;
-        this.databaseName = databaseName;
-        this.measureName = measureName;
-        this.user = user;
-        this.password = password;
+        this.settings = settings;
+        this.originalEventSchema = originalEventSchema;
         this.timestampField = timestampField;
         this.batchSize = batchSize;
         this.flushDuration = flushDuration;
         this.logger = logger;
+        this.measureName = settings.getMeasureName();
 
-        validate();
+        prepareSchema();
         connect();
     }
 
-    /**
-     * Checks whether the {@link DataLakeInfluxDbClient#influxDbHost} is valid
-     *
-     * @throws SpRuntimeException If the hostname is not valid
-     */
-    private void validate() throws SpRuntimeException {
-        //TODO: replace regex with validation method (import org.apache.commons.validator.routines.InetAddressValidator;)
-        // Validates the database name and the attributes
-        // See following link for regular expressions:
-        // https://stackoverflow.com/questions/106179/regular-expression-to-match-dns-hostname-or-ip-address
-    /*String ipRegex = "^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|"
-        + "[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$";
-    String hostnameRegex = "^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\\-]*[a-zA-Z0-9])\\.)*"
-        + "([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\\-]*[A-Za-z0-9])$";*/
-        // https://stackoverflow.com/questions/3114595/java-regex-for-accepting-a-valid-hostname-ipv4-or-ipv6-address)
-        //if (!influxDbHost.matches(ipRegex) && !influxDbHost.matches(hostnameRegex)) {
-        //  throw new SpRuntimeException("Error: Hostname '" + influxDbHost
-        //      + "' not allowed");
-        //}
+    private void prepareSchema() {
+        originalEventSchema
+                .getEventProperties()
+                .forEach(ep -> targetRuntimeNames.put(ep.getRuntimeName(), DataLakeUtils.sanitizePropertyRuntimeName(ep.getRuntimeName())));
     }
 
     /**
@@ -109,8 +89,8 @@ public class DataLakeInfluxDbClient {
     private void connect() throws SpRuntimeException {
         // Connecting to the server
         // "http://" must be in front
-        String urlAndPort = influxDbHost + ":" + influxDbPort;
-        influxDb = InfluxDBFactory.connect(urlAndPort, user, password);
+        String urlAndPort = settings.getInfluxDbHost() + ":" + settings.getInfluxDbPort();
+        influxDb = InfluxDBFactory.connect(urlAndPort, settings.getUser(), settings.getPassword());
 
         // Checking, if server is available
         Pong response = influxDb.ping();
@@ -118,6 +98,7 @@ public class DataLakeInfluxDbClient {
             throw new SpRuntimeException("Could not connect to InfluxDb Server: " + urlAndPort);
         }
 
+        String databaseName = settings.getDatabaseName();
         // Checking whether the database exists
         if(!databaseExists(databaseName)) {
             logger.info("Database '" + databaseName + "' not found. Gets created ...");
@@ -177,10 +158,10 @@ public class DataLakeInfluxDbClient {
                 String runtimeName = ep.getRuntimeName();
 
                 if (!timestampField.endsWith(runtimeName)) {
-                    String preparedRuntimeName = DataLake.prepareString(runtimeName);
+                    String preparedRuntimeName = targetRuntimeNames.get(runtimeName);
                     PrimitiveField eventPropertyPrimitiveField = event.getFieldByRuntimeName(runtimeName).getAsPrimitive();
 
-                    // store property as tag when he field is a dimension property
+                    // store property as tag when the field is a dimension property
                     if ("DIMENSION_PROPERTY".equals(ep.getPropertyScope())) {
                         p.tag(preparedRuntimeName, eventPropertyPrimitiveField.getAsString());
                     } else {
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeUtils.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeUtils.java
new file mode 100644
index 0000000..8fa1736
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.internal.jvm.datalake;
+
+public class DataLakeUtils {
+
+  public static String prepareString(String s) {
+    return s.toLowerCase().replaceAll(" ", "_");
+  }
+
+  private static String renameReservedKeywords(String runtimeName) {
+    if (InfluxDbReservedKeywords.keywordList.stream().anyMatch(k -> k.equalsIgnoreCase(runtimeName))) {
+      return runtimeName + "_";
+    } else {
+      return runtimeName;
+    }
+  }
+
+  public static String sanitizePropertyRuntimeName(String runtimeName) {
+    String sanitizedRuntimeName = prepareString(runtimeName);
+    return renameReservedKeywords(sanitizedRuntimeName);
+  }
+}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbConnectionSettings.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbConnectionSettings.java
new file mode 100644
index 0000000..e31848b
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbConnectionSettings.java
@@ -0,0 +1,64 @@
+package org.apache.streampipes.sinks.internal.jvm.datalake;
+
+public class InfluxDbConnectionSettings {
+
+  private final Integer influxDbPort;
+  private final String influxDbHost;
+  private final String databaseName;
+  private final String measureName;
+  private final String user;
+  private final String password;
+
+  public static InfluxDbConnectionSettings from(String influxDbHost,
+                                                Integer influxDbPort,
+                                                String databaseName,
+                                                String measureName,
+                                                String user,
+                                                String password) {
+    return new InfluxDbConnectionSettings(
+            influxDbHost,
+            influxDbPort,
+            databaseName,
+            measureName,
+            user,
+            password);
+  }
+
+  private InfluxDbConnectionSettings(String influxDbHost,
+                                     Integer influxDbPort,
+                                     String databaseName,
+                                     String measureName,
+                                     String user,
+                                     String password) {
+    this.influxDbHost = influxDbHost;
+    this.influxDbPort = influxDbPort;
+    this.databaseName = databaseName;
+    this.measureName = measureName;
+    this.user = user;
+    this.password = password;
+  }
+
+  public Integer getInfluxDbPort() {
+    return influxDbPort;
+  }
+
+  public String getInfluxDbHost() {
+    return influxDbHost;
+  }
+
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public String getMeasureName() {
+    return measureName;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbReservedKeywords.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbReservedKeywords.java
new file mode 100644
index 0000000..26e7a14
--- /dev/null
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/InfluxDbReservedKeywords.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.sinks.internal.jvm.datalake;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class InfluxDbReservedKeywords {
+
+  public static final List<String> keywordList = Arrays.asList(
+          "ALL",
+          "ALTER",
+          "ANALYZE",
+          "ANY",
+          "AS",
+          "ASC",
+          "BEGIN",
+          "BY",
+          "CREATE",
+          "CONTINUOUS",
+          "DATABASE",
+          "DATABASES",
+          "DEFAULT",
+          "DELETE",
+          "DESC",
+          "DESTINATIONS",
+          "DIAGNOSTICS",
+          "DISTINCT",
+          "DROP",
+          "DURATION",
+          "END",
+          "EVERY",
+          "EXPLAIN",
+          "FIELD",
+          "FOR",
+          "FROM",
+          "GRANT",
+          "GRANTS",
+          "GROUP",
+          "GROUPS",
+          "IN",
+          "INF",
+          "INSERT",
+          "INTO",
+          "KEY",
+          "KEYS",
+          "KILL",
+          "LIMIT",
+          "SHOW",
+          "MEASUREMENT",
+          "MEASUREMENTS",
+          "NAME",
+          "OFFSET",
+          "ON",
+          "ORDER",
+          "PASSWORD",
+          "POLICY",
+          "POLICIES",
+          "PRIVILEGES",
+          "QUERIES",
+          "QUERY",
+          "READ",
+          "REPLICATION",
+          "RESAMPLE",
+          "RETENTION",
+          "REVOKE",
+          "SELECT",
+          "SERIES",
+          "SET",
+          "SHARD",
+          "SHARDS",
+          "SLIMIT",
+          "SOFFSET",
+          "STATS",
+          "SUBSCRIPTION",
+          "SUBSCRIPTIONS",
+          "TAG",
+          "TO",
+          "USER",
+          "USERS",
+          "VALUES",
+          "WHERE",
+          "WITH",
+          "WRITE"
+  );
+}