You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/05/27 15:19:38 UTC

[incubator-streampipes] branch dev updated: [STREAMPIPES-146] Support CustomOutputStrategy in SiddhiEventEngine

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/dev by this push:
     new 30238b7  [STREAMPIPES-146] Support CustomOutputStrategy in SiddhiEventEngine
     new 574b0a5  Merge branch 'dev' of https://github.com/apache/incubator-streampipes into dev
30238b7 is described below

commit 30238b7ee8663a1fd70a39efd67508526257b21e
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Wed May 27 17:19:12 2020 +0200

    [STREAMPIPES-146] Support CustomOutputStrategy in SiddhiEventEngine
---
 .../wrapper/siddhi/engine/SiddhiEventEngine.java   | 35 +++++++++++++++++-----
 1 file changed, 27 insertions(+), 8 deletions(-)

diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
index 08671a5..15bd0db 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
@@ -46,6 +46,7 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
   private List<String> inputStreamNames;
 
   private List<String> sortedEventKeys;
+  private List<String> outputEventKeys;
 
   private Boolean debugMode;
   private SiddhiDebugCallback debugCallback;
@@ -57,6 +58,7 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
     this.siddhiInputHandlers = new HashMap<>();
     this.inputStreamNames = new ArrayList<>();
     sortedEventKeys = new ArrayList<>();
+    outputEventKeys = new ArrayList<>();
     this.debugMode = false;
   }
 
@@ -82,6 +84,10 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
       this.inputStreamNames.add(prepareName(key));
     });
 
+    LOG.info("Configuring output event keys for graph " + parameters.getGraph().getName());
+    //System.out.println("output key: " + key);
+    outputEventKeys.addAll(parameters.getOutEventType().keySet());
+
     String fromStatement = fromStatement(inputStreamNames, parameters);
     String selectStatement = selectStatement(parameters);
     registerStatements(fromStatement, selectStatement, getOutputTopicName(parameters));
@@ -132,14 +138,14 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
   private org.apache.streampipes.model.runtime.Event toSpEvent(Event event, B parameters, SchemaInfo
           schemaInfo, SourceInfo sourceInfo) {
     Map<String, Object> outMap = new HashMap<>();
-    for (int i = 0; i < sortedEventKeys.size(); i++) {
+    for (int i = 0; i < outputEventKeys.size(); i++) {
 
       if (event.getData(i) instanceof LinkedList) {
         List<Object> tmp = (List<Object>) event.getData(i);
-        outMap.put(sortedEventKeys.get(i), tmp.get(0));
+        outMap.put(outputEventKeys.get(i), tmp.get(0));
       }
       else {
-        outMap.put(sortedEventKeys.get(i), event.getData(i));
+        outMap.put(outputEventKeys.get(i), event.getData(i));
       }
 
     }
@@ -235,19 +241,32 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
     StringBuilder selectString = new StringBuilder();
     selectString.append("select ");
 
-    if (sortedEventKeys.size() > 0) {
-      for (int i = 0; i < sortedEventKeys.size() - 1; i++) {
-        selectString.append(eventName + ".s0" + sortedEventKeys.get(i) + ",");
+    if (outputEventKeys.size() > 0) {
+      for (int i = 0; i < outputEventKeys.size() - 1; i++) {
+        selectString.append(eventName + ".s0" + outputEventKeys.get(i) + ",");
       }
-      selectString.append(eventName + ".s0" + sortedEventKeys.get(sortedEventKeys.size() - 1));
+      selectString.append(eventName + ".s0" + outputEventKeys.get(outputEventKeys.size() - 1));
 
     }
 
     return selectString.toString();
   }
 
+//  protected String getCustomOutputSelectStatement(DataProcessorInvocation invocation) {
+//    return getCustomOutputSelectStatement(invocation, "e1");
+//  }
+
   protected String getCustomOutputSelectStatement(DataProcessorInvocation invocation) {
-    return getCustomOutputSelectStatement(invocation, "e1");
+    StringBuilder selectString = new StringBuilder();
+    selectString.append("select ");
+
+    if (outputEventKeys.size() > 0) {
+      for (int i=0; i<outputEventKeys.size() - 1; i++) {
+        selectString.append("s0" + outputEventKeys.get(i) + ",");
+      }
+      selectString.append("s0" + outputEventKeys.get(outputEventKeys.size() - 1));
+    }
+    return selectString.toString();
   }
 
   public void setSortedEventKeys(List<String> sortedEventKeys) {