You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/18 20:33:21 UTC
[incubator-streampipes] 15/15: [STREAMPIPES-577] Improve handling of unknown data types
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch rel/0.70.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit fbe88ba3fdfabcf2f5e75b7add7e6463056b1031
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Thu Aug 18 10:25:19 2022 +0200
[STREAMPIPES-577] Improve handling of unknown data types
---
.../connect/adapter/format/csv/CsvParser.java | 4 ++--
.../adapter/format/util/JsonEventProperty.java | 6 +----
.../connect/adapter/util/DatatypeUtils.java | 28 +++++++++++++++-------
.../dataexplorer/commons/influx/InfluxStore.java | 12 ++++++++--
.../storage/couchdb/serializer/GsonSerializer.java | 1 +
5 files changed, 33 insertions(+), 18 deletions(-)
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java
index 812c67b7f..0b3e6be6d 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java
@@ -108,11 +108,11 @@ public class CsvParser extends Parser {
EventSchema resultSchema = new EventSchema();
for (int i = 0; i < keys.length; i++) {
EventPropertyPrimitive p = new EventPropertyPrimitive();
- var runtimeType = DatatypeUtils.getXsdDatatype(data[i]);
+ var runtimeType = DatatypeUtils.getXsdDatatype(data[i], true);
var convertedValue = DatatypeUtils.convertValue(data[i], runtimeType);
p.setRuntimeName(keys[i]);
p.setRuntimeType(runtimeType);
- sample.put(keys[i], new GuessTypeInfo(DatatypeUtils.getCanonicalTypeClassName(data[i]), convertedValue));
+ sample.put(keys[i], new GuessTypeInfo(DatatypeUtils.getCanonicalTypeClassName(data[i], true), convertedValue));
resultSchema.addEventProperty(p);
}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/util/JsonEventProperty.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/util/JsonEventProperty.java
index b42711bc4..a20319075 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/util/JsonEventProperty.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/util/JsonEventProperty.java
@@ -55,11 +55,7 @@ public class JsonEventProperty {
resultProperty = new EventPropertyPrimitive();
resultProperty.setRuntimeName(key);
((EventPropertyPrimitive) resultProperty).setRuntimeType(XSD._string.toString());
- } else if (o.getClass().equals(Long.class)) {
- resultProperty = new EventPropertyPrimitive();
- resultProperty.setRuntimeName(key);
- ((EventPropertyPrimitive) resultProperty).setRuntimeType(XSD._long.toString());
- } else if (o.getClass().equals(Integer.class) || o.getClass().equals(Double.class) || o.getClass().equals(Float.class)) {
+ } else if (o.getClass().equals(Integer.class) || o.getClass().equals(Double.class) || o.getClass().equals(Float.class) || o.getClass().equals(Long.class)) {
resultProperty = new EventPropertyPrimitive();
resultProperty.setRuntimeName(key);
((EventPropertyPrimitive) resultProperty).setRuntimeType(XSD._float.toString());
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/DatatypeUtils.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/DatatypeUtils.java
index 9261beee3..f87bab36e 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/DatatypeUtils.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/DatatypeUtils.java
@@ -42,10 +42,10 @@ public class DatatypeUtils {
return Boolean.parseBoolean(stringValue);
} else if (XSD._integer.toString().equals(targetDatatypeXsd)) {
var floatingNumber = Float.parseFloat(stringValue);
- return Math.round(floatingNumber);
+ return Integer.parseInt(String.valueOf(Math.round(floatingNumber)));
} else if (XSD._long.toString().equals(targetDatatypeXsd)) {
var floatingNumber = Double.parseDouble(stringValue);
- return Math.round(floatingNumber);
+ return Long.parseLong(String.valueOf(Math.round(floatingNumber)));
}
} catch (NumberFormatException e) {
LOG.error("Number format exception {}", value);
@@ -56,12 +56,14 @@ public class DatatypeUtils {
return value;
}
- public static String getCanonicalTypeClassName(String value) {
- return getTypeClass(value).getCanonicalName();
+ public static String getCanonicalTypeClassName(String value,
+ boolean preferFloat) {
+ return getTypeClass(value, preferFloat).getCanonicalName();
}
- public static String getXsdDatatype(String value) {
- var clazz = getTypeClass(value);
+ public static String getXsdDatatype(String value,
+ boolean preferFloat) {
+ var clazz = getTypeClass(value, preferFloat);
if (clazz.equals(Integer.class)) {
return XSD._integer.toString();
} else if (clazz.equals(Long.class)) {
@@ -77,17 +79,18 @@ public class DatatypeUtils {
}
}
- public static Class<?> getTypeClass(String value) {
+ public static Class<?> getTypeClass(String value,
+ boolean preferFloat) {
if (NumberUtils.isParsable(value)) {
try {
Integer.parseInt(value);
- return Integer.class;
+ return preferFloat ? Float.class : Integer.class;
} catch (NumberFormatException ignored) {
}
try {
Long.parseLong(value);
- return Long.class;
+ return preferFloat ? Float.class : Long.class;
} catch (NumberFormatException ignored) {
}
@@ -105,4 +108,11 @@ public class DatatypeUtils {
return String.class;
}
+
+ public static void main(String[] args) {
+ long max = Long.MAX_VALUE;
+ String className = getCanonicalTypeClassName(String.valueOf(max), true);
+ System.out.println(className);
+ System.out.println(convertValue(max, className));
+ }
}
diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
index cb429fee8..d0bfb7928 100644
--- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
+++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java
@@ -152,7 +152,11 @@ public class InfluxStore {
// Store property according to property type
String runtimeType = ((EventPropertyPrimitive) ep).getRuntimeType();
if (XSD._integer.toString().equals(runtimeType)) {
- p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsInt());
+ try {
+ p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsInt());
+ } catch (NumberFormatException ef) {
+ p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsFloat());
+ }
} else if (XSD._float.toString().equals(runtimeType)) {
p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsFloat());
} else if (XSD._double.toString().equals(runtimeType)) {
@@ -160,7 +164,11 @@ public class InfluxStore {
} else if (XSD._boolean.toString().equals(runtimeType)) {
p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsBoolean());
} else if (XSD._long.toString().equals(runtimeType)) {
- p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsLong());
+ try {
+ p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsLong());
+ } catch (NumberFormatException ef) {
+ p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsFloat());
+ }
} else {
p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsString());
}
diff --git a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/serializer/GsonSerializer.java b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/serializer/GsonSerializer.java
index af208ac96..311c55fd0 100644
--- a/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/serializer/GsonSerializer.java
+++ b/streampipes-storage-couchdb/src/main/java/org/apache/streampipes/storage/couchdb/serializer/GsonSerializer.java
@@ -108,6 +108,7 @@ public class GsonSerializer {
.registerSubtype(UnitTransformRuleDescription.class, "org.apache.streampipes.model.UnitTransformRuleDescription")
.registerSubtype(TimestampTranfsformationRuleDescription.class, "org.apache.streampipes.model.TimestampTranfsformationRuleDescription")
.registerSubtype(EventRateTransformationRuleDescription.class, "org.apache.streampipes.model.EventRateTransformationRuleDescription")
+ .registerSubtype(ChangeDatatypeTransformationRuleDescription.class, "org.apache.streampipes.model.ChangeDatatypeTransformationRuleDescription")
.registerSubtype(CorrectionValueTransformationRuleDescription.class, "org.apache.streampipes.model.CorrectionValueTransformationRuleDescription"));
builder.registerTypeAdapterFactory(RuntimeTypeAdapterFactory.of(AdapterDescription.class, "type")