You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/02/22 21:12:04 UTC

[incubator-streampipes] branch dev updated: [hotfix] Log number format exceptions in data lake sink

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new f38819c  [hotfix] Log number format exceptions in data lake sink
     new 8663e0d  Merge branch 'dev' of github.com:apache/incubator-streampipes into dev
f38819c is described below

commit f38819cccbca5e4fa51649ca2a10da0a399cde9b
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Feb 22 22:11:38 2022 +0100

    [hotfix] Log number format exceptions in data lake sink
---
 .../sinks/internal/jvm/datalake/DataLake.java      |  3 +-
 .../jvm/datalake/DataLakeInfluxDbClient.java       | 45 ++++++++++++----------
 2 files changed, 25 insertions(+), 23 deletions(-)

diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
index 52d4c4b..83cbb3b 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
@@ -100,8 +100,7 @@ public class DataLake implements EventSink<DataLakeParameters> {
             parameters.getTimestampField(),
             parameters.getBatchSize(),
             parameters.getFlushDuration(),
-            this.eventSchema,
-            LOG
+            this.eventSchema
     );
   }
 
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
index 4f98039..80c7214 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
@@ -19,7 +19,6 @@
 package org.apache.streampipes.sinks.internal.jvm.datalake;
 
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.logging.api.Logger;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.model.runtime.field.PrimitiveField;
 import org.apache.streampipes.model.schema.EventProperty;
@@ -32,6 +31,8 @@ import org.influxdb.dto.Point;
 import org.influxdb.dto.Pong;
 import org.influxdb.dto.Query;
 import org.influxdb.dto.QueryResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.List;
@@ -43,13 +44,13 @@ import java.util.concurrent.TimeUnit;
  */
 public class DataLakeInfluxDbClient {
 
+    private static final Logger LOG = LoggerFactory.getLogger(DataLakeInfluxDbClient.class);
+
     private final String measureName;
     private final String timestampField;
     private final Integer batchSize;
     private final Integer flushDuration;
 
-    private final Logger logger;
-
     private InfluxDB influxDb = null;
     private final InfluxDbConnectionSettings settings;
     private final EventSchema originalEventSchema;
@@ -60,14 +61,12 @@ public class DataLakeInfluxDbClient {
                            String timestampField,
                            Integer batchSize,
                            Integer flushDuration,
-                           EventSchema originalEventSchema,
-                           Logger logger) throws SpRuntimeException {
+                           EventSchema originalEventSchema) throws SpRuntimeException {
         this.settings = settings;
         this.originalEventSchema = originalEventSchema;
         this.timestampField = timestampField;
         this.batchSize = batchSize;
         this.flushDuration = flushDuration;
-        this.logger = logger;
         this.measureName = settings.getMeasureName();
 
         prepareSchema();
@@ -101,7 +100,7 @@ public class DataLakeInfluxDbClient {
         String databaseName = settings.getDatabaseName();
         // Checking whether the database exists
         if(!databaseExists(databaseName)) {
-            logger.info("Database '" + databaseName + "' not found. Gets created ...");
+            LOG.info("Database '" + databaseName + "' not found. Gets created ...");
             createDatabase(databaseName);
         }
 
@@ -165,20 +164,24 @@ public class DataLakeInfluxDbClient {
                     if ("DIMENSION_PROPERTY".equals(ep.getPropertyScope())) {
                         p.tag(preparedRuntimeName, eventPropertyPrimitiveField.getAsString());
                     } else {
-                        // Store property according to property type
-                        String runtimeType = ((EventPropertyPrimitive) ep).getRuntimeType();
-                        if (XSD._integer.toString().equals(runtimeType)) {
-                            p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsInt());
-                        } else if (XSD._float.toString().equals(runtimeType)) {
-                            p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsFloat());
-                        } else if (XSD._double.toString().equals(runtimeType)) {
-                            p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsDouble());
-                        } else if (XSD._boolean.toString().equals(runtimeType)) {
-                            p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsBoolean());
-                        } else if (XSD._long.toString().equals(runtimeType)) {
-                            p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsLong());
-                        } else {
-                            p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsString());
+                        try {
+                            // Store property according to property type
+                            String runtimeType = ((EventPropertyPrimitive) ep).getRuntimeType();
+                            if (XSD._integer.toString().equals(runtimeType)) {
+                                p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsInt());
+                            } else if (XSD._float.toString().equals(runtimeType)) {
+                                p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsFloat());
+                            } else if (XSD._double.toString().equals(runtimeType)) {
+                                p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsDouble());
+                            } else if (XSD._boolean.toString().equals(runtimeType)) {
+                                p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsBoolean());
+                            } else if (XSD._long.toString().equals(runtimeType)) {
+                                p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsLong());
+                            } else {
+                                p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsString());
+                            }
+                        } catch (NumberFormatException e) {
+                            LOG.warn("Wrong number format for field {}, ignoring.", preparedRuntimeName);
                         }
                     }
                 }