You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/05/27 15:19:38 UTC
[incubator-streampipes] branch dev updated: [STREAMPIPES-146]
Support CustomOutputStrategy in SiddhiEventEngine
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/dev by this push:
new 30238b7 [STREAMPIPES-146] Support CustomOutputStrategy in SiddhiEventEngine
new 574b0a5 Merge branch 'dev' of https://github.com/apache/incubator-streampipes into dev
30238b7 is described below
commit 30238b7ee8663a1fd70a39efd67508526257b21e
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Wed May 27 17:19:12 2020 +0200
[STREAMPIPES-146] Support CustomOutputStrategy in SiddhiEventEngine
---
.../wrapper/siddhi/engine/SiddhiEventEngine.java | 35 +++++++++++++++++-----
1 file changed, 27 insertions(+), 8 deletions(-)
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
index 08671a5..15bd0db 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
@@ -46,6 +46,7 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
private List<String> inputStreamNames;
private List<String> sortedEventKeys;
+ private List<String> outputEventKeys;
private Boolean debugMode;
private SiddhiDebugCallback debugCallback;
@@ -57,6 +58,7 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
this.siddhiInputHandlers = new HashMap<>();
this.inputStreamNames = new ArrayList<>();
sortedEventKeys = new ArrayList<>();
+ outputEventKeys = new ArrayList<>();
this.debugMode = false;
}
@@ -82,6 +84,10 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
this.inputStreamNames.add(prepareName(key));
});
+ LOG.info("Configuring output event keys for graph " + parameters.getGraph().getName());
+ //System.out.println("output key: " + key);
+ outputEventKeys.addAll(parameters.getOutEventType().keySet());
+
String fromStatement = fromStatement(inputStreamNames, parameters);
String selectStatement = selectStatement(parameters);
registerStatements(fromStatement, selectStatement, getOutputTopicName(parameters));
@@ -132,14 +138,14 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
private org.apache.streampipes.model.runtime.Event toSpEvent(Event event, B parameters, SchemaInfo
schemaInfo, SourceInfo sourceInfo) {
Map<String, Object> outMap = new HashMap<>();
- for (int i = 0; i < sortedEventKeys.size(); i++) {
+ for (int i = 0; i < outputEventKeys.size(); i++) {
if (event.getData(i) instanceof LinkedList) {
List<Object> tmp = (List<Object>) event.getData(i);
- outMap.put(sortedEventKeys.get(i), tmp.get(0));
+ outMap.put(outputEventKeys.get(i), tmp.get(0));
}
else {
- outMap.put(sortedEventKeys.get(i), event.getData(i));
+ outMap.put(outputEventKeys.get(i), event.getData(i));
}
}
@@ -235,19 +241,32 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
StringBuilder selectString = new StringBuilder();
selectString.append("select ");
- if (sortedEventKeys.size() > 0) {
- for (int i = 0; i < sortedEventKeys.size() - 1; i++) {
- selectString.append(eventName + ".s0" + sortedEventKeys.get(i) + ",");
+ if (outputEventKeys.size() > 0) {
+ for (int i = 0; i < outputEventKeys.size() - 1; i++) {
+ selectString.append(eventName + ".s0" + outputEventKeys.get(i) + ",");
}
- selectString.append(eventName + ".s0" + sortedEventKeys.get(sortedEventKeys.size() - 1));
+ selectString.append(eventName + ".s0" + outputEventKeys.get(outputEventKeys.size() - 1));
}
return selectString.toString();
}
+// protected String getCustomOutputSelectStatement(DataProcessorInvocation invocation) {
+// return getCustomOutputSelectStatement(invocation, "e1");
+// }
+
protected String getCustomOutputSelectStatement(DataProcessorInvocation invocation) {
- return getCustomOutputSelectStatement(invocation, "e1");
+ StringBuilder selectString = new StringBuilder();
+ selectString.append("select ");
+
+ if (outputEventKeys.size() > 0) {
+ for (int i=0; i<outputEventKeys.size() - 1; i++) {
+ selectString.append("s0" + outputEventKeys.get(i) + ",");
+ }
+ selectString.append("s0" + outputEventKeys.get(outputEventKeys.size() - 1));
+ }
+ return selectString.toString();
}
public void setSortedEventKeys(List<String> sortedEventKeys) {