You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/18 22:24:54 UTC
[incubator-streampipes] 04/04: [hotfix] Add event index support to Siddhi query builder
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
commit ff57daac3ce264acf371ad98b7b962669cd39840
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Jan 18 23:23:15 2022 +0100
[hotfix] Add event index support to Siddhi query builder
---
.../apache/streampipes/processors/siddhi/trend/Trend.java | 2 +-
.../processors/siddhi/trend/TrendController.java | 2 +-
.../streampipes/wrapper/siddhi/engine/SiddhiEngine.java | 13 +------------
.../wrapper/siddhi/query/expression/Expressions.java | 4 ++++
.../wrapper/siddhi/query/expression/PropertyExpression.java | 6 +++++-
.../streampipes/wrapper/siddhi/utils/SiddhiUtils.java | 3 ++-
6 files changed, 14 insertions(+), 16 deletions(-)
diff --git a/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/trend/Trend.java b/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/trend/Trend.java
index b72f6da..5404e0d 100644
--- a/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/trend/Trend.java
+++ b/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/trend/Trend.java
@@ -79,7 +79,7 @@ public class Trend extends SiddhiEventEngine<TrendParameters> {
List<String> outputFieldSelectors = siddhiParams.getParams().getOutputFieldSelectors();
outputFieldSelectors
.forEach(outputFieldSelector -> selectClause
- .addProperty(Expressions.property("e2", outputFieldSelector)));
+ .addProperty(Expressions.property("e2", outputFieldSelector, "last")));
return selectClause;
}
diff --git a/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/trend/TrendController.java b/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/trend/TrendController.java
index 510410c..dfda35e 100644
--- a/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/trend/TrendController.java
+++ b/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/trend/TrendController.java
@@ -59,7 +59,7 @@ public class TrendController extends StandaloneEventProcessingDeclarer<TrendPara
@Override
public ConfiguredEventProcessor<TrendParameters> onInvocation(DataProcessorInvocation
invocationGraph, ProcessingElementParameterExtractor extractor) {
- String operation = extractor.selectedSingleValue( Operation, String.class);
+ String operation = extractor.selectedSingleValue(Operation, String.class);
int increase = extractor.singleValueParameter(Increase, Integer.class);
int duration = extractor.singleValueParameter(Duration, Integer.class);
String mapping = extractor.mappingPropertyValue(Mapping);
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
index e63a19b..4c18d4c 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
@@ -71,8 +71,6 @@ public class SiddhiEngine {
this.typeInfo = settings.getSiddhiProcessorParams().getEventTypeInfo();
SiddhiManager siddhiManager = SpSiddhiManager.INSTANCE.getSiddhiManager();
- //this.timestampField = removeStreamIdFromTimestamp(setTimestamp(parameters));
-
siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(settings.getSiddhiAppString());
settings.getSiddhiProcessorParams().getParams()
.getInEventTypes()
@@ -93,7 +91,7 @@ public class SiddhiEngine {
} else {
callback = new SiddhiOutputStreamDebugCallback(debugCallback, settings.getSiddhiAppConfig().getOutputConfig());
}
- System.out.println(SiddhiUtils.getPreparedOutputTopicName(params));
+ LOG.info(SiddhiUtils.getPreparedOutputTopicName(params));
siddhiAppRuntime.addCallback(SiddhiUtils.getPreparedOutputTopicName(params), callback);
siddhiAppRuntime.start();
}
@@ -118,13 +116,4 @@ public class SiddhiEngine {
this.siddhiAppRuntime.shutdown();
}
-// public void setSortedEventKeys(List<String> sortedEventKeys) {
-// String streamId = (String) this.listOfEventKeys.keySet().toArray()[0]; // only reliable if there is only one stream, else use changeEventKeys() to respective streamId
-// changeEventKeys(streamId, sortedEventKeys);
-// }
-//
-// public void changeEventKeys(String streamId, List<String> newEventKeys) {
-// this.listOfEventKeys.put(streamId, newEventKeys);
-// }
-
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
index ae2f976..40dfbda 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
@@ -169,6 +169,10 @@ public class Expressions {
return new PropertyExpression(streamName, propertyName);
}
+ public static PropertyExpression property(String streamName, String propertyName, String eventIndex) {
+ return new PropertyExpression(streamName, propertyName, eventIndex);
+ }
+
public static PropertyExpression property(String propertyName) {
return new PropertyExpression(propertyName);
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java
index 81f0ee9..9619865 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java
@@ -23,12 +23,16 @@ import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
public class PropertyExpression extends PropertyExpressionBase {
- private String siddhiPropertyName;
+ private final String siddhiPropertyName;
public PropertyExpression(String streamName, String property) {
this.siddhiPropertyName = join(".", streamName, property);
}
+ public PropertyExpression(String streamName, String property, String eventIndex) {
+ this.siddhiPropertyName = join(".", streamName + "[" + eventIndex + "]", property);
+ }
+
public PropertyExpression(String property) {
this.siddhiPropertyName = property;
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
index 5174bd1..0a72e15 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
@@ -65,7 +65,8 @@ public class SiddhiUtils {
outputKey.startsWith(SiddhiConstants.SECOND_STREAM_PREFIX)) {
outputKey = outputKey.substring(2);
}
- outMap.put(outputKey, event.getData(i));
+ Object data = event.getData(i);
+ outMap.put(outputKey, data);
}
return outMap;