You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/01/24 21:51:39 UTC

[incubator-streampipes] branch STREAMPIPES-272 updated: [STREAMPIPES-287] Support group by clause in Siddhi wrapper

This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch STREAMPIPES-272
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git


The following commit(s) were added to refs/heads/STREAMPIPES-272 by this push:
     new cbe9e3f  [STREAMPIPES-287] Support group by clause in Siddhi wrapper
cbe9e3f is described below

commit cbe9e3fb4f167a5cc45463169dbc16d11571db97
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sun Jan 24 22:51:25 2021 +0100

    [STREAMPIPES-287] Support group by clause in Siddhi wrapper
---
 .../wrapper/siddhi/engine/SiddhiEventEngine.java   |  8 +++-
 .../siddhi/engine/SiddhiStatementGenerator.java    |  2 +
 .../siddhi/engine/StreamPipesSiddhiProcessor.java  |  8 +++-
 .../engine/generator/SiddhiAppGenerator.java       | 17 +++++--
 .../generator/SiddhiInvocationConfigGenerator.java | 11 ++++-
 .../wrapper/siddhi/query/GroupByClause.java        | 53 ++++++++++++++++++++++
 .../siddhi/query/expression/Expressions.java       | 10 ++++
 .../CountExpression.java}                          | 20 ++++----
 .../expression/window/TimeWindowExpression.java    |  1 +
 9 files changed, 114 insertions(+), 16 deletions(-)

diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
index a6ce6e1..46b614b 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
 import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
 import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiInvocationConfigGenerator;
+import org.apache.streampipes.wrapper.siddhi.model.SiddhiProcessorParams;
 import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +46,7 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
   @Override
   public void onInvocation(B parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
     SiddhiInvocationConfigGenerator<B> siddhiConfigGenerator = new SiddhiInvocationConfigGenerator<>(parameters,
-            this::fromStatement, this::selectStatement);
+            this::fromStatement, this::selectStatement, this::groupByStatement);
     this.siddhiEngine.initializeEngine(siddhiConfigGenerator, spOutputCollector, runtimeContext);
   }
 
@@ -63,4 +64,9 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
     return SiddhiUtils.prepareName(name);
   }
 
+  @Override
+  public String groupByStatement(SiddhiProcessorParams<B> siddhiParams) {
+    return "";
+  }
+
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
index 9abc569..5c14298 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
@@ -26,4 +26,6 @@ public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams>
 
   String selectStatement(SiddhiProcessorParams<B> siddhiParams);
 
+  String groupByStatement(SiddhiProcessorParams<B> siddhiParams);
+
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
index 2aabb4c..24b085a 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
 import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiInvocationConfigGenerator;
+import org.apache.streampipes.wrapper.siddhi.model.SiddhiProcessorParams;
 import org.apache.streampipes.wrapper.standalone.ProcessorParams;
 import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
 
@@ -41,7 +42,7 @@ public abstract class StreamPipesSiddhiProcessor extends StreamPipesDataProcesso
   @Override
   public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
     SiddhiInvocationConfigGenerator<ProcessorParams> siddhiConfigGenerator = new SiddhiInvocationConfigGenerator<>(parameters,
-            this::fromStatement, this::selectStatement);
+            this::fromStatement, this::selectStatement, this::groupByStatement);
     this.siddhiEngine.initializeEngine(siddhiConfigGenerator, spOutputCollector, runtimeContext);
   }
 
@@ -54,4 +55,9 @@ public abstract class StreamPipesSiddhiProcessor extends StreamPipesDataProcesso
   public void onDetach() throws SpRuntimeException {
     this.siddhiEngine.shutdownEngine();
   }
+
+  @Override
+  public String groupByStatement(SiddhiProcessorParams<ProcessorParams> siddhiParams) {
+    return "";
+  }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
index 1e32bce..bd074b2 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
@@ -34,15 +34,18 @@ public class SiddhiAppGenerator<B extends EventProcessorBindingParams> {
   private final SiddhiProcessorParams<B> siddhiParams;
   private final String fromStatement;
   private final String selectStatement;
+  private final String groupByStatement;
 
   private final StringBuilder siddhiAppString;
 
   public SiddhiAppGenerator(SiddhiProcessorParams<B> siddhiParams,
                             String fromStatement,
-                            String selectStatement) {
+                            String selectStatement,
+                            String groupByStatement) {
     this.siddhiParams = siddhiParams;
     this.fromStatement = fromStatement;
     this.selectStatement = selectStatement;
+    this.groupByStatement = groupByStatement;
     this.siddhiAppString = new StringBuilder();
   }
 
@@ -50,7 +53,10 @@ public class SiddhiAppGenerator<B extends EventProcessorBindingParams> {
     LOG.info("Configuring event types for graph " + this.siddhiParams.getParams().getGraph().getName());
 
     this.siddhiParams.getEventTypeInfo().forEach(this::registerEventType);
-    registerStatements(fromStatement, selectStatement, SiddhiUtils.getOutputTopicName(this.siddhiParams.getParams()));
+    registerStatements(fromStatement,
+            selectStatement,
+            groupByStatement,
+            SiddhiUtils.getOutputTopicName(this.siddhiParams.getParams()));
 
     return this.siddhiAppString.toString();
   }
@@ -70,12 +76,17 @@ public class SiddhiAppGenerator<B extends EventProcessorBindingParams> {
             .append(");\n");
   }
 
-  private void registerStatements(String fromStatement, String selectStatement, String outputStream) {
+  private void registerStatements(String fromStatement,
+                                  String selectStatement,
+                                  String groupByStatement,
+                                  String outputStream) {
     this.siddhiAppString
             .append(fromStatement)
             .append("\n")
             .append(selectStatement)
             .append("\n")
+            .append(groupByStatement)
+            .append("\n")
             .append("insert into ")
             .append(SiddhiUtils.prepareName(outputStream))
             .append(";");
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java
index 1c63ef0..8ec8e48 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java
@@ -31,19 +31,22 @@ public class SiddhiInvocationConfigGenerator<B extends EventProcessorBindingPara
   private final String siddhiAppString;
   private final String fromStatement;
   private final String selectStatement;
+  private final String groupByStatement;
 
   private final SiddhiProcessorParams<B> siddhiProcessorParams;
 
   public SiddhiInvocationConfigGenerator(B params,
                                          Function<SiddhiProcessorParams<B>, String> fromStatementFunction,
-                                         Function<SiddhiProcessorParams<B>, String> selectStatementFunction) {
+                                         Function<SiddhiProcessorParams<B>, String> selectStatementFunction,
+                                         Function<SiddhiProcessorParams<B>, String> groupByStatementFunction) {
     List<String> inputStreamNames = new InputStreamNameGenerator<>(params).generateInputStreamNames();
     Map<String, List<EventPropertyDef>> eventTypeInfo = new EventTypeGenerator<>(params).generateEventTypes();
     List<String> outputEventKeys = new ArrayList<>(params.getOutEventType().keySet());
     this.siddhiProcessorParams = new SiddhiProcessorParams<>(params, inputStreamNames, eventTypeInfo, outputEventKeys);
     this.fromStatement = fromStatementFunction.apply(this.siddhiProcessorParams);
     this.selectStatement = selectStatementFunction.apply(this.siddhiProcessorParams);
-    this.siddhiAppString = new SiddhiAppGenerator<>(siddhiProcessorParams, fromStatement, selectStatement)
+    this.groupByStatement = groupByStatementFunction.apply(this.siddhiProcessorParams);
+    this.siddhiAppString = new SiddhiAppGenerator<>(siddhiProcessorParams, fromStatement, selectStatement, groupByStatement)
             .generateSiddhiApp();
   }
 
@@ -62,4 +65,8 @@ public class SiddhiInvocationConfigGenerator<B extends EventProcessorBindingPara
   public String getSelectStatement() {
     return selectStatement;
   }
+
+  public String getGroupByStatement() {
+    return groupByStatement;
+  }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/GroupByClause.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/GroupByClause.java
new file mode 100644
index 0000000..2f4bb23
--- /dev/null
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/GroupByClause.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.siddhi.query;
+
+import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
+import org.apache.streampipes.wrapper.siddhi.query.expression.Expression;
+import org.apache.streampipes.wrapper.siddhi.query.expression.PropertyExpression;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class GroupByClause extends Expression {
+
+  private List<PropertyExpression> propertyExpressions;
+
+  public static GroupByClause create(List<PropertyExpression> groupByProperties) {
+    return new GroupByClause(groupByProperties);
+  }
+
+  public static GroupByClause create(PropertyExpression... outputProperties) {
+    return new GroupByClause(Arrays.asList(outputProperties));
+  }
+
+  private GroupByClause(List<PropertyExpression> groupByProperties) {
+    this.propertyExpressions = groupByProperties;
+  }
+
+  @Override
+  public String toSiddhiEpl() {
+    return join(SiddhiConstants.WHITESPACE, "group by", join(SiddhiConstants.COMMA,
+            propertyExpressions
+                    .stream()
+                    .map(PropertyExpression::toSiddhiEpl)
+                    .collect(Collectors.toList())));
+  }
+
+}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
index c2f63c7..afb7cd6 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
@@ -19,6 +19,7 @@ package org.apache.streampipes.wrapper.siddhi.query.expression;
 
 import org.apache.streampipes.wrapper.siddhi.constants.SiddhiStreamSelector;
 import org.apache.streampipes.wrapper.siddhi.model.EventPropertyDef;
+import org.apache.streampipes.wrapper.siddhi.query.expression.aggregation.CountExpression;
 import org.apache.streampipes.wrapper.siddhi.query.expression.list.CollectListExpression;
 import org.apache.streampipes.wrapper.siddhi.query.expression.list.ContainsListExpression;
 import org.apache.streampipes.wrapper.siddhi.query.expression.math.MathDivideExpression;
@@ -27,6 +28,7 @@ import org.apache.streampipes.wrapper.siddhi.query.expression.pattern.EveryExpre
 import org.apache.streampipes.wrapper.siddhi.query.expression.pattern.PatternCountExpression;
 import org.apache.streampipes.wrapper.siddhi.query.expression.pattern.PatternCountOperator;
 import org.apache.streampipes.wrapper.siddhi.query.expression.window.BatchWindowExpression;
+import org.apache.streampipes.wrapper.siddhi.query.expression.window.TimeWindowExpression;
 import org.apache.streampipes.wrapper.siddhi.query.expression.window.WindowExpression;
 
 import java.util.Arrays;
@@ -49,6 +51,10 @@ public class Expressions {
     return new ContainsListExpression(listProperty, value);
   }
 
+  public static PropertyExpressionBase count(PropertyExpression property) {
+    return new CountExpression(property);
+  }
+
   public static PropertyExpressionBase divide(PropertyExpressionBase op1, PropertyExpressionBase op2) {
     return new MathDivideExpression(op1, op2);
   }
@@ -147,6 +153,10 @@ public class Expressions {
     return new StreamExpression(streamName, streamAlias);
   }
 
+  public static WindowExpression timeWindow(Integer windowSize, SiddhiTimeUnit timeUnit) {
+    return new TimeWindowExpression(windowSize, timeUnit);
+  }
+
   private static String makeStaticString(String staticValue) {
     return "'" + staticValue + "'";
   }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/window/TimeWindowExpression.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/aggregation/CountExpression.java
similarity index 61%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/window/TimeWindowExpression.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/aggregation/CountExpression.java
index 198b366..82fb154 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/window/TimeWindowExpression.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/aggregation/CountExpression.java
@@ -15,24 +15,26 @@
  * limitations under the License.
  *
  */
-package org.apache.streampipes.wrapper.siddhi.query.expression.window;
+package org.apache.streampipes.wrapper.siddhi.query.expression.aggregation;
 
 import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
-import org.apache.streampipes.wrapper.siddhi.query.expression.SiddhiTimeUnit;
+import org.apache.streampipes.wrapper.siddhi.query.expression.PropertyExpression;
+import org.apache.streampipes.wrapper.siddhi.query.expression.PropertyExpressionBase;
 
-public class TimeWindowExpression extends WindowExpression {
+public class CountExpression extends PropertyExpressionBase {
 
-  private SiddhiTimeUnit timeUnit;
+  private PropertyExpression propertyExpression;
 
-  public TimeWindowExpression(Integer timeWindowSize, SiddhiTimeUnit timeUnit) {
-    super(timeWindowSize);
-    this.timeUnit = timeUnit;
+  public CountExpression(PropertyExpression property) {
+    this.propertyExpression = property;
   }
 
   @Override
   public String toSiddhiEpl() {
     return join(SiddhiConstants.EMPTY,
-            windowExpression(),
-            windowValue(windowValue + SiddhiConstants.WHITESPACE + timeUnit.toTimeUnitString()));
+            "count",
+            SiddhiConstants.PARENTHESIS_OPEN,
+            propertyExpression.toSiddhiEpl(),
+            SiddhiConstants.PARENTHESIS_CLOSE);
   }
 }
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/window/TimeWindowExpression.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/window/TimeWindowExpression.java
index 198b366..c4334bc 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/window/TimeWindowExpression.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/window/TimeWindowExpression.java
@@ -33,6 +33,7 @@ public class TimeWindowExpression extends WindowExpression {
   public String toSiddhiEpl() {
     return join(SiddhiConstants.EMPTY,
             windowExpression(),
+            "time",
             windowValue(windowValue + SiddhiConstants.WHITESPACE + timeUnit.toTimeUnitString()));
   }
 }