You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2022/01/18 22:24:54 UTC

[incubator-streampipes] 04/04: [hotfix] Add event index support to Siddhi query builder

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git

commit ff57daac3ce264acf371ad98b7b962669cd39840
Author: Dominik Riemer <do...@gmail.com>
AuthorDate: Tue Jan 18 23:23:15 2022 +0100

    [hotfix] Add event index support to Siddhi query builder
---
 .../apache/streampipes/processors/siddhi/trend/Trend.java   |  2 +-
 .../processors/siddhi/trend/TrendController.java            |  2 +-
 .../streampipes/wrapper/siddhi/engine/SiddhiEngine.java     | 13 +------------
 .../wrapper/siddhi/query/expression/Expressions.java        |  4 ++++
 .../wrapper/siddhi/query/expression/PropertyExpression.java |  6 +++++-
 .../streampipes/wrapper/siddhi/utils/SiddhiUtils.java       |  3 ++-
 6 files changed, 14 insertions(+), 16 deletions(-)

diff --git a/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/trend/Trend.java b/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/trend/Trend.java
index b72f6da..5404e0d 100644
--- a/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/trend/Trend.java
+++ b/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/trend/Trend.java
@@ -79,7 +79,7 @@ public class Trend extends SiddhiEventEngine<TrendParameters> {
     List<String> outputFieldSelectors = siddhiParams.getParams().getOutputFieldSelectors();
     outputFieldSelectors
             .forEach(outputFieldSelector -> selectClause
-                    .addProperty(Expressions.property("e2", outputFieldSelector)));
+                    .addProperty(Expressions.property("e2", outputFieldSelector, "last")));
 
     return selectClause;
   }
diff --git a/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/trend/TrendController.java b/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/trend/TrendController.java
index 510410c..dfda35e 100644
--- a/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/trend/TrendController.java
+++ b/streampipes-extensions/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/trend/TrendController.java
@@ -59,7 +59,7 @@ public class TrendController extends StandaloneEventProcessingDeclarer<TrendPara
     @Override
     public ConfiguredEventProcessor<TrendParameters> onInvocation(DataProcessorInvocation
                                                                              invocationGraph, ProcessingElementParameterExtractor extractor) {
-        String operation = extractor.selectedSingleValue( Operation, String.class);
+        String operation = extractor.selectedSingleValue(Operation, String.class);
         int increase = extractor.singleValueParameter(Increase, Integer.class);
         int duration = extractor.singleValueParameter(Duration, Integer.class);
         String mapping = extractor.mappingPropertyValue(Mapping);
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
index e63a19b..4c18d4c 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEngine.java
@@ -71,8 +71,6 @@ public class SiddhiEngine {
     this.typeInfo = settings.getSiddhiProcessorParams().getEventTypeInfo();
     SiddhiManager siddhiManager = SpSiddhiManager.INSTANCE.getSiddhiManager();
 
-    //this.timestampField = removeStreamIdFromTimestamp(setTimestamp(parameters));
-
     siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(settings.getSiddhiAppString());
     settings.getSiddhiProcessorParams().getParams()
             .getInEventTypes()
@@ -93,7 +91,7 @@ public class SiddhiEngine {
     } else {
       callback = new SiddhiOutputStreamDebugCallback(debugCallback, settings.getSiddhiAppConfig().getOutputConfig());
     }
-    System.out.println(SiddhiUtils.getPreparedOutputTopicName(params));
+    LOG.info(SiddhiUtils.getPreparedOutputTopicName(params));
     siddhiAppRuntime.addCallback(SiddhiUtils.getPreparedOutputTopicName(params), callback);
     siddhiAppRuntime.start();
   }
@@ -118,13 +116,4 @@ public class SiddhiEngine {
     this.siddhiAppRuntime.shutdown();
   }
 
-//  public void setSortedEventKeys(List<String> sortedEventKeys) {
-//    String streamId = (String) this.listOfEventKeys.keySet().toArray()[0];    // only reliable if there is only one stream, else use changeEventKeys() to respective streamId
-//    changeEventKeys(streamId, sortedEventKeys);
-//  }
-//
-//  public void changeEventKeys(String streamId, List<String> newEventKeys) {
-//    this.listOfEventKeys.put(streamId, newEventKeys);
-//  }
-
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
index ae2f976..40dfbda 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
@@ -169,6 +169,10 @@ public class Expressions {
     return new PropertyExpression(streamName, propertyName);
   }
 
+  public static PropertyExpression property(String streamName, String propertyName, String eventIndex) {
+    return new PropertyExpression(streamName, propertyName, eventIndex);
+  }
+
   public static PropertyExpression property(String propertyName) {
     return new PropertyExpression(propertyName);
   }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java
index 81f0ee9..9619865 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/PropertyExpression.java
@@ -23,12 +23,16 @@ import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
 
 public class PropertyExpression extends PropertyExpressionBase {
 
-  private String siddhiPropertyName;
+  private final String siddhiPropertyName;
 
   public PropertyExpression(String streamName, String property) {
     this.siddhiPropertyName = join(".", streamName, property);
   }
 
+  public PropertyExpression(String streamName, String property, String eventIndex) {
+    this.siddhiPropertyName = join(".", streamName + "[" + eventIndex + "]", property);
+  }
+
   public PropertyExpression(String property) {
     this.siddhiPropertyName = property;
   }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
index 5174bd1..0a72e15 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/utils/SiddhiUtils.java
@@ -65,7 +65,8 @@ public class SiddhiUtils {
               outputKey.startsWith(SiddhiConstants.SECOND_STREAM_PREFIX)) {
         outputKey = outputKey.substring(2);
       }
-      outMap.put(outputKey, event.getData(i));
+      Object data = event.getData(i);
+      outMap.put(outputKey, data);
     }
 
     return outMap;