You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/17 15:03:30 UTC

[incubator-streampipes] branch STREAMPIPES-577 updated: [STREAMPIPES-577] Improve parsing and preview of datatypes

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-577
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/STREAMPIPES-577 by this push:
     new 91f08672b [STREAMPIPES-577] Improve parsing and preview of datatypes
91f08672b is described below

commit 91f08672bf39b655f981bede8e265910fc07906c
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Aug 17 17:03:18 2022 +0200

    [STREAMPIPES-577] Improve parsing and preview of datatypes
---
 .../connect/adapter/AdapterPipelineGenerator.java  |  22 ++---
 .../connect/adapter/format/csv/CsvParser.java      |  37 +------
 .../TransformValueAdapterPipelineElement.java      |  14 ++-
 .../value/DatatypeTransformationRule.java          |  52 ++++++++++
 .../transform/value/ValueEventTransformer.java     |  46 +++------
 .../connect/adapter/util/DatatypeUtils.java        | 108 +++++++++++++++++++++
 .../connect/adapter/util/PollingSettings.java      |   1 +
 .../rules/TransformationRuleDescription.java       |   1 +
 ...hangeDatatypeTransformationRuleDescription.java |  60 ++++++++++++
 .../src/lib/model/gen/streampipes-model.ts         |  39 ++++++--
 .../services/transformation-rule.service.ts        |  70 ++++++++-----
 11 files changed, 328 insertions(+), 122 deletions(-)

diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/AdapterPipelineGenerator.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/AdapterPipelineGenerator.java
index 31ce0e9a1..dfe84a105 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/AdapterPipelineGenerator.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/AdapterPipelineGenerator.java
@@ -19,7 +19,6 @@
 package org.apache.streampipes.connect.adapter;
 
 import org.apache.streampipes.config.backend.BackendConfig;
-import org.apache.streampipes.config.backend.SpProtocol;
 import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
 import org.apache.streampipes.connect.adapter.preprocessing.elements.*;
 import org.apache.streampipes.connect.adapter.preprocessing.transform.stream.DuplicateFilterPipelineElement;
@@ -29,10 +28,7 @@ import org.apache.streampipes.model.connect.rules.TransformationRuleDescription;
 import org.apache.streampipes.model.connect.rules.schema.SchemaTransformationRuleDescription;
 import org.apache.streampipes.model.connect.rules.stream.EventRateTransformationRuleDescription;
 import org.apache.streampipes.model.connect.rules.stream.RemoveDuplicatesTransformationRuleDescription;
-import org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription;
-import org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription;
-import org.apache.streampipes.model.connect.rules.value.CorrectionValueTransformationRuleDescription;
-import org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription;
+import org.apache.streampipes.model.connect.rules.value.*;
 import org.apache.streampipes.model.grounding.JmsTransportProtocol;
 import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
 
@@ -47,13 +43,13 @@ public class AdapterPipelineGenerator {
 
     var pipelineElements = makeAdapterPipelineElements(adapterDescription.getRules());
 
-    RemoveDuplicatesTransformationRuleDescription duplicatesTransformationRuleDescription = getRemoveDuplicateRule(adapterDescription.getRules());
+    var duplicatesTransformationRuleDescription = getRemoveDuplicateRule(adapterDescription.getRules());
     if (duplicatesTransformationRuleDescription != null) {
       pipelineElements.add(new DuplicateFilterPipelineElement(duplicatesTransformationRuleDescription.getFilterTimeWindow()));
     }
 
-    TransformStreamAdapterElement transformStreamAdapterElement = new TransformStreamAdapterElement();
-    EventRateTransformationRuleDescription eventRateTransformationRuleDescription = getEventRateTransformationRule(adapterDescription.getRules());
+    var transformStreamAdapterElement = new TransformStreamAdapterElement();
+    var eventRateTransformationRuleDescription = getEventRateTransformationRule(adapterDescription.getRules());
     if (eventRateTransformationRuleDescription != null) {
       transformStreamAdapterElement.addStreamTransformationRuleDescription(eventRateTransformationRuleDescription);
     }
@@ -73,13 +69,13 @@ public class AdapterPipelineGenerator {
     List<IAdapterPipelineElement> pipelineElements = new ArrayList<>();
 
     // Must be before the schema transformations to ensure that user can move this event property
-    AddTimestampRuleDescription timestampTransformationRuleDescription = getTimestampRule(rules);
+    var timestampTransformationRuleDescription = getTimestampRule(rules);
     if (timestampTransformationRuleDescription != null) {
       pipelineElements.add(new AddTimestampPipelineElement(
         timestampTransformationRuleDescription.getRuntimeKey()));
     }
 
-    AddValueTransformationRuleDescription valueTransformationRuleDescription = getAddValueRule(rules);
+    var valueTransformationRuleDescription = getAddValueRule(rules);
     if (valueTransformationRuleDescription != null) {
       pipelineElements.add(new AddValuePipelineElement(
         valueTransformationRuleDescription.getRuntimeKey(),
@@ -95,7 +91,7 @@ public class AdapterPipelineGenerator {
   }
 
   private SendToBrokerAdapterSink<?> getAdapterSink(AdapterDescription adapterDescription) {
-    SpProtocol prioritizedProtocol =
+    var prioritizedProtocol =
       BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0);
 
     if (GroundingService.isPrioritized(prioritizedProtocol, JmsTransportProtocol.class)) {
@@ -125,10 +121,6 @@ public class AdapterPipelineGenerator {
     return getRule(rules, AddValueTransformationRuleDescription.class);
   }
 
-  private CorrectionValueTransformationRuleDescription getCorrectionValueRule(List<TransformationRuleDescription> rules) {
-    return getRule(rules, CorrectionValueTransformationRuleDescription.class);
-  }
-
   private <G extends TransformationRuleDescription> G getRule(List<TransformationRuleDescription> rules,
                                                               Class<G> type) {
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java
index a5e4f3841..812c67b7f 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.connect.adapter.format.csv;
 
 import org.apache.streampipes.connect.adapter.model.generic.Parser;
 import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
+import org.apache.streampipes.connect.adapter.util.DatatypeUtils;
 import org.apache.streampipes.connect.api.EmitBinaryEvent;
 import org.apache.streampipes.connect.api.exception.ParseException;
 import org.apache.streampipes.model.connect.grounding.FormatDescription;
@@ -28,7 +29,6 @@ import org.apache.streampipes.model.connect.guess.AdapterGuessInfo;
 import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
 import org.apache.streampipes.model.schema.EventPropertyPrimitive;
 import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.vocabulary.XSD;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -108,9 +108,11 @@ public class CsvParser extends Parser {
         EventSchema resultSchema = new EventSchema();
         for (int i = 0; i < keys.length; i++) {
             EventPropertyPrimitive p = new EventPropertyPrimitive();
+            var runtimeType = DatatypeUtils.getXsdDatatype(data[i]);
+            var convertedValue = DatatypeUtils.convertValue(data[i], runtimeType);
             p.setRuntimeName(keys[i]);
-            p.setRuntimeType(getTypeString(data[i]));
-            sample.put(keys[i], new GuessTypeInfo(getTypeString(data[i]), data[i]));
+            p.setRuntimeType(runtimeType);
+            sample.put(keys[i], new GuessTypeInfo(DatatypeUtils.getCanonicalTypeClassName(data[i]), convertedValue));
             resultSchema.addEventProperty(p);
         }
 
@@ -122,35 +124,6 @@ public class CsvParser extends Parser {
         return getSchemaAndSample(oneEvent).getEventSchema();
     }
 
-    private String getTypeString(String o) {
-
-        String typeClass = getTypeClass(o);
-
-        if (Float.class.getCanonicalName().equals(typeClass)) {
-            return XSD._float.toString();
-        } else if (Boolean.class.getCanonicalName().equals(typeClass)) {
-            return XSD._boolean.toString();
-        } else {
-            return XSD._string.toString();
-        }
-    }
-
-    private String getTypeClass(String o) {
-
-        try {
-            Double.parseDouble(o);
-            return Float.class.getCanonicalName();
-        } catch (NumberFormatException e) {
-
-        }
-
-        if (o.equalsIgnoreCase("true") || o.equalsIgnoreCase("false")) {
-            return Boolean.class.getCanonicalName();
-        }
-
-        return String.class.getCanonicalName();
-    }
-
 
     public static String[] parseLine(String cvsLine, String separatorString) {
 
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java
index 25d357518..abb1bd27a 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.connect.adapter.preprocessing.Util;
 import org.apache.streampipes.connect.adapter.preprocessing.transform.value.*;
 import org.apache.streampipes.connect.api.IAdapterPipelineElement;
 import org.apache.streampipes.model.connect.rules.TransformationRuleDescription;
+import org.apache.streampipes.model.connect.rules.value.ChangeDatatypeTransformationRuleDescription;
 import org.apache.streampipes.model.connect.rules.value.CorrectionValueTransformationRuleDescription;
 import org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription;
 import org.apache.streampipes.model.connect.rules.value.UnitTransformRuleDescription;
@@ -43,11 +44,11 @@ public class TransformValueAdapterPipelineElement implements IAdapterPipelineEle
         // transforms description to actual rules
         for (TransformationRuleDescription ruleDescription : transformationRuleDescriptions) {
             if (ruleDescription instanceof UnitTransformRuleDescription) {
-                UnitTransformRuleDescription tmp = (UnitTransformRuleDescription) ruleDescription;
+                var tmp = (UnitTransformRuleDescription) ruleDescription;
                 rules.add(new UnitTransformationRule(Util.toKeyArray(tmp.getRuntimeKey()),
                         tmp.getFromUnitRessourceURL(), tmp.getToUnitRessourceURL()));
-            } else if(ruleDescription instanceof TimestampTranfsformationRuleDescription) {
-                TimestampTranfsformationRuleDescription tmp = (TimestampTranfsformationRuleDescription) ruleDescription;
+            } else if (ruleDescription instanceof TimestampTranfsformationRuleDescription) {
+                var tmp = (TimestampTranfsformationRuleDescription) ruleDescription;
                 TimestampTranformationRuleMode mode = null;
                 switch (tmp.getMode()) {
                     case "formatString": mode = TimestampTranformationRuleMode.FORMAT_STRING;
@@ -57,9 +58,12 @@ public class TransformValueAdapterPipelineElement implements IAdapterPipelineEle
                 rules.add(new TimestampTranformationRule(Util.toKeyArray(tmp.getRuntimeKey()), mode,
                         tmp.getFormatString(), tmp.getMultiplier()));
             }
-            else if(ruleDescription instanceof CorrectionValueTransformationRuleDescription) {
-                CorrectionValueTransformationRuleDescription tmp = (CorrectionValueTransformationRuleDescription) ruleDescription;
+            else if (ruleDescription instanceof CorrectionValueTransformationRuleDescription) {
+                var tmp = (CorrectionValueTransformationRuleDescription) ruleDescription;
                 rules.add(new CorrectionValueTransformationRule(Util.toKeyArray(tmp.getRuntimeKey()), tmp.getCorrectionValue(), tmp.getOperator()));
+            } else if (ruleDescription instanceof ChangeDatatypeTransformationRuleDescription) {
+                var tmp = (ChangeDatatypeTransformationRuleDescription) ruleDescription;
+                rules.add(new DatatypeTransformationRule(tmp.getRuntimeKey(), tmp.getOriginalDatatypeXsd(), tmp.getTargetDatatypeXsd()));
             }
 
             else {
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/value/DatatypeTransformationRule.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/value/DatatypeTransformationRule.java
new file mode 100644
index 000000000..d8c6d8aac
--- /dev/null
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/value/DatatypeTransformationRule.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.adapter.preprocessing.transform.value;
+
+import org.apache.streampipes.connect.adapter.util.DatatypeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class DatatypeTransformationRule implements ValueTransformationRule {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DatatypeTransformationRule.class);
+
+  private String eventKey;
+  private String originalDatatypeXsd;
+  private String targetDatatypeXsd;
+
+  public DatatypeTransformationRule(String eventKey, String originalDatatypeXsd, String targetDatatypeXsd) {
+    this.eventKey = eventKey;
+    this.originalDatatypeXsd = originalDatatypeXsd;
+    this.targetDatatypeXsd = targetDatatypeXsd;
+  }
+
+  @Override
+  public Map<String, Object> transform(Map<String, Object> event) {
+    Object value = event.get(eventKey);
+    Object transformedValue = transformDatatype(value);
+    event.put(eventKey, transformedValue);
+    return event;
+  }
+
+  public Object transformDatatype(Object value) {
+    return DatatypeUtils.convertValue(value, targetDatatypeXsd);
+  }
+}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java
index 939cbad78..c965c894d 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java
@@ -26,14 +26,16 @@ import java.util.Map;
 
 public class ValueEventTransformer implements ValueTransformationRule {
 
-    private List<UnitTransformationRule> unitTransformationRules;
-    private List<TimestampTranformationRule> timestampTransformationRules;
-    private List<CorrectionValueTransformationRule> correctionValueTransformationRules;
+    private final List<UnitTransformationRule> unitTransformationRules;
+    private final List<TimestampTranformationRule> timestampTransformationRules;
+    private final List<CorrectionValueTransformationRule> correctionValueTransformationRules;
+    private final List<DatatypeTransformationRule> datatypeTransformationRules;
 
     public ValueEventTransformer(List<ValueTransformationRule> rules) {
         this.unitTransformationRules = new ArrayList<>();
         this.timestampTransformationRules = new ArrayList<>();
         this.correctionValueTransformationRules = new ArrayList<>();
+        this.datatypeTransformationRules = new ArrayList<>();
 
         for (TransformationRule rule : rules) {
             if (rule instanceof UnitTransformationRule) {
@@ -42,16 +44,12 @@ public class ValueEventTransformer implements ValueTransformationRule {
                 this.timestampTransformationRules.add((TimestampTranformationRule) rule);
             } else if (rule instanceof CorrectionValueTransformationRule) {
                 this.correctionValueTransformationRules.add((CorrectionValueTransformationRule) rule);
+            } else if (rule instanceof DatatypeTransformationRule) {
+                this.datatypeTransformationRules.add((DatatypeTransformationRule) rule);
             }
         }
     }
 
-/*
-    public ValueEventTransformer(List<UnitTransformationRule> unitTransformationRule) {
-        this.unitTransformationRules = new ArrayList<>();
-    }
-*/
-
     @Override
     public Map<String, Object> transform(Map<String, Object> event) {
 
@@ -63,36 +61,14 @@ public class ValueEventTransformer implements ValueTransformationRule {
             event = rule.transform(event);
         }
 
-        for (CorrectionValueTransformationRule rule : correctionValueTransformationRules) {
+        for (var rule: datatypeTransformationRules) {
             event = rule.transform(event);
         }
 
+        for (CorrectionValueTransformationRule rule : correctionValueTransformationRules) {
+            event = rule.transform(event);
+        }
 
         return event;
     }
-
-
-    public List<UnitTransformationRule> getUnitTransformationRules() {
-        return unitTransformationRules;
-    }
-
-    public void setUnitTransformationRules(List<UnitTransformationRule> unitTransformationRules) {
-        this.unitTransformationRules = unitTransformationRules;
-    }
-
-    public List<TimestampTranformationRule> getTimestampTransformationRules() {
-        return timestampTransformationRules;
-    }
-
-    public void setTimestampTransformationRules(List<TimestampTranformationRule> timestampTransformationRules) {
-        this.timestampTransformationRules = timestampTransformationRules;
-    }
-
-    public List<CorrectionValueTransformationRule> getCorrectionValueTransformationRules() {
-        return correctionValueTransformationRules;
-    }
-
-    public void setCorrectionValueTransformationRules(List<CorrectionValueTransformationRule> correctionValueTransformationRules) {
-        this.correctionValueTransformationRules = correctionValueTransformationRules;
-    }
 }
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/DatatypeUtils.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/DatatypeUtils.java
new file mode 100644
index 000000000..9261beee3
--- /dev/null
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/DatatypeUtils.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.adapter.util;
+
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.streampipes.vocabulary.XSD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DatatypeUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DatatypeUtils.class);
+
+  public static Object convertValue(Object value,
+                                    String targetDatatypeXsd) {
+    var stringValue = String.valueOf(value);
+    if (XSD._string.toString().equals(targetDatatypeXsd)) {
+      return stringValue;
+    } else {
+      try {
+        if (XSD._double.toString().equals(targetDatatypeXsd)) {
+          return Double.parseDouble(stringValue);
+        } else if (XSD._float.toString().equals(targetDatatypeXsd)) {
+          return Float.parseFloat(stringValue);
+        } else if (XSD._boolean.toString().equals(targetDatatypeXsd)) {
+          return Boolean.parseBoolean(stringValue);
+        } else if (XSD._integer.toString().equals(targetDatatypeXsd)) {
+          var floatingNumber = Float.parseFloat(stringValue);
+          return Math.round(floatingNumber);
+        } else if (XSD._long.toString().equals(targetDatatypeXsd)) {
+          var floatingNumber = Double.parseDouble(stringValue);
+          return Math.round(floatingNumber);
+        }
+      } catch (NumberFormatException e) {
+        LOG.error("Number format exception {}", value);
+        return value;
+      }
+    }
+
+    return value;
+  }
+
+  public static String getCanonicalTypeClassName(String value) {
+    return getTypeClass(value).getCanonicalName();
+  }
+
+  public static String getXsdDatatype(String value) {
+    var clazz = getTypeClass(value);
+    if (clazz.equals(Integer.class)) {
+      return XSD._integer.toString();
+    } else if (clazz.equals(Long.class)) {
+      return XSD._long.toString();
+    } else if (clazz.equals(Float.class)) {
+      return XSD._float.toString();
+    } else if (clazz.equals(Double.class)) {
+      return XSD._double.toString();
+    } else if (clazz.equals(Boolean.class)) {
+      return XSD._boolean.toString();
+    } else {
+      return XSD._string.toString();
+    }
+  }
+
+  public static Class<?> getTypeClass(String value) {
+    if (NumberUtils.isParsable(value)) {
+      try {
+        Integer.parseInt(value);
+        return Integer.class;
+      } catch (NumberFormatException ignored) {
+      }
+
+      try {
+        Long.parseLong(value);
+        return Long.class;
+      } catch (NumberFormatException ignored) {
+      }
+
+      try {
+        Double.parseDouble(value);
+        return Float.class;
+      } catch (NumberFormatException ignored) {
+      }
+
+    }
+
+    if (value.equalsIgnoreCase("true") || value.equalsIgnoreCase("false")) {
+      return Boolean.class;
+    }
+
+    return String.class;
+  }
+}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/PollingSettings.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/PollingSettings.java
index 79dea5b0e..177702c82 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/PollingSettings.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/PollingSettings.java
@@ -15,6 +15,7 @@
  * limitations under the License.
  *
  */
+
 package org.apache.streampipes.connect.adapter.util;
 
 import java.util.concurrent.TimeUnit;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/TransformationRuleDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/TransformationRuleDescription.java
index bd1b85b5e..590b4161e 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/TransformationRuleDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/TransformationRuleDescription.java
@@ -41,6 +41,7 @@ import org.apache.streampipes.model.shared.annotation.TsModel;
         @JsonSubTypes.Type(DeleteRuleDescription.class),
         @JsonSubTypes.Type(RenameRuleDescription.class),
         @JsonSubTypes.Type(MoveRuleDescription.class),
+        @JsonSubTypes.Type(ChangeDatatypeTransformationRuleDescription.class),
         @JsonSubTypes.Type(CorrectionValueTransformationRuleDescription.class),
 })
 public abstract class TransformationRuleDescription extends UnnamedStreamPipesEntity {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/value/ChangeDatatypeTransformationRuleDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/value/ChangeDatatypeTransformationRuleDescription.java
new file mode 100644
index 000000000..aa64e647b
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/value/ChangeDatatypeTransformationRuleDescription.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.model.connect.rules.value;
+
+public class ChangeDatatypeTransformationRuleDescription extends ValueTransformationRuleDescription {
+
+  private String runtimeKey;
+  private String originalDatatypeXsd;
+  private String targetDatatypeXsd;
+
+  public ChangeDatatypeTransformationRuleDescription() {
+  }
+
+  public ChangeDatatypeTransformationRuleDescription(ChangeDatatypeTransformationRuleDescription other) {
+    super(other);
+    this.runtimeKey = other.getRuntimeKey();
+    this.originalDatatypeXsd = other.getOriginalDatatypeXsd();
+    this.targetDatatypeXsd = other.getTargetDatatypeXsd();
+  }
+
+  public String getRuntimeKey() {
+    return runtimeKey;
+  }
+
+  public void setRuntimeKey(String runtimeKey) {
+    this.runtimeKey = runtimeKey;
+  }
+
+  public String getOriginalDatatypeXsd() {
+    return originalDatatypeXsd;
+  }
+
+  public void setOriginalDatatypeXsd(String originalDatatypeXsd) {
+    this.originalDatatypeXsd = originalDatatypeXsd;
+  }
+
+  public String getTargetDatatypeXsd() {
+    return targetDatatypeXsd;
+  }
+
+  public void setTargetDatatypeXsd(String targetDatatypeXsd) {
+    this.targetDatatypeXsd = targetDatatypeXsd;
+  }
+}
diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
index d2712a990..88c0b73ca 100644
--- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
+++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
@@ -18,10 +18,10 @@
 /* tslint:disable */
 /* eslint-disable */
 // @ts-nocheck
-// Generated using typescript-generator version 2.27.744 on 2022-08-16 16:51:03.
+// Generated using typescript-generator version 2.27.744 on 2022-08-17 14:48:34.
 
 export class AbstractStreamPipesEntity {
-    "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
+    "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
     elementId: string;
 
     static fromData(data: AbstractStreamPipesEntity, target?: AbstractStreamPipesEntity): AbstractStreamPipesEntity {
@@ -36,7 +36,7 @@ export class AbstractStreamPipesEntity {
 }
 
 export class UnnamedStreamPipesEntity extends AbstractStreamPipesEntity {
-    "@class": "org.apache.streampipes.model.base.UnnamedStreamPipesEntity" | "org.apache.streampipes.model.connect.guess.GuessSchema" | "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streamp [...]
+    "@class": "org.apache.streampipes.model.base.UnnamedStreamPipesEntity" | "org.apache.streampipes.model.connect.guess.GuessSchema" | "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streamp [...]
 
     static fromData(data: UnnamedStreamPipesEntity, target?: UnnamedStreamPipesEntity): UnnamedStreamPipesEntity {
         if (!data) {
@@ -151,8 +151,8 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
         instance.applicationLinks = __getCopyArrayFn(ApplicationLink.fromData)(data.applicationLinks);
         instance.internallyManaged = data.internallyManaged;
         instance.connectedTo = __getCopyArrayFn(__identity<string>())(data.connectedTo);
-        instance.dom = data.dom;
         instance.uri = data.uri;
+        instance.dom = data.dom;
         instance._rev = data._rev;
         return instance;
     }
@@ -192,9 +192,9 @@ export class AdapterDescription extends NamedStreamPipesEntity {
         instance.selectedEndpointUrl = data.selectedEndpointUrl;
         instance.correspondingServiceGroup = data.correspondingServiceGroup;
         instance.correspondingDataStreamElementId = data.correspondingDataStreamElementId;
-        instance.schemaRules = __getCopyArrayFn(TransformationRuleDescription.fromDataUnion)(data.schemaRules);
         instance.streamRules = __getCopyArrayFn(TransformationRuleDescription.fromDataUnion)(data.streamRules);
         instance.valueRules = __getCopyArrayFn(TransformationRuleDescription.fromDataUnion)(data.valueRules);
+        instance.schemaRules = __getCopyArrayFn(TransformationRuleDescription.fromDataUnion)(data.schemaRules);
         return instance;
     }
 
@@ -304,7 +304,7 @@ export class AdapterType {
 }
 
 export class TransformationRuleDescription extends UnnamedStreamPipesEntity {
-    "@class": "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription" | "org.apache.streampipes.model.connect.rules.valu [...]
+    "@class": "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription" | "org.apache.streampipes.model.connect.rules.valu [...]
 
     static fromData(data: TransformationRuleDescription, target?: TransformationRuleDescription): TransformationRuleDescription {
         if (!data) {
@@ -340,6 +340,8 @@ export class TransformationRuleDescription extends UnnamedStreamPipesEntity {
                 return RenameRuleDescription.fromData(data);
             case "org.apache.streampipes.model.connect.rules.schema.MoveRuleDescription":
                 return MoveRuleDescription.fromData(data);
+            case "org.apache.streampipes.model.connect.rules.value.ChangeDatatypeTransformationRuleDescription":
+                return ChangeDatatypeTransformationRuleDescription.fromData(data);
             case "org.apache.streampipes.model.connect.rules.value.CorrectionValueTransformationRuleDescription":
                 return CorrectionValueTransformationRuleDescription.fromData(data);
         }
@@ -347,7 +349,7 @@ export class TransformationRuleDescription extends UnnamedStreamPipesEntity {
 }
 
 export class ValueTransformationRuleDescription extends TransformationRuleDescription {
-    "@class": "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.UnitTransformRuleDescription" | "org.apache.streampipes.model.connect.rules [...]
+    "@class": "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.UnitTransformRuleDescription" | "org.apache.streampipes.model.connect.rules [...]
 
     static fromData(data: ValueTransformationRuleDescription, target?: ValueTransformationRuleDescription): ValueTransformationRuleDescription {
         if (!data) {
@@ -689,6 +691,25 @@ export class Category {
     }
 }
 
+export class ChangeDatatypeTransformationRuleDescription extends ValueTransformationRuleDescription {
+    "@class": "org.apache.streampipes.model.connect.rules.value.ChangeDatatypeTransformationRuleDescription";
+    originalDatatypeXsd: string;
+    runtimeKey: string;
+    targetDatatypeXsd: string;
+
+    static fromData(data: ChangeDatatypeTransformationRuleDescription, target?: ChangeDatatypeTransformationRuleDescription): ChangeDatatypeTransformationRuleDescription {
+        if (!data) {
+            return data; // falsy input is handed back unchanged
+        }
+        const copy = target || new ChangeDatatypeTransformationRuleDescription(); // reuse the caller's target if given
+        super.fromData(data, copy); // delegate to the parent class's fromData first
+        copy.originalDatatypeXsd = data.originalDatatypeXsd;
+        copy.runtimeKey = data.runtimeKey;
+        copy.targetDatatypeXsd = data.targetDatatypeXsd;
+        return copy;
+    }
+}
+
 export class CodeInputStaticProperty extends StaticProperty {
     "@class": "org.apache.streampipes.model.staticproperty.CodeInputStaticProperty";
     codeTemplate: string;
@@ -2618,9 +2639,9 @@ export class PipelineTemplateDescription extends NamedStreamPipesEntity {
         const instance = target || new PipelineTemplateDescription();
         super.fromData(data, instance);
         instance.boundTo = __getCopyArrayFn(BoundPipelineElement.fromData)(data.boundTo);
-        instance.pipelineTemplateName = data.pipelineTemplateName;
         instance.pipelineTemplateId = data.pipelineTemplateId;
         instance.pipelineTemplateDescription = data.pipelineTemplateDescription;
+        instance.pipelineTemplateName = data.pipelineTemplateName;
         return instance;
     }
 }
@@ -3461,7 +3482,7 @@ export type StreamTransformationRuleDescriptionUnion = EventRateTransformationRu
 
 export type TopicDefinitionUnion = SimpleTopicDefinition | WildcardTopicDefinition;
 
-export type TransformationRuleDescriptionUnion = AddTimestampRuleDescription | AddValueTransformationRuleDescription | TimestampTranfsformationRuleDescription | UnitTransformRuleDescription | EventRateTransformationRuleDescription | RemoveDuplicatesTransformationRuleDescription | CreateNestedRuleDescription | DeleteRuleDescription | RenameRuleDescription | MoveRuleDescription | CorrectionValueTransformationRuleDescription;
+export type TransformationRuleDescriptionUnion = AddTimestampRuleDescription | AddValueTransformationRuleDescription | TimestampTranfsformationRuleDescription | UnitTransformRuleDescription | EventRateTransformationRuleDescription | RemoveDuplicatesTransformationRuleDescription | CreateNestedRuleDescription | DeleteRuleDescription | RenameRuleDescription | MoveRuleDescription | ChangeDatatypeTransformationRuleDescription | CorrectionValueTransformationRuleDescription;
 
 export type TransportProtocolUnion = JmsTransportProtocol | KafkaTransportProtocol | MqttTransportProtocol;
 
diff --git a/ui/src/app/connect/services/transformation-rule.service.ts b/ui/src/app/connect/services/transformation-rule.service.ts
index 87819561a..478669698 100644
--- a/ui/src/app/connect/services/transformation-rule.service.ts
+++ b/ui/src/app/connect/services/transformation-rule.service.ts
@@ -30,6 +30,7 @@ import {
   EventSchema,
   MoveRuleDescription,
   RenameRuleDescription,
+  ChangeDatatypeTransformationRuleDescription,
   TimestampTranfsformationRuleDescription,
   TransformationRuleDescriptionUnion,
   UnitTransformRuleDescription
@@ -82,33 +83,16 @@ export class TransformationRuleService {
       }
 
       // Scale
-      transformationRuleDescription = transformationRuleDescription.concat(this.getCorrectionValueRules(
-        this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema));
+      transformationRuleDescription = transformationRuleDescription
+        .concat(this.getCorrectionValueRules(this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema))
+        .concat(this.getRenameRules(this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema))
+        .concat(this.getCreateNestedRules(this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema))
+        .concat(this.getMoveRules(this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema))
+        .concat(this.getDeleteRules(this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema))
+        .concat(this.getUnitTransformRules(this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema))
+        .concat(this.getTimestampTransformRules(this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema))
+        .concat(this.getDatatypeTransformRules(this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema));
 
-      // Rename
-      transformationRuleDescription = transformationRuleDescription.concat(this.getRenameRules(
-        this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema));
-
-
-      // Create Nested
-      transformationRuleDescription = transformationRuleDescription.concat(this.getCreateNestedRules(
-        this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema));
-
-      // Move
-      transformationRuleDescription = transformationRuleDescription.concat(this.getMoveRules(
-        this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema));
-
-      // Delete
-      transformationRuleDescription = transformationRuleDescription.concat(this.getDeleteRules(
-        this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema));
-
-      // Unit
-      transformationRuleDescription = transformationRuleDescription.concat(this.getUnitTransformRules(
-        this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema));
-
-      // Timestmap
-      transformationRuleDescription = transformationRuleDescription.concat(this.getTimestampTransformRules(
-        this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema));
 
       return transformationRuleDescription;
     }
@@ -454,6 +438,40 @@ export class TransformationRuleService {
     return property.domainProperties.some(dp => dp === 'http://schema.org/DateTime');
   }
 
+  /**
+   * Derives one datatype-change rule for every primitive property whose runtime type differs
+   * from its counterpart (same elementId) in the old schema; nested properties are walked recursively.
+   */
+  private getDatatypeTransformRules(eventProperties: EventPropertyUnion[],
+                                    oldEventSchema: EventSchema,
+                                    newEventSchema: EventSchema): ChangeDatatypeTransformationRuleDescription[] {
+    const rules: ChangeDatatypeTransformationRuleDescription[] = [];
+
+    for (const property of eventProperties) {
+      if (property instanceof EventPropertyPrimitive) {
+        const runtimeKey = this.getCompleteRuntimeNameKey(newEventSchema.eventProperties, property.elementId);
+        const previous = this.getEventProperty(oldEventSchema.eventProperties, property.elementId);
+        // A property without a counterpart in the old schema has no previous type to compare with.
+        if (!previous) {
+          continue;
+        }
+        const previousType = (previous as EventPropertyPrimitive).runtimeType;
+        if (property.runtimeType === previousType) {
+          continue;
+        }
+        const rule = new ChangeDatatypeTransformationRuleDescription();
+        rule['@class'] = 'org.apache.streampipes.model.connect.rules.value.ChangeDatatypeTransformationRuleDescription';
+        rule.runtimeKey = runtimeKey;
+        rule.originalDatatypeXsd = previousType;
+        rule.targetDatatypeXsd = property.runtimeType;
+        rules.push(rule);
+      } else if (property instanceof EventPropertyNested) {
+        rules.push(...this.getDatatypeTransformRules(property.eventProperties, oldEventSchema, newEventSchema));
+      }
+    }
+    return rules;
+  }
+
   private getCorrectionValueRules(eventProperties: EventPropertyUnion[],
                                   oldEventSchema: EventSchema,
                                   newEventSchema: EventSchema) {