You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/04/13 20:38:24 UTC
[incubator-streampipes-extensions] branch dev updated: Fix bug in field converter
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new 6fce621 Fix bug in field converter
new 3067682 Merge branch 'dev' of github.com:apache/incubator-streampipes-extensions into dev
6fce621 is described below
commit 6fce621680c2aae2dbc6ac63f62dbc36babb36f2
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon Apr 13 22:38:11 2020 +0200
Fix bug in field converter
---
.../flink/processor/converter/FieldConverter.java | 10 +-
.../converter/FieldConverterController.java | 102 +++++++++++++++++----
2 files changed, 93 insertions(+), 19 deletions(-)
diff --git a/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverter.java b/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverter.java
index e94512b..c8778af 100644
--- a/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverter.java
+++ b/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverter.java
@@ -42,9 +42,13 @@ public class FieldConverter implements FlatMapFunction<Event, Event> {
String value = in.getFieldBySelector(convertProperty).getAsPrimitive().getAsString();
try {
if (targetDatatype.equals(XSD._float.toString())) {
- in.updateFieldBySelector(convertProperty, Float.parseFloat(value.trim()));
- } else {
- in.updateFieldBySelector(convertProperty, Integer.parseInt(value.trim()));
+ in.updateFieldBySelector(convertProperty, Float.parseFloat(value.trim()));
+ } else if (targetDatatype.equals(XSD._integer.toString())){
+ in.updateFieldBySelector(convertProperty, Integer.parseInt(value.trim()));
+ } else if (targetDatatype.equals(XSD._boolean.toString())) {
+ in.updateFieldBySelector(convertProperty, Boolean.parseBoolean(value.trim()));
+ } else if (targetDatatype.equals(XSD._string.toString())) {
+ in.updateFieldBySelector(convertProperty, value.trim());
}
out.collect(in);
diff --git a/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverterController.java b/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverterController.java
index 5354656..b94a80b 100644
--- a/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverterController.java
+++ b/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverterController.java
@@ -17,30 +17,42 @@
*/
package org.apache.streampipes.processors.transformation.flink.processor.converter;
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
+import org.apache.streampipes.container.api.ResolvesContainerProvidedOutputStrategy;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
+import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.model.staticproperty.Option;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.Options;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.helpers.TransformOperations;
import org.apache.streampipes.sdk.helpers.Tuple2;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.vocabulary.XSD;
import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer;
import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
public class FieldConverterController extends
- FlinkDataProcessorDeclarer<FieldConverterParameters> {
+ FlinkDataProcessorDeclarer<FieldConverterParameters>
+ implements ResolvesContainerProvidedOptions,
+ ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, ProcessingElementParameterExtractor> {
public static final String CONVERT_PROPERTY = "convert-property";
public static final String TARGET_TYPE = "target-type";
- private static final String FIELD_TO_CONVERT_KEY = "fieldToConvert";
@Override
public DataProcessorDescription declareModel() {
@@ -49,20 +61,12 @@ public class FieldConverterController extends
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.requiredStream(StreamRequirementsBuilder
.create()
- .requiredProperty(EpRequirements.anyProperty())
-// .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(), Labels.withId
-// (CONVERT_PROPERTY), PropertyScope.NONE)
+ .requiredPropertyWithUnaryMapping(EpRequirements.anyProperty(), Labels.withId
+ (CONVERT_PROPERTY), PropertyScope.NONE)
.build())
-
- .naryMappingPropertyWithoutRequirement(
- Labels.withId(FIELD_TO_CONVERT_KEY),
- PropertyScope.NONE)
-
- .requiredSingleValueSelection(Labels.withId(TARGET_TYPE), Options.from
- (new Tuple2<>("Float", XSD._float.toString()), new Tuple2<>
- ("Integer", XSD._integer.toString())))
- .outputStrategy(OutputStrategies.transform(TransformOperations
- .dynamicDatatypeTransformation(CONVERT_PROPERTY, TARGET_TYPE)))
+ .requiredSingleValueSelectionFromContainer(Labels.withId(TARGET_TYPE),
+ Collections.singletonList(CONVERT_PROPERTY))
+ .outputStrategy(OutputStrategies.customTransformation())
.build();
}
@@ -79,4 +83,70 @@ public class FieldConverterController extends
return new FieldConverterProgram(staticParams);
}
+
+ @Override
+ public List<Option> resolveOptions(String requestId, StaticPropertyExtractor parameterExtractor) {
+ String fieldSelector = parameterExtractor.mappingPropertyValue(CONVERT_PROPERTY);
+ try {
+ EventProperty property = parameterExtractor.getEventPropertyBySelector(fieldSelector);
+ if (property instanceof EventPropertyPrimitive) {
+ String runtimeType = ((EventPropertyPrimitive) property).getRuntimeType();
+ if (runtimeType.equals(XSD._string.toString())) {
+ return Options.from(floatValue(), integerValue(), booleanValue());
+ } else if (runtimeType.equals(XSD._integer.toString())) {
+ return Options.from(floatValue(), stringValue(), booleanValue());
+ } else if (runtimeType.equals(XSD._float.toString()) || runtimeType.equals(XSD._double.toString())) {
+ return Options.from(integerValue(), stringValue());
+ } else if (runtimeType.equals(XSD._boolean.toString())) {
+ return Options.from(integerValue(), stringValue());
+ } else {
+ return Options.from(stringValue());
+ }
+ } else {
+ return Options.from(stringValue());
+ }
+
+ } catch (SpRuntimeException e) {
+ e.printStackTrace();
+ return Options.from(stringValue());
+ }
+ }
+
+ @Override
+ public EventSchema resolveOutputStrategy(DataProcessorInvocation processingElement, ProcessingElementParameterExtractor parameterExtractor) throws SpRuntimeException {
+ EventSchema eventSchema = processingElement.getInputStreams().get(0).getEventSchema();
+ String fieldSelector = parameterExtractor.mappingPropertyValue(CONVERT_PROPERTY);
+ String convertedType = parameterExtractor.selectedSingleValueInternalName(TARGET_TYPE,
+ String.class);
+ EventProperty property = parameterExtractor.getEventPropertyBySelector(fieldSelector);
+ List<EventProperty> outputProperties = new ArrayList<>();
+ if (property instanceof EventPropertyPrimitive) {
+ ((EventPropertyPrimitive) property).setRuntimeType(convertedType);
+ }
+ eventSchema.getEventProperties().forEach(ep -> {
+ if (ep.getRuntimeName().equals(property.getRuntimeName())) {
+ outputProperties.add(property);
+ } else {
+ outputProperties.add(ep);
+ }
+ });
+
+ return new EventSchema(outputProperties);
+ }
+
+ private Tuple2<String, String> floatValue() {
+ return new Tuple2<>("Float", XSD._float.toString());
+ }
+
+ private Tuple2<String, String> integerValue() {
+ return new Tuple2<>("Integer", XSD._integer.toString());
+ }
+
+ private Tuple2<String, String> booleanValue() {
+ return new Tuple2<>("Boolean", XSD._boolean.toString());
+ }
+
+ private Tuple2<String, String> stringValue() {
+ return new Tuple2<>("String", XSD._string.toString());
+ }
}