You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/09/18 09:20:32 UTC

[incubator-streampipes-extensions] branch rel/0.67.0 updated: [hotfix] Add property scope to aggregation output

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch rel/0.67.0
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/rel/0.67.0 by this push:
     new bb82ad4  [hotfix] Add property scope to aggregation output
bb82ad4 is described below

commit bb82ad4371c0e2f5dedf3677a026a1897e43a588
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Fri Sep 18 11:15:04 2020 +0200

    [hotfix] Add property scope to aggregation output
---
 .../flink/processor/aggregation/AggregationController.java     | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationController.java b/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationController.java
index 1d0501d..9d18106 100644
--- a/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationController.java
+++ b/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationController.java
@@ -25,15 +25,19 @@ import org.apache.streampipes.model.DataProcessorType;
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
 import org.apache.streampipes.model.schema.EventSchema;
 import org.apache.streampipes.model.schema.PropertyScope;
 import org.apache.streampipes.processors.aggregation.flink.config.AggregationFlinkConfig;
 import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
 import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
 import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
 import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
 import org.apache.streampipes.sdk.helpers.*;
 import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.sdk.utils.Datatypes;
+import org.apache.streampipes.vocabulary.SO;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer;
 import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
 
@@ -138,7 +142,11 @@ public class AggregationController extends FlinkDataProcessorDeclarer<Aggregatio
     for (String aggregate: aggregateKeyList) {
       String propertyPrefix = StringUtils.substringAfterLast(aggregate, ":");
       String runtimeName = propertyPrefix + "_" + operationKey.toLowerCase();
-      eventSchema.addEventProperty(EpProperties.doubleEp(Labels.withId(runtimeName), runtimeName, "http://schema.org/Number"));
+      EventPropertyPrimitive primitive = PrimitivePropertyBuilder.create(Datatypes.Double, runtimeName)
+              .domainProperty(SO.Number)
+              .scope(PropertyScope.MEASUREMENT_PROPERTY)
+              .build();
+      eventSchema.addEventProperty(primitive);
     }
     return eventSchema;
   }