You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/02/22 21:12:04 UTC
[incubator-streampipes] branch dev updated: [hotfix] Log number format exceptions in data lake sink
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new f38819c [hotfix] Log number format exceptions in data lake sink
new 8663e0d Merge branch 'dev' of github.com:apache/incubator-streampipes into dev
f38819c is described below
commit f38819cccbca5e4fa51649ca2a10da0a399cde9b
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Feb 22 22:11:38 2022 +0100
[hotfix] Log number format exceptions in data lake sink
---
.../sinks/internal/jvm/datalake/DataLake.java | 3 +-
.../jvm/datalake/DataLakeInfluxDbClient.java | 45 ++++++++++++----------
2 files changed, 25 insertions(+), 23 deletions(-)
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
index 52d4c4b..83cbb3b 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLake.java
@@ -100,8 +100,7 @@ public class DataLake implements EventSink<DataLakeParameters> {
parameters.getTimestampField(),
parameters.getBatchSize(),
parameters.getFlushDuration(),
- this.eventSchema,
- LOG
+ this.eventSchema
);
}
diff --git a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
index 4f98039..80c7214 100644
--- a/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
+++ b/streampipes-extensions/streampipes-sinks-internal-jvm/src/main/java/org/apache/streampipes/sinks/internal/jvm/datalake/DataLakeInfluxDbClient.java
@@ -19,7 +19,6 @@
package org.apache.streampipes.sinks.internal.jvm.datalake;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.logging.api.Logger;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.field.PrimitiveField;
import org.apache.streampipes.model.schema.EventProperty;
@@ -32,6 +31,8 @@ import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
@@ -43,13 +44,13 @@ import java.util.concurrent.TimeUnit;
*/
public class DataLakeInfluxDbClient {
+ private static final Logger LOG = LoggerFactory.getLogger(DataLakeInfluxDbClient.class);
+
private final String measureName;
private final String timestampField;
private final Integer batchSize;
private final Integer flushDuration;
- private final Logger logger;
-
private InfluxDB influxDb = null;
private final InfluxDbConnectionSettings settings;
private final EventSchema originalEventSchema;
@@ -60,14 +61,12 @@ public class DataLakeInfluxDbClient {
String timestampField,
Integer batchSize,
Integer flushDuration,
- EventSchema originalEventSchema,
- Logger logger) throws SpRuntimeException {
+ EventSchema originalEventSchema) throws SpRuntimeException {
this.settings = settings;
this.originalEventSchema = originalEventSchema;
this.timestampField = timestampField;
this.batchSize = batchSize;
this.flushDuration = flushDuration;
- this.logger = logger;
this.measureName = settings.getMeasureName();
prepareSchema();
@@ -101,7 +100,7 @@ public class DataLakeInfluxDbClient {
String databaseName = settings.getDatabaseName();
// Checking whether the database exists
if(!databaseExists(databaseName)) {
- logger.info("Database '" + databaseName + "' not found. Gets created ...");
+ LOG.info("Database '" + databaseName + "' not found. Gets created ...");
createDatabase(databaseName);
}
@@ -165,20 +164,24 @@ public class DataLakeInfluxDbClient {
if ("DIMENSION_PROPERTY".equals(ep.getPropertyScope())) {
p.tag(preparedRuntimeName, eventPropertyPrimitiveField.getAsString());
} else {
- // Store property according to property type
- String runtimeType = ((EventPropertyPrimitive) ep).getRuntimeType();
- if (XSD._integer.toString().equals(runtimeType)) {
- p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsInt());
- } else if (XSD._float.toString().equals(runtimeType)) {
- p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsFloat());
- } else if (XSD._double.toString().equals(runtimeType)) {
- p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsDouble());
- } else if (XSD._boolean.toString().equals(runtimeType)) {
- p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsBoolean());
- } else if (XSD._long.toString().equals(runtimeType)) {
- p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsLong());
- } else {
- p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsString());
+ try {
+ // Store property according to property type
+ String runtimeType = ((EventPropertyPrimitive) ep).getRuntimeType();
+ if (XSD._integer.toString().equals(runtimeType)) {
+ p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsInt());
+ } else if (XSD._float.toString().equals(runtimeType)) {
+ p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsFloat());
+ } else if (XSD._double.toString().equals(runtimeType)) {
+ p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsDouble());
+ } else if (XSD._boolean.toString().equals(runtimeType)) {
+ p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsBoolean());
+ } else if (XSD._long.toString().equals(runtimeType)) {
+ p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsLong());
+ } else {
+ p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsString());
+ }
+ } catch (NumberFormatException e) {
+ LOG.warn("Wrong number format for field {}, ignoring.", preparedRuntimeName);
}
}
}