You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/04/13 20:38:24 UTC

[incubator-streampipes-extensions] branch dev updated: Fix bug in field converter

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6fce621  Fix bug in field converter
     new 3067682  Merge branch 'dev' of github.com:apache/incubator-streampipes-extensions into dev
6fce621 is described below

commit 6fce621680c2aae2dbc6ac63f62dbc36babb36f2
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Mon Apr 13 22:38:11 2020 +0200

    Fix bug in field converter
---
 .../flink/processor/converter/FieldConverter.java  |  10 +-
 .../converter/FieldConverterController.java        | 102 +++++++++++++++++----
 2 files changed, 93 insertions(+), 19 deletions(-)

diff --git a/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverter.java b/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverter.java
index e94512b..c8778af 100644
--- a/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverter.java
+++ b/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverter.java
@@ -42,9 +42,13 @@ public class FieldConverter implements FlatMapFunction<Event, Event> {
       String value = in.getFieldBySelector(convertProperty).getAsPrimitive().getAsString();
       try {
           if (targetDatatype.equals(XSD._float.toString())) {
-              in.updateFieldBySelector(convertProperty, Float.parseFloat(value.trim()));
-          } else {
-              in.updateFieldBySelector(convertProperty, Integer.parseInt(value.trim()));
+            in.updateFieldBySelector(convertProperty, Float.parseFloat(value.trim()));
+          } else if (targetDatatype.equals(XSD._integer.toString())){
+            in.updateFieldBySelector(convertProperty, Integer.parseInt(value.trim()));
+          } else if (targetDatatype.equals(XSD._boolean.toString())) {
+            in.updateFieldBySelector(convertProperty, Boolean.parseBoolean(value.trim()));
+          } else if (targetDatatype.equals(XSD._string.toString())) {
+            in.updateFieldBySelector(convertProperty, value.trim());
           }
           
           out.collect(in);
diff --git a/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverterController.java b/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverterController.java
index 5354656..b94a80b 100644
--- a/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverterController.java
+++ b/streampipes-processors-transformation-flink/src/main/java/org/apache/streampipes/processors/transformation/flink/processor/converter/FieldConverterController.java
@@ -17,30 +17,42 @@
  */
 package org.apache.streampipes.processors.transformation.flink.processor.converter;
 
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.container.api.ResolvesContainerProvidedOptions;
+import org.apache.streampipes.container.api.ResolvesContainerProvidedOutputStrategy;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
+import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
+import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.model.staticproperty.Option;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
+import org.apache.streampipes.sdk.extractor.StaticPropertyExtractor;
 import org.apache.streampipes.sdk.helpers.EpRequirements;
 import org.apache.streampipes.sdk.helpers.Labels;
 import org.apache.streampipes.sdk.helpers.Locales;
 import org.apache.streampipes.sdk.helpers.Options;
 import org.apache.streampipes.sdk.helpers.OutputStrategies;
-import org.apache.streampipes.sdk.helpers.TransformOperations;
 import org.apache.streampipes.sdk.helpers.Tuple2;
 import org.apache.streampipes.sdk.utils.Assets;
 import org.apache.streampipes.vocabulary.XSD;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 public class FieldConverterController extends
-        FlinkDataProcessorDeclarer<FieldConverterParameters> {
+        FlinkDataProcessorDeclarer<FieldConverterParameters>
+        implements ResolvesContainerProvidedOptions,
+        ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, ProcessingElementParameterExtractor> {
 
   public static final String CONVERT_PROPERTY = "convert-property";
   public static final String TARGET_TYPE = "target-type";
-  private static final String FIELD_TO_CONVERT_KEY = "fieldToConvert";
 
   @Override
   public DataProcessorDescription declareModel() {
@@ -49,20 +61,12 @@ public class FieldConverterController extends
             .withAssets(Assets.DOCUMENTATION, Assets.ICON)
             .requiredStream(StreamRequirementsBuilder
                     .create()
-                    .requiredProperty(EpRequirements.anyProperty())
-//                    .requiredPropertyWithUnaryMapping(EpRequirements.stringReq(), Labels.withId
-//                            (CONVERT_PROPERTY), PropertyScope.NONE)
+                    .requiredPropertyWithUnaryMapping(EpRequirements.anyProperty(), Labels.withId
+                            (CONVERT_PROPERTY), PropertyScope.NONE)
                     .build())
-
-            .naryMappingPropertyWithoutRequirement(
-                    Labels.withId(FIELD_TO_CONVERT_KEY),
-                    PropertyScope.NONE)
-
-            .requiredSingleValueSelection(Labels.withId(TARGET_TYPE), Options.from
-                    (new Tuple2<>("Float", XSD._float.toString()), new Tuple2<>
-                            ("Integer", XSD._integer.toString())))
-            .outputStrategy(OutputStrategies.transform(TransformOperations
-                    .dynamicDatatypeTransformation(CONVERT_PROPERTY, TARGET_TYPE)))
+            .requiredSingleValueSelectionFromContainer(Labels.withId(TARGET_TYPE),
+                    Collections.singletonList(CONVERT_PROPERTY))
+            .outputStrategy(OutputStrategies.customTransformation())
             .build();
   }
 
@@ -79,4 +83,70 @@ public class FieldConverterController extends
 
     return new FieldConverterProgram(staticParams);
   }
+
+  @Override
+  public List<Option> resolveOptions(String requestId, StaticPropertyExtractor parameterExtractor) {
+    String fieldSelector = parameterExtractor.mappingPropertyValue(CONVERT_PROPERTY);
+    try {
+      EventProperty property = parameterExtractor.getEventPropertyBySelector(fieldSelector);
+      if (property instanceof EventPropertyPrimitive) {
+        String runtimeType = ((EventPropertyPrimitive) property).getRuntimeType();
+        if (runtimeType.equals(XSD._string.toString())) {
+          return Options.from(floatValue(), integerValue(), booleanValue());
+        } else if (runtimeType.equals(XSD._integer.toString())) {
+          return Options.from(floatValue(), stringValue(), booleanValue());
+        } else if (runtimeType.equals(XSD._float.toString()) || runtimeType.equals(XSD._double.toString())) {
+          return Options.from(integerValue(), stringValue());
+        } else if (runtimeType.equals(XSD._boolean.toString())) {
+          return Options.from(integerValue(), stringValue());
+        } else {
+          return Options.from(stringValue());
+        }
+      } else {
+        return Options.from(stringValue());
+      }
+
+    } catch (SpRuntimeException e) {
+      e.printStackTrace();
+      return Options.from(stringValue());
+    }
+  }
+
+  @Override
+  public EventSchema resolveOutputStrategy(DataProcessorInvocation processingElement, ProcessingElementParameterExtractor parameterExtractor) throws SpRuntimeException {
+    EventSchema eventSchema = processingElement.getInputStreams().get(0).getEventSchema();
+    String fieldSelector = parameterExtractor.mappingPropertyValue(CONVERT_PROPERTY);
+    String convertedType = parameterExtractor.selectedSingleValueInternalName(TARGET_TYPE,
+            String.class);
+    EventProperty property = parameterExtractor.getEventPropertyBySelector(fieldSelector);
+    List<EventProperty> outputProperties = new ArrayList<>();
+    if (property instanceof EventPropertyPrimitive) {
+      ((EventPropertyPrimitive) property).setRuntimeType(convertedType);
+    }
+    eventSchema.getEventProperties().forEach(ep -> {
+      if (ep.getRuntimeName().equals(property.getRuntimeName())) {
+        outputProperties.add(property);
+      } else {
+        outputProperties.add(ep);
+      }
+    });
+
+    return new EventSchema(outputProperties);
+  }
+
+  private Tuple2<String, String> floatValue() {
+    return new Tuple2<>("Float", XSD._float.toString());
+  }
+
+  private Tuple2<String, String> integerValue() {
+    return new Tuple2<>("Integer", XSD._integer.toString());
+  }
+
+  private Tuple2<String, String> booleanValue() {
+    return new Tuple2<>("Boolean", XSD._boolean.toString());
+  }
+
+  private Tuple2<String, String> stringValue() {
+    return new Tuple2<>("String", XSD._string.toString());
+  }
 }