You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/08/18 20:33:17 UTC
[incubator-streampipes] 11/15: [STREAMPIPES-577] Improve parsing and preview of datatypes
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch rel/0.70.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit aea54f13d641c60d897e57466ff8f90e085eed57
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Wed Aug 17 17:03:18 2022 +0200
[STREAMPIPES-577] Improve parsing and preview of datatypes
---
.../connect/adapter/AdapterPipelineGenerator.java | 22 ++---
.../connect/adapter/format/csv/CsvParser.java | 37 +------
.../TransformValueAdapterPipelineElement.java | 14 ++-
.../value/DatatypeTransformationRule.java | 52 ++++++++++
.../transform/value/ValueEventTransformer.java | 46 +++------
.../connect/adapter/util/DatatypeUtils.java | 108 +++++++++++++++++++++
.../connect/adapter/util/PollingSettings.java | 1 +
.../rules/TransformationRuleDescription.java | 1 +
...hangeDatatypeTransformationRuleDescription.java | 60 ++++++++++++
.../src/lib/model/gen/streampipes-model.ts | 39 ++++++--
.../services/transformation-rule.service.ts | 70 ++++++++-----
11 files changed, 328 insertions(+), 122 deletions(-)
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/AdapterPipelineGenerator.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/AdapterPipelineGenerator.java
index 31ce0e9a1..dfe84a105 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/AdapterPipelineGenerator.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/AdapterPipelineGenerator.java
@@ -19,7 +19,6 @@
package org.apache.streampipes.connect.adapter;
import org.apache.streampipes.config.backend.BackendConfig;
-import org.apache.streampipes.config.backend.SpProtocol;
import org.apache.streampipes.connect.adapter.model.pipeline.AdapterPipeline;
import org.apache.streampipes.connect.adapter.preprocessing.elements.*;
import org.apache.streampipes.connect.adapter.preprocessing.transform.stream.DuplicateFilterPipelineElement;
@@ -29,10 +28,7 @@ import org.apache.streampipes.model.connect.rules.TransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.schema.SchemaTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.stream.EventRateTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.stream.RemoveDuplicatesTransformationRuleDescription;
-import org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription;
-import org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription;
-import org.apache.streampipes.model.connect.rules.value.CorrectionValueTransformationRuleDescription;
-import org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription;
+import org.apache.streampipes.model.connect.rules.value.*;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
@@ -47,13 +43,13 @@ public class AdapterPipelineGenerator {
var pipelineElements = makeAdapterPipelineElements(adapterDescription.getRules());
- RemoveDuplicatesTransformationRuleDescription duplicatesTransformationRuleDescription = getRemoveDuplicateRule(adapterDescription.getRules());
+ var duplicatesTransformationRuleDescription = getRemoveDuplicateRule(adapterDescription.getRules());
if (duplicatesTransformationRuleDescription != null) {
pipelineElements.add(new DuplicateFilterPipelineElement(duplicatesTransformationRuleDescription.getFilterTimeWindow()));
}
- TransformStreamAdapterElement transformStreamAdapterElement = new TransformStreamAdapterElement();
- EventRateTransformationRuleDescription eventRateTransformationRuleDescription = getEventRateTransformationRule(adapterDescription.getRules());
+ var transformStreamAdapterElement = new TransformStreamAdapterElement();
+ var eventRateTransformationRuleDescription = getEventRateTransformationRule(adapterDescription.getRules());
if (eventRateTransformationRuleDescription != null) {
transformStreamAdapterElement.addStreamTransformationRuleDescription(eventRateTransformationRuleDescription);
}
@@ -73,13 +69,13 @@ public class AdapterPipelineGenerator {
List<IAdapterPipelineElement> pipelineElements = new ArrayList<>();
// Must be before the schema transformations to ensure that user can move this event property
- AddTimestampRuleDescription timestampTransformationRuleDescription = getTimestampRule(rules);
+ var timestampTransformationRuleDescription = getTimestampRule(rules);
if (timestampTransformationRuleDescription != null) {
pipelineElements.add(new AddTimestampPipelineElement(
timestampTransformationRuleDescription.getRuntimeKey()));
}
- AddValueTransformationRuleDescription valueTransformationRuleDescription = getAddValueRule(rules);
+ var valueTransformationRuleDescription = getAddValueRule(rules);
if (valueTransformationRuleDescription != null) {
pipelineElements.add(new AddValuePipelineElement(
valueTransformationRuleDescription.getRuntimeKey(),
@@ -95,7 +91,7 @@ public class AdapterPipelineGenerator {
}
private SendToBrokerAdapterSink<?> getAdapterSink(AdapterDescription adapterDescription) {
- SpProtocol prioritizedProtocol =
+ var prioritizedProtocol =
BackendConfig.INSTANCE.getMessagingSettings().getPrioritizedProtocols().get(0);
if (GroundingService.isPrioritized(prioritizedProtocol, JmsTransportProtocol.class)) {
@@ -125,10 +121,6 @@ public class AdapterPipelineGenerator {
return getRule(rules, AddValueTransformationRuleDescription.class);
}
- private CorrectionValueTransformationRuleDescription getCorrectionValueRule(List<TransformationRuleDescription> rules) {
- return getRule(rules, CorrectionValueTransformationRuleDescription.class);
- }
-
private <G extends TransformationRuleDescription> G getRule(List<TransformationRuleDescription> rules,
Class<G> type) {
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java
index a5e4f3841..812c67b7f 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/format/csv/CsvParser.java
@@ -21,6 +21,7 @@ package org.apache.streampipes.connect.adapter.format.csv;
import org.apache.streampipes.connect.adapter.model.generic.Parser;
import org.apache.streampipes.connect.adapter.sdk.ParameterExtractor;
+import org.apache.streampipes.connect.adapter.util.DatatypeUtils;
import org.apache.streampipes.connect.api.EmitBinaryEvent;
import org.apache.streampipes.connect.api.exception.ParseException;
import org.apache.streampipes.model.connect.grounding.FormatDescription;
@@ -28,7 +29,6 @@ import org.apache.streampipes.model.connect.guess.AdapterGuessInfo;
import org.apache.streampipes.model.connect.guess.GuessTypeInfo;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.schema.EventSchema;
-import org.apache.streampipes.vocabulary.XSD;
import java.io.BufferedReader;
import java.io.IOException;
@@ -108,9 +108,11 @@ public class CsvParser extends Parser {
EventSchema resultSchema = new EventSchema();
for (int i = 0; i < keys.length; i++) {
EventPropertyPrimitive p = new EventPropertyPrimitive();
+ var runtimeType = DatatypeUtils.getXsdDatatype(data[i]);
+ var convertedValue = DatatypeUtils.convertValue(data[i], runtimeType);
p.setRuntimeName(keys[i]);
- p.setRuntimeType(getTypeString(data[i]));
- sample.put(keys[i], new GuessTypeInfo(getTypeString(data[i]), data[i]));
+ p.setRuntimeType(runtimeType);
+ sample.put(keys[i], new GuessTypeInfo(DatatypeUtils.getCanonicalTypeClassName(data[i]), convertedValue));
resultSchema.addEventProperty(p);
}
@@ -122,35 +124,6 @@ public class CsvParser extends Parser {
return getSchemaAndSample(oneEvent).getEventSchema();
}
- private String getTypeString(String o) {
-
- String typeClass = getTypeClass(o);
-
- if (Float.class.getCanonicalName().equals(typeClass)) {
- return XSD._float.toString();
- } else if (Boolean.class.getCanonicalName().equals(typeClass)) {
- return XSD._boolean.toString();
- } else {
- return XSD._string.toString();
- }
- }
-
- private String getTypeClass(String o) {
-
- try {
- Double.parseDouble(o);
- return Float.class.getCanonicalName();
- } catch (NumberFormatException e) {
-
- }
-
- if (o.equalsIgnoreCase("true") || o.equalsIgnoreCase("false")) {
- return Boolean.class.getCanonicalName();
- }
-
- return String.class.getCanonicalName();
- }
-
public static String[] parseLine(String cvsLine, String separatorString) {
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java
index 25d357518..abb1bd27a 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/elements/TransformValueAdapterPipelineElement.java
@@ -22,6 +22,7 @@ import org.apache.streampipes.connect.adapter.preprocessing.Util;
import org.apache.streampipes.connect.adapter.preprocessing.transform.value.*;
import org.apache.streampipes.connect.api.IAdapterPipelineElement;
import org.apache.streampipes.model.connect.rules.TransformationRuleDescription;
+import org.apache.streampipes.model.connect.rules.value.ChangeDatatypeTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.CorrectionValueTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.UnitTransformRuleDescription;
@@ -43,11 +44,11 @@ public class TransformValueAdapterPipelineElement implements IAdapterPipelineEle
// transforms description to actual rules
for (TransformationRuleDescription ruleDescription : transformationRuleDescriptions) {
if (ruleDescription instanceof UnitTransformRuleDescription) {
- UnitTransformRuleDescription tmp = (UnitTransformRuleDescription) ruleDescription;
+ var tmp = (UnitTransformRuleDescription) ruleDescription;
rules.add(new UnitTransformationRule(Util.toKeyArray(tmp.getRuntimeKey()),
tmp.getFromUnitRessourceURL(), tmp.getToUnitRessourceURL()));
- } else if(ruleDescription instanceof TimestampTranfsformationRuleDescription) {
- TimestampTranfsformationRuleDescription tmp = (TimestampTranfsformationRuleDescription) ruleDescription;
+ } else if (ruleDescription instanceof TimestampTranfsformationRuleDescription) {
+ var tmp = (TimestampTranfsformationRuleDescription) ruleDescription;
TimestampTranformationRuleMode mode = null;
switch (tmp.getMode()) {
case "formatString": mode = TimestampTranformationRuleMode.FORMAT_STRING;
@@ -57,9 +58,12 @@ public class TransformValueAdapterPipelineElement implements IAdapterPipelineEle
rules.add(new TimestampTranformationRule(Util.toKeyArray(tmp.getRuntimeKey()), mode,
tmp.getFormatString(), tmp.getMultiplier()));
}
- else if(ruleDescription instanceof CorrectionValueTransformationRuleDescription) {
- CorrectionValueTransformationRuleDescription tmp = (CorrectionValueTransformationRuleDescription) ruleDescription;
+ else if (ruleDescription instanceof CorrectionValueTransformationRuleDescription) {
+ var tmp = (CorrectionValueTransformationRuleDescription) ruleDescription;
rules.add(new CorrectionValueTransformationRule(Util.toKeyArray(tmp.getRuntimeKey()), tmp.getCorrectionValue(), tmp.getOperator()));
+ } else if (ruleDescription instanceof ChangeDatatypeTransformationRuleDescription) {
+ var tmp = (ChangeDatatypeTransformationRuleDescription) ruleDescription;
+ rules.add(new DatatypeTransformationRule(tmp.getRuntimeKey(), tmp.getOriginalDatatypeXsd(), tmp.getTargetDatatypeXsd()));
}
else {
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/value/DatatypeTransformationRule.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/value/DatatypeTransformationRule.java
new file mode 100644
index 000000000..d8c6d8aac
--- /dev/null
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/value/DatatypeTransformationRule.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.adapter.preprocessing.transform.value;
+
+import org.apache.streampipes.connect.adapter.util.DatatypeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class DatatypeTransformationRule implements ValueTransformationRule {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DatatypeTransformationRule.class);
+
+ private String eventKey;
+ private String originalDatatypeXsd;
+ private String targetDatatypeXsd;
+
+ public DatatypeTransformationRule(String eventKey, String originalDatatypeXsd, String targetDatatypeXsd) {
+ this.eventKey = eventKey;
+ this.originalDatatypeXsd = originalDatatypeXsd;
+ this.targetDatatypeXsd = targetDatatypeXsd;
+ }
+
+ @Override
+ public Map<String, Object> transform(Map<String, Object> event) {
+ Object value = event.get(eventKey);
+ Object transformedValue = transformDatatype(value);
+ event.put(eventKey, transformedValue);
+ return event;
+ }
+
+ public Object transformDatatype(Object value) {
+ return DatatypeUtils.convertValue(value, targetDatatypeXsd);
+ }
+}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java
index 939cbad78..c965c894d 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/preprocessing/transform/value/ValueEventTransformer.java
@@ -26,14 +26,16 @@ import java.util.Map;
public class ValueEventTransformer implements ValueTransformationRule {
- private List<UnitTransformationRule> unitTransformationRules;
- private List<TimestampTranformationRule> timestampTransformationRules;
- private List<CorrectionValueTransformationRule> correctionValueTransformationRules;
+ private final List<UnitTransformationRule> unitTransformationRules;
+ private final List<TimestampTranformationRule> timestampTransformationRules;
+ private final List<CorrectionValueTransformationRule> correctionValueTransformationRules;
+ private final List<DatatypeTransformationRule> datatypeTransformationRules;
public ValueEventTransformer(List<ValueTransformationRule> rules) {
this.unitTransformationRules = new ArrayList<>();
this.timestampTransformationRules = new ArrayList<>();
this.correctionValueTransformationRules = new ArrayList<>();
+ this.datatypeTransformationRules = new ArrayList<>();
for (TransformationRule rule : rules) {
if (rule instanceof UnitTransformationRule) {
@@ -42,16 +44,12 @@ public class ValueEventTransformer implements ValueTransformationRule {
this.timestampTransformationRules.add((TimestampTranformationRule) rule);
} else if (rule instanceof CorrectionValueTransformationRule) {
this.correctionValueTransformationRules.add((CorrectionValueTransformationRule) rule);
+ } else if (rule instanceof DatatypeTransformationRule) {
+ this.datatypeTransformationRules.add((DatatypeTransformationRule) rule);
}
}
}
-/*
- public ValueEventTransformer(List<UnitTransformationRule> unitTransformationRule) {
- this.unitTransformationRules = new ArrayList<>();
- }
-*/
-
@Override
public Map<String, Object> transform(Map<String, Object> event) {
@@ -63,36 +61,14 @@ public class ValueEventTransformer implements ValueTransformationRule {
event = rule.transform(event);
}
- for (CorrectionValueTransformationRule rule : correctionValueTransformationRules) {
+ for (var rule: datatypeTransformationRules) {
event = rule.transform(event);
}
+ for (CorrectionValueTransformationRule rule : correctionValueTransformationRules) {
+ event = rule.transform(event);
+ }
return event;
}
-
-
- public List<UnitTransformationRule> getUnitTransformationRules() {
- return unitTransformationRules;
- }
-
- public void setUnitTransformationRules(List<UnitTransformationRule> unitTransformationRules) {
- this.unitTransformationRules = unitTransformationRules;
- }
-
- public List<TimestampTranformationRule> getTimestampTransformationRules() {
- return timestampTransformationRules;
- }
-
- public void setTimestampTransformationRules(List<TimestampTranformationRule> timestampTransformationRules) {
- this.timestampTransformationRules = timestampTransformationRules;
- }
-
- public List<CorrectionValueTransformationRule> getCorrectionValueTransformationRules() {
- return correctionValueTransformationRules;
- }
-
- public void setCorrectionValueTransformationRules(List<CorrectionValueTransformationRule> correctionValueTransformationRules) {
- this.correctionValueTransformationRules = correctionValueTransformationRules;
- }
}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/DatatypeUtils.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/DatatypeUtils.java
new file mode 100644
index 000000000..9261beee3
--- /dev/null
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/DatatypeUtils.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.connect.adapter.util;
+
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.streampipes.vocabulary.XSD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DatatypeUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DatatypeUtils.class);
+
+ public static Object convertValue(Object value,
+ String targetDatatypeXsd) {
+ var stringValue = String.valueOf(value);
+ if (XSD._string.toString().equals(targetDatatypeXsd)) {
+ return stringValue;
+ } else {
+ try {
+ if (XSD._double.toString().equals(targetDatatypeXsd)) {
+ return Double.parseDouble(stringValue);
+ } else if (XSD._float.toString().equals(targetDatatypeXsd)) {
+ return Float.parseFloat(stringValue);
+ } else if (XSD._boolean.toString().equals(targetDatatypeXsd)) {
+ return Boolean.parseBoolean(stringValue);
+ } else if (XSD._integer.toString().equals(targetDatatypeXsd)) {
+ var floatingNumber = Float.parseFloat(stringValue);
+ return Math.round(floatingNumber);
+ } else if (XSD._long.toString().equals(targetDatatypeXsd)) {
+ var floatingNumber = Double.parseDouble(stringValue);
+ return Math.round(floatingNumber);
+ }
+ } catch (NumberFormatException e) {
+ LOG.error("Number format exception {}", value);
+ return value;
+ }
+ }
+
+ return value;
+ }
+
+ public static String getCanonicalTypeClassName(String value) {
+ return getTypeClass(value).getCanonicalName();
+ }
+
+ public static String getXsdDatatype(String value) {
+ var clazz = getTypeClass(value);
+ if (clazz.equals(Integer.class)) {
+ return XSD._integer.toString();
+ } else if (clazz.equals(Long.class)) {
+ return XSD._long.toString();
+ } else if (clazz.equals(Float.class)) {
+ return XSD._float.toString();
+ } else if (clazz.equals(Double.class)) {
+ return XSD._double.toString();
+ } else if (clazz.equals(Boolean.class)) {
+ return XSD._boolean.toString();
+ } else {
+ return XSD._string.toString();
+ }
+ }
+
+ public static Class<?> getTypeClass(String value) {
+ if (NumberUtils.isParsable(value)) {
+ try {
+ Integer.parseInt(value);
+ return Integer.class;
+ } catch (NumberFormatException ignored) {
+ }
+
+ try {
+ Long.parseLong(value);
+ return Long.class;
+ } catch (NumberFormatException ignored) {
+ }
+
+ try {
+ Double.parseDouble(value);
+ return Float.class;
+ } catch (NumberFormatException ignored) {
+ }
+
+ }
+
+ if (value.equalsIgnoreCase("true") || value.equalsIgnoreCase("false")) {
+ return Boolean.class;
+ }
+
+ return String.class;
+ }
+}
diff --git a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/PollingSettings.java b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/PollingSettings.java
index 79dea5b0e..177702c82 100644
--- a/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/PollingSettings.java
+++ b/streampipes-connect/src/main/java/org/apache/streampipes/connect/adapter/util/PollingSettings.java
@@ -15,6 +15,7 @@
* limitations under the License.
*
*/
+
package org.apache.streampipes.connect.adapter.util;
import java.util.concurrent.TimeUnit;
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/TransformationRuleDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/TransformationRuleDescription.java
index bd1b85b5e..590b4161e 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/TransformationRuleDescription.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/TransformationRuleDescription.java
@@ -41,6 +41,7 @@ import org.apache.streampipes.model.shared.annotation.TsModel;
@JsonSubTypes.Type(DeleteRuleDescription.class),
@JsonSubTypes.Type(RenameRuleDescription.class),
@JsonSubTypes.Type(MoveRuleDescription.class),
+ @JsonSubTypes.Type(ChangeDatatypeTransformationRuleDescription.class),
@JsonSubTypes.Type(CorrectionValueTransformationRuleDescription.class),
})
public abstract class TransformationRuleDescription extends UnnamedStreamPipesEntity {
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/value/ChangeDatatypeTransformationRuleDescription.java b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/value/ChangeDatatypeTransformationRuleDescription.java
new file mode 100644
index 000000000..aa64e647b
--- /dev/null
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/connect/rules/value/ChangeDatatypeTransformationRuleDescription.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.model.connect.rules.value;
+
+public class ChangeDatatypeTransformationRuleDescription extends ValueTransformationRuleDescription {
+
+ private String runtimeKey;
+ private String originalDatatypeXsd;
+ private String targetDatatypeXsd;
+
+ public ChangeDatatypeTransformationRuleDescription() {
+ }
+
+ public ChangeDatatypeTransformationRuleDescription(ChangeDatatypeTransformationRuleDescription other) {
+ super(other);
+ this.runtimeKey = other.getRuntimeKey();
+ this.originalDatatypeXsd = other.getOriginalDatatypeXsd();
+ this.targetDatatypeXsd = other.getTargetDatatypeXsd();
+ }
+
+ public String getRuntimeKey() {
+ return runtimeKey;
+ }
+
+ public void setRuntimeKey(String runtimeKey) {
+ this.runtimeKey = runtimeKey;
+ }
+
+ public String getOriginalDatatypeXsd() {
+ return originalDatatypeXsd;
+ }
+
+ public void setOriginalDatatypeXsd(String originalDatatypeXsd) {
+ this.originalDatatypeXsd = originalDatatypeXsd;
+ }
+
+ public String getTargetDatatypeXsd() {
+ return targetDatatypeXsd;
+ }
+
+ public void setTargetDatatypeXsd(String targetDatatypeXsd) {
+ this.targetDatatypeXsd = targetDatatypeXsd;
+ }
+}
diff --git a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
index d2712a990..88c0b73ca 100644
--- a/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
+++ b/ui/projects/streampipes/platform-services/src/lib/model/gen/streampipes-model.ts
@@ -18,10 +18,10 @@
/* tslint:disable */
/* eslint-disable */
// @ts-nocheck
-// Generated using typescript-generator version 2.27.744 on 2022-08-16 16:51:03.
+// Generated using typescript-generator version 2.27.744 on 2022-08-17 14:48:34.
export class AbstractStreamPipesEntity {
- "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
+ "@class": "org.apache.streampipes.model.base.AbstractStreamPipesEntity" | "org.apache.streampipes.model.base.NamedStreamPipesEntity" | "org.apache.streampipes.model.connect.adapter.AdapterDescription" | "org.apache.streampipes.model.connect.adapter.AdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.SpecificAdapterSetDescription" | "org.apache.streampipes.model.connect.adapter.AdapterStre [...]
elementId: string;
static fromData(data: AbstractStreamPipesEntity, target?: AbstractStreamPipesEntity): AbstractStreamPipesEntity {
@@ -36,7 +36,7 @@ export class AbstractStreamPipesEntity {
}
export class UnnamedStreamPipesEntity extends AbstractStreamPipesEntity {
- "@class": "org.apache.streampipes.model.base.UnnamedStreamPipesEntity" | "org.apache.streampipes.model.connect.guess.GuessSchema" | "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streamp [...]
+ "@class": "org.apache.streampipes.model.base.UnnamedStreamPipesEntity" | "org.apache.streampipes.model.connect.guess.GuessSchema" | "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streamp [...]
static fromData(data: UnnamedStreamPipesEntity, target?: UnnamedStreamPipesEntity): UnnamedStreamPipesEntity {
if (!data) {
@@ -151,8 +151,8 @@ export class NamedStreamPipesEntity extends AbstractStreamPipesEntity {
instance.applicationLinks = __getCopyArrayFn(ApplicationLink.fromData)(data.applicationLinks);
instance.internallyManaged = data.internallyManaged;
instance.connectedTo = __getCopyArrayFn(__identity<string>())(data.connectedTo);
- instance.dom = data.dom;
instance.uri = data.uri;
+ instance.dom = data.dom;
instance._rev = data._rev;
return instance;
}
@@ -192,9 +192,9 @@ export class AdapterDescription extends NamedStreamPipesEntity {
instance.selectedEndpointUrl = data.selectedEndpointUrl;
instance.correspondingServiceGroup = data.correspondingServiceGroup;
instance.correspondingDataStreamElementId = data.correspondingDataStreamElementId;
- instance.schemaRules = __getCopyArrayFn(TransformationRuleDescription.fromDataUnion)(data.schemaRules);
instance.streamRules = __getCopyArrayFn(TransformationRuleDescription.fromDataUnion)(data.streamRules);
instance.valueRules = __getCopyArrayFn(TransformationRuleDescription.fromDataUnion)(data.valueRules);
+ instance.schemaRules = __getCopyArrayFn(TransformationRuleDescription.fromDataUnion)(data.schemaRules);
return instance;
}
@@ -304,7 +304,7 @@ export class AdapterType {
}
export class TransformationRuleDescription extends UnnamedStreamPipesEntity {
- "@class": "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription" | "org.apache.streampipes.model.connect.rules.valu [...]
+ "@class": "org.apache.streampipes.model.connect.rules.TransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription" | "org.apache.streampipes.model.connect.rules.valu [...]
static fromData(data: TransformationRuleDescription, target?: TransformationRuleDescription): TransformationRuleDescription {
if (!data) {
@@ -340,6 +340,8 @@ export class TransformationRuleDescription extends UnnamedStreamPipesEntity {
return RenameRuleDescription.fromData(data);
case "org.apache.streampipes.model.connect.rules.schema.MoveRuleDescription":
return MoveRuleDescription.fromData(data);
+ case "org.apache.streampipes.model.connect.rules.value.ChangeDatatypeTransformationRuleDescription":
+ return ChangeDatatypeTransformationRuleDescription.fromData(data);
case "org.apache.streampipes.model.connect.rules.value.CorrectionValueTransformationRuleDescription":
return CorrectionValueTransformationRuleDescription.fromData(data);
}
@@ -347,7 +349,7 @@ export class TransformationRuleDescription extends UnnamedStreamPipesEntity {
}
export class ValueTransformationRuleDescription extends TransformationRuleDescription {
- "@class": "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.UnitTransformRuleDescription" | "org.apache.streampipes.model.connect.rules [...]
+ "@class": "org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription" | "org.apache.streampipes.model.connect.rules.value.AddValueTransformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.TimestampTranfsformationRuleDescription" | "org.apache.streampipes.model.connect.rules.value.UnitTransformRuleDescription" | "org.apache.streampipes.model.connect.rules [...]
static fromData(data: ValueTransformationRuleDescription, target?: ValueTransformationRuleDescription): ValueTransformationRuleDescription {
if (!data) {
@@ -689,6 +691,25 @@ export class Category {
}
}
+export class ChangeDatatypeTransformationRuleDescription extends ValueTransformationRuleDescription { // value transformation rule describing a datatype change for a single property
+ "@class": "org.apache.streampipes.model.connect.rules.value.ChangeDatatypeTransformationRuleDescription"; // fixed type tag; TransformationRuleDescription has a matching `case` that routes to this class's fromData
+ originalDatatypeXsd: string; // datatype before the change (the transformation-rule service stores the old runtimeType here)
+ runtimeKey: string; // key of the affected property (the service stores the result of getCompleteRuntimeNameKey here)
+ targetDatatypeXsd: string; // datatype after the change (the service stores the new runtimeType here)
+
+ static fromData(data: ChangeDatatypeTransformationRuleDescription, target?: ChangeDatatypeTransformationRuleDescription): ChangeDatatypeTransformationRuleDescription { // copies `data` into `target`, or into a fresh instance when no target is passed
+ if (!data) { // falsy input (e.g. null/undefined) is handed back unchanged instead of building an instance
+ return data;
+ }
+ const instance = target || new ChangeDatatypeTransformationRuleDescription(); // reuse the caller's instance when one is supplied
+ super.fromData(data, instance); // parent class handles its own part of `data` on the same instance first (presumably the inherited fields; parent body not shown here)
+ instance.runtimeKey = data.runtimeKey; // the three fields declared by this class are copied by plain assignment
+ instance.originalDatatypeXsd = data.originalDatatypeXsd;
+ instance.targetDatatypeXsd = data.targetDatatypeXsd;
+ return instance;
+ }
+}
+
export class CodeInputStaticProperty extends StaticProperty {
"@class": "org.apache.streampipes.model.staticproperty.CodeInputStaticProperty";
codeTemplate: string;
@@ -2618,9 +2639,9 @@ export class PipelineTemplateDescription extends NamedStreamPipesEntity {
const instance = target || new PipelineTemplateDescription();
super.fromData(data, instance);
instance.boundTo = __getCopyArrayFn(BoundPipelineElement.fromData)(data.boundTo);
- instance.pipelineTemplateName = data.pipelineTemplateName;
instance.pipelineTemplateId = data.pipelineTemplateId;
instance.pipelineTemplateDescription = data.pipelineTemplateDescription;
+ instance.pipelineTemplateName = data.pipelineTemplateName;
return instance;
}
}
@@ -3461,7 +3482,7 @@ export type StreamTransformationRuleDescriptionUnion = EventRateTransformationRu
export type TopicDefinitionUnion = SimpleTopicDefinition | WildcardTopicDefinition;
-export type TransformationRuleDescriptionUnion = AddTimestampRuleDescription | AddValueTransformationRuleDescription | TimestampTranfsformationRuleDescription | UnitTransformRuleDescription | EventRateTransformationRuleDescription | RemoveDuplicatesTransformationRuleDescription | CreateNestedRuleDescription | DeleteRuleDescription | RenameRuleDescription | MoveRuleDescription | CorrectionValueTransformationRuleDescription;
+export type TransformationRuleDescriptionUnion = AddTimestampRuleDescription | AddValueTransformationRuleDescription | TimestampTranfsformationRuleDescription | UnitTransformRuleDescription | EventRateTransformationRuleDescription | RemoveDuplicatesTransformationRuleDescription | CreateNestedRuleDescription | DeleteRuleDescription | RenameRuleDescription | MoveRuleDescription | ChangeDatatypeTransformationRuleDescription | CorrectionValueTransformationRuleDescription;
export type TransportProtocolUnion = JmsTransportProtocol | KafkaTransportProtocol | MqttTransportProtocol;
diff --git a/ui/src/app/connect/services/transformation-rule.service.ts b/ui/src/app/connect/services/transformation-rule.service.ts
index 87819561a..478669698 100644
--- a/ui/src/app/connect/services/transformation-rule.service.ts
+++ b/ui/src/app/connect/services/transformation-rule.service.ts
@@ -30,6 +30,7 @@ import {
EventSchema,
MoveRuleDescription,
RenameRuleDescription,
+ ChangeDatatypeTransformationRuleDescription,
TimestampTranfsformationRuleDescription,
TransformationRuleDescriptionUnion,
UnitTransformRuleDescription
@@ -82,33 +83,16 @@ export class TransformationRuleService {
}
// Scale
- transformationRuleDescription = transformationRuleDescription.concat(this.getCorrectionValueRules(
- this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema));
+ transformationRuleDescription = transformationRuleDescription
+ .concat(this.getCorrectionValueRules(this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema))
+ .concat(this.getRenameRules(this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema))
+ .concat(this.getCreateNestedRules(this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema))
+ .concat(this.getMoveRules(this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema))
+ .concat(this.getDeleteRules(this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema))
+ .concat(this.getUnitTransformRules(this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema))
+ .concat(this.getTimestampTransformRules(this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema))
+ .concat(this.getDatatypeTransformRules(this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema));
- // Rename
- transformationRuleDescription = transformationRuleDescription.concat(this.getRenameRules(
- this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema));
-
-
- // Create Nested
- transformationRuleDescription = transformationRuleDescription.concat(this.getCreateNestedRules(
- this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema));
-
- // Move
- transformationRuleDescription = transformationRuleDescription.concat(this.getMoveRules(
- this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema));
-
- // Delete
- transformationRuleDescription = transformationRuleDescription.concat(this.getDeleteRules(
- this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema));
-
- // Unit
- transformationRuleDescription = transformationRuleDescription.concat(this.getUnitTransformRules(
- this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema));
-
- // Timestmap
- transformationRuleDescription = transformationRuleDescription.concat(this.getTimestampTransformRules(
- this.newEventSchema.eventProperties, this.oldEventSchema, this.newEventSchema));
return transformationRuleDescription;
}
@@ -454,6 +438,40 @@ export class TransformationRuleService {
return property.domainProperties.some(dp => dp === 'http://schema.org/DateTime');
}
+  /**
+   * Builds one ChangeDatatypeTransformationRuleDescription for every primitive
+   * property in `eventProperties` whose runtimeType differs from the runtimeType
+   * of the property that getEventProperty finds for the same elementId in the
+   * old schema. Nested properties are walked recursively; primitives for which
+   * no old property is found, and all other property kinds, yield no rule.
+   *
+   * @param eventProperties properties of the current nesting level to inspect
+   * @param oldEventSchema  schema the old runtime types are looked up in
+   * @param newEventSchema  schema used to resolve the runtime key of a changed property
+   * @returns the collected rules, in traversal order
+   */
+  private getDatatypeTransformRules(eventProperties: EventPropertyUnion[],
+                                    oldEventSchema: EventSchema,
+                                    newEventSchema: EventSchema): ChangeDatatypeTransformationRuleDescription[] {
+
+    const rules: ChangeDatatypeTransformationRuleDescription[] = [];
+
+    for (const ep of eventProperties) {
+      if (ep instanceof EventPropertyPrimitive) {
+        const targetType = ep.runtimeType;
+        const runtimeKey = this.getCompleteRuntimeNameKey(newEventSchema.eventProperties, ep.elementId);
+        const previous = this.getEventProperty(oldEventSchema.eventProperties, ep.elementId);
+        if (!previous) {
+          // No old property found for this elementId, so there is nothing to compare against.
+          continue;
+        }
+
+        const sourceType = (previous as EventPropertyPrimitive).runtimeType;
+        if (sourceType === targetType) {
+          continue;
+        }
+
+        const rule = new ChangeDatatypeTransformationRuleDescription();
+        rule['@class'] = 'org.apache.streampipes.model.connect.rules.value.ChangeDatatypeTransformationRuleDescription';
+        rule.runtimeKey = runtimeKey;
+        rule.originalDatatypeXsd = sourceType;
+        rule.targetDatatypeXsd = targetType;
+        rules.push(rule);
+      } else if (ep instanceof EventPropertyNested) {
+        // Recurse into the children; both schemas are passed through unchanged.
+        rules.push(...this.getDatatypeTransformRules(ep.eventProperties, oldEventSchema, newEventSchema));
+      }
+    }
+
+    return rules;
+  }
+
private getCorrectionValueRules(eventProperties: EventPropertyUnion[],
oldEventSchema: EventSchema,
newEventSchema: EventSchema) {