You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/18 08:25:27 UTC

[incubator-streampipes] branch dev updated: [STREAMPIPES-577] Improve handling of unknown data types

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 723870c3e [STREAMPIPES-577] Improve handling of unknown data types
723870c3e is described below

commit 723870c3e29fd3e5ef36f26013ab554636514635
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Aug 18 10:25:19 2022 +0200

    [STREAMPIPES-577] Improve handling of unknown data types
---
 .../connect/adapter/format/csv/CsvParser.java      |  4 ++--
 .../adapter/format/util/JsonEventProperty.java     |  6 +----
 .../connect/adapter/util/DatatypeUtils.java        | 28 +++++++++++++++-------
 .../dataexplorer/commons/influx/InfluxStore.java   | 12 ++++++++--
 .../storage/couchdb/serializer/GsonSerializer.java |  1 +
 5 files changed, 33 insertions(+), 18 deletions(-)

diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java
index 812c67b7f..0b3e6be6d 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java
@@ -108,11 +108,11 @@ public class CsvParser extends Parser {
         EventSchema resultSchema = new EventSchema();
         for (int i = 0; i < keys.length; i++) {
             EventPropertyPrimitive p = new EventPropertyPrimitive();
-            var runtimeType = DatatypeUtils.getXsdDatatype(data[i]);
+            var runtimeType = DatatypeUtils.getXsdDatatype(data[i], true);
             var convertedValue = DatatypeUtils.convertValue(data[i], runtimeType);
             p.setRuntimeName(keys[i]);
             p.setRuntimeType(runtimeType);
-            sample.put(keys[i], new GuessTypeInfo(DatatypeUtils.getCanonicalTypeClassName(data[i]), convertedValue));
+            sample.put(keys[i], new GuessTypeInfo(DatatypeUtils.getCanonicalTypeClassName(data[i], true), convertedValue));
             resultSchema.addEventProperty(p);
         }
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/util/JsonEventProperty.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/util/JsonEventProperty.java
index b42711bc4..a20319075 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/util/JsonEventProperty.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/util/JsonEventProperty.java
@@ -55,11 +55,7 @@ public class JsonEventProperty {
             resultProperty = new EventPropertyPrimitive();
             resultProperty.setRuntimeName(key);
             ((EventPropertyPrimitive) resultProperty).setRuntimeType(XSD._string.toString());
-        } else if (o.getClass().equals(Long.class)) {
-            resultProperty = new EventPropertyPrimitive();
-            resultProperty.setRuntimeName(key);
-            ((EventPropertyPrimitive) resultProperty).setRuntimeType(XSD._long.toString());
-        } else if (o.getClass().equals(Integer.class) || o.getClass().equals(Double.class) || o.getClass().equals(Float.class)) {
+        } else if (o.getClass().equals(Integer.class) || o.getClass().equals(Double.class) || o.getClass().equals(Float.class) || o.getClass().equals(Long.class)) {
             resultProperty = new EventPropertyPrimitive();
             resultProperty.setRuntimeName(key);
             ((EventPropertyPrimitive) resultProperty).setRuntimeType(XSD._float.toString());
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/DatatypeUtils.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/DatatypeUtils.java
index 9261beee3..f87bab36e 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/DatatypeUtils.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/DatatypeUtils.java
@@ -42,10 +42,10 @@ public class DatatypeUtils {
           return Boolean.parseBoolean(stringValue);
         } else if (XSD._integer.toString().equals(targetDatatypeXsd)) {
           var floatingNumber = Float.parseFloat(stringValue);
-          return Math.round(floatingNumber);
+          return Integer.parseInt(String.valueOf(Math.round(floatingNumber)));
         } else if (XSD._long.toString().equals(targetDatatypeXsd)) {
           var floatingNumber = Double.parseDouble(stringValue);
-          return Math.round(floatingNumber);
+          return Long.parseLong(String.valueOf(Math.round(floatingNumber)));
         }
       } catch (NumberFormatException e) {
         LOG.error("Number format exception {}", value);
@@ -56,12 +56,14 @@ public class DatatypeUtils {
     return value;
   }
 
-  public static String getCanonicalTypeClassName(String value) {
-    return getTypeClass(value).getCanonicalName();
+  public static String getCanonicalTypeClassName(String value,
+                                                 boolean preferFloat) {
+    return getTypeClass(value, preferFloat).getCanonicalName();
   }
 
-  public static String getXsdDatatype(String value) {
-    var clazz = getTypeClass(value);
+  public static String getXsdDatatype(String value,
+                                      boolean preferFloat) {
+    var clazz = getTypeClass(value, preferFloat);
     if (clazz.equals(Integer.class)) {
       return XSD._integer.toString();
     } else if (clazz.equals(Long.class)) {
@@ -77,17 +79,18 @@ public class DatatypeUtils {
     }
   }
 
-  public static Class<?> getTypeClass(String value) {
+  public static Class<?> getTypeClass(String value,
+                                      boolean preferFloat) {
     if (NumberUtils.isParsable(value)) {
       try {
         Integer.parseInt(value);
-        return Integer.class;
+        return preferFloat ? Float.class : Integer.class;
       } catch (NumberFormatException ignored) {
       }
 
       try {
         Long.parseLong(value);
-        return Long.class;
+        return preferFloat ? Float.class : Long.class;
       } catch (NumberFormatException ignored) {
       }
 
@@ -105,4 +108,11 @@ public class DatatypeUtils {
 
     return String.class;
   }
+
+  public static void main(String[] args) {
+    long max = Long.MAX_VALUE;
+    String className = getCanonicalTypeClassName(String.valueOf(max), true);
+    System.out.println(className);
+    System.out.println(convertValue(max, className));
+  }
 }
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
index cb429fee8..d0bfb7928 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
@@ -152,7 +152,11 @@ public class InfluxStore {
                             // Store property according to property type
                             String runtimeType = ((EventPropertyPrimitive) ep).getRuntimeType();
                             if (XSD._integer.toString().equals(runtimeType)) {
-                                p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsInt());
+                                try {
+                                    p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsInt());
+                                } catch (NumberFormatException ef) {
+                                    p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsFloat());
+                                }
                             } else if (XSD._float.toString().equals(runtimeType)) {
                                 p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsFloat());
                             } else if (XSD._double.toString().equals(runtimeType)) {
@@ -160,7 +164,11 @@ public class InfluxStore {
                             } else if (XSD._boolean.toString().equals(runtimeType)) {
                                 p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsBoolean());
                             } else if (XSD._long.toString().equals(runtimeType)) {
-                                p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsLong());
+                                try {
+                                    p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsLong());
+                                } catch (NumberFormatException ef) {
+                                    p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsFloat());
+                                }
                             } else {
                                 p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsString());
                             }
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/serializer/GsonSerializer.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/serializer/GsonSerializer.java
index af208ac96..311c55fd0 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/serializer/GsonSerializer.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/serializer/GsonSerializer.java
@@ -108,6 +108,7 @@ public class GsonSerializer {
             .registerSubtype(UnitTransformRuleDescription.class, "org.apache.streampipes.model.UnitTransformRuleDescription")
             .registerSubtype(TimestampTranfsformationRuleDescription.class, "org.apache.streampipes.model.TimestampTranfsformationRuleDescription")
             .registerSubtype(EventRateTransformationRuleDescription.class, "org.apache.streampipes.model.EventRateTransformationRuleDescription")
+            .registerSubtype(ChangeDatatypeTransformationRuleDescription.class, "org.apache.streampipes.model.ChangeDatatypeTransformationRuleDescription")
             .registerSubtype(CorrectionValueTransformationRuleDescription.class, "org.apache.streampipes.model.CorrectionValueTransformationRuleDescription"));
 
     builder.registerTypeAdapterFactory(RuntimeTypeAdapterFactory.of(AdapterDescription.class, "type")