You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2020/09/18 09:20:37 UTC
[incubator-streampipes-extensions] branch dev updated: [hotfix] Add property scope to aggregation output
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new c2359a1 [hotfix] Add property scope to aggregation output
c2359a1 is described below
commit c2359a139eb380e3fce9a7e4b3e8c724a3aea72a
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Fri Sep 18 11:15:04 2020 +0200
[hotfix] Add property scope to aggregation output
---
.../flink/processor/aggregation/AggregationController.java | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git a/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationController.java b/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationController.java
index 1d0501d..9d18106 100644
--- a/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationController.java
+++ b/streampipes-processors-aggregation-flink/src/main/java/org/apache/streampipes/processors/aggregation/flink/processor/aggregation/AggregationController.java
@@ -25,15 +25,19 @@ import org.apache.streampipes.model.DataProcessorType;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.schema.EventProperty;
+import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.processors.aggregation.flink.config.AggregationFlinkConfig;
import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.sdk.helpers.*;
import org.apache.streampipes.sdk.utils.Assets;
+import org.apache.streampipes.sdk.utils.Datatypes;
+import org.apache.streampipes.vocabulary.SO;
import org.apache.streampipes.wrapper.flink.FlinkDataProcessorDeclarer;
import org.apache.streampipes.wrapper.flink.FlinkDataProcessorRuntime;
@@ -138,7 +142,11 @@ public class AggregationController extends FlinkDataProcessorDeclarer<Aggregatio
for (String aggregate: aggregateKeyList) {
String propertyPrefix = StringUtils.substringAfterLast(aggregate, ":");
String runtimeName = propertyPrefix + "_" + operationKey.toLowerCase();
- eventSchema.addEventProperty(EpProperties.doubleEp(Labels.withId(runtimeName), runtimeName, "http://schema.org/Number"));
+ EventPropertyPrimitive primitive = PrimitivePropertyBuilder.create(Datatypes.Double, runtimeName)
+ .domainProperty(SO.Number)
+ .scope(PropertyScope.MEASUREMENT_PROPERTY)
+ .build();
+ eventSchema.addEventProperty(primitive);
}
return eventSchema;
}