You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ri...@apache.org on 2021/01/24 21:51:39 UTC
[incubator-streampipes] branch STREAMPIPES-272 updated:
[STREAMPIPES-287] Support group by clause in Siddhi wrapper
This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch STREAMPIPES-272
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes.git
The following commit(s) were added to refs/heads/STREAMPIPES-272 by this push:
new cbe9e3f [STREAMPIPES-287] Support group by clause in Siddhi wrapper
cbe9e3f is described below
commit cbe9e3fb4f167a5cc45463169dbc16d11571db97
Author: Dominik Riemer <ri...@fzi.de>
AuthorDate: Sun Jan 24 22:51:25 2021 +0100
[STREAMPIPES-287] Support group by clause in Siddhi wrapper
---
.../wrapper/siddhi/engine/SiddhiEventEngine.java | 8 +++-
.../siddhi/engine/SiddhiStatementGenerator.java | 2 +
.../siddhi/engine/StreamPipesSiddhiProcessor.java | 8 +++-
.../engine/generator/SiddhiAppGenerator.java | 17 +++++--
.../generator/SiddhiInvocationConfigGenerator.java | 11 ++++-
.../wrapper/siddhi/query/GroupByClause.java | 53 ++++++++++++++++++++++
.../siddhi/query/expression/Expressions.java | 10 ++++
.../CountExpression.java} | 20 ++++----
.../expression/window/TimeWindowExpression.java | 1 +
9 files changed, 114 insertions(+), 16 deletions(-)
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
index a6ce6e1..46b614b 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiEventEngine.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.runtime.EventProcessor;
import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiInvocationConfigGenerator;
+import org.apache.streampipes.wrapper.siddhi.model.SiddhiProcessorParams;
import org.apache.streampipes.wrapper.siddhi.utils.SiddhiUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +46,7 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
@Override
public void onInvocation(B parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) {
SiddhiInvocationConfigGenerator<B> siddhiConfigGenerator = new SiddhiInvocationConfigGenerator<>(parameters,
- this::fromStatement, this::selectStatement);
+ this::fromStatement, this::selectStatement, this::groupByStatement);
this.siddhiEngine.initializeEngine(siddhiConfigGenerator, spOutputCollector, runtimeContext);
}
@@ -63,4 +64,9 @@ public abstract class SiddhiEventEngine<B extends EventProcessorBindingParams> i
return SiddhiUtils.prepareName(name);
}
+ @Override
+ public String groupByStatement(SiddhiProcessorParams<B> siddhiParams) { // siddhiParams is not read by this default
+ return ""; // default: empty group-by statement; override to supply one
+ }
+
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
index 9abc569..5c14298 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/SiddhiStatementGenerator.java
@@ -26,4 +26,6 @@ public interface SiddhiStatementGenerator<B extends EventProcessorBindingParams>
String selectStatement(SiddhiProcessorParams<B> siddhiParams);
+ String groupByStatement(SiddhiProcessorParams<B> siddhiParams); // group-by statement for the generated Siddhi app; the default implementations return ""
+
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
index 2aabb4c..24b085a 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/StreamPipesSiddhiProcessor.java
@@ -23,6 +23,7 @@ import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.siddhi.engine.callback.SiddhiDebugCallback;
import org.apache.streampipes.wrapper.siddhi.engine.generator.SiddhiInvocationConfigGenerator;
+import org.apache.streampipes.wrapper.siddhi.model.SiddhiProcessorParams;
import org.apache.streampipes.wrapper.standalone.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
@@ -41,7 +42,7 @@ public abstract class StreamPipesSiddhiProcessor extends StreamPipesDataProcesso
@Override
public void onInvocation(ProcessorParams parameters, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
SiddhiInvocationConfigGenerator<ProcessorParams> siddhiConfigGenerator = new SiddhiInvocationConfigGenerator<>(parameters,
- this::fromStatement, this::selectStatement);
+ this::fromStatement, this::selectStatement, this::groupByStatement);
this.siddhiEngine.initializeEngine(siddhiConfigGenerator, spOutputCollector, runtimeContext);
}
@@ -54,4 +55,9 @@ public abstract class StreamPipesSiddhiProcessor extends StreamPipesDataProcesso
public void onDetach() throws SpRuntimeException {
this.siddhiEngine.shutdownEngine();
}
+
+ @Override
+ public String groupByStatement(SiddhiProcessorParams<ProcessorParams> siddhiParams) { // siddhiParams is not read by this default
+ return ""; // default: empty group-by statement; override to supply one
+ }
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
index 1e32bce..bd074b2 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiAppGenerator.java
@@ -34,15 +34,18 @@ public class SiddhiAppGenerator<B extends EventProcessorBindingParams> {
private final SiddhiProcessorParams<B> siddhiParams;
private final String fromStatement;
private final String selectStatement;
+ private final String groupByStatement;
private final StringBuilder siddhiAppString;
public SiddhiAppGenerator(SiddhiProcessorParams<B> siddhiParams,
String fromStatement,
- String selectStatement) {
+ String selectStatement,
+ String groupByStatement) {
this.siddhiParams = siddhiParams;
this.fromStatement = fromStatement;
this.selectStatement = selectStatement;
+ this.groupByStatement = groupByStatement;
this.siddhiAppString = new StringBuilder();
}
@@ -50,7 +53,10 @@ public class SiddhiAppGenerator<B extends EventProcessorBindingParams> {
LOG.info("Configuring event types for graph " + this.siddhiParams.getParams().getGraph().getName());
this.siddhiParams.getEventTypeInfo().forEach(this::registerEventType);
- registerStatements(fromStatement, selectStatement, SiddhiUtils.getOutputTopicName(this.siddhiParams.getParams()));
+ registerStatements(fromStatement,
+ selectStatement,
+ groupByStatement,
+ SiddhiUtils.getOutputTopicName(this.siddhiParams.getParams()));
return this.siddhiAppString.toString();
}
@@ -70,12 +76,17 @@ public class SiddhiAppGenerator<B extends EventProcessorBindingParams> {
.append(");\n");
}
- private void registerStatements(String fromStatement, String selectStatement, String outputStream) {
+ private void registerStatements(String fromStatement,
+ String selectStatement,
+ String groupByStatement,
+ String outputStream) {
this.siddhiAppString
.append(fromStatement)
.append("\n")
.append(selectStatement)
.append("\n")
+ .append(groupByStatement)
+ .append("\n")
.append("insert into ")
.append(SiddhiUtils.prepareName(outputStream))
.append(";");
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java
index 1c63ef0..8ec8e48 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/engine/generator/SiddhiInvocationConfigGenerator.java
@@ -31,19 +31,22 @@ public class SiddhiInvocationConfigGenerator<B extends EventProcessorBindingPara
private final String siddhiAppString;
private final String fromStatement;
private final String selectStatement;
+ private final String groupByStatement;
private final SiddhiProcessorParams<B> siddhiProcessorParams;
public SiddhiInvocationConfigGenerator(B params,
Function<SiddhiProcessorParams<B>, String> fromStatementFunction,
- Function<SiddhiProcessorParams<B>, String> selectStatementFunction) {
+ Function<SiddhiProcessorParams<B>, String> selectStatementFunction,
+ Function<SiddhiProcessorParams<B>, String> groupByStatementFunction) {
List<String> inputStreamNames = new InputStreamNameGenerator<>(params).generateInputStreamNames();
Map<String, List<EventPropertyDef>> eventTypeInfo = new EventTypeGenerator<>(params).generateEventTypes();
List<String> outputEventKeys = new ArrayList<>(params.getOutEventType().keySet());
this.siddhiProcessorParams = new SiddhiProcessorParams<>(params, inputStreamNames, eventTypeInfo, outputEventKeys);
this.fromStatement = fromStatementFunction.apply(this.siddhiProcessorParams);
this.selectStatement = selectStatementFunction.apply(this.siddhiProcessorParams);
- this.siddhiAppString = new SiddhiAppGenerator<>(siddhiProcessorParams, fromStatement, selectStatement)
+ this.groupByStatement = groupByStatementFunction.apply(this.siddhiProcessorParams);
+ this.siddhiAppString = new SiddhiAppGenerator<>(siddhiProcessorParams, fromStatement, selectStatement, groupByStatement)
.generateSiddhiApp();
}
@@ -62,4 +65,8 @@ public class SiddhiInvocationConfigGenerator<B extends EventProcessorBindingPara
public String getSelectStatement() {
return selectStatement;
}
+
+ public String getGroupByStatement() { // accessor for the group-by statement
+ return groupByStatement; // set once in the constructor from groupByStatementFunction
+ }
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/GroupByClause.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/GroupByClause.java
new file mode 100644
index 0000000..2f4bb23
--- /dev/null
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/GroupByClause.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.streampipes.wrapper.siddhi.query;
+
+import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
+import org.apache.streampipes.wrapper.siddhi.query.expression.Expression;
+import org.apache.streampipes.wrapper.siddhi.query.expression.PropertyExpression;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class GroupByClause extends Expression { // renders the "group by" part of a Siddhi query
+
+ private final List<PropertyExpression> propertyExpressions; // grouping properties, rendered in list order
+
+ public static GroupByClause create(List<PropertyExpression> groupByProperties) { // list-based factory
+ return new GroupByClause(groupByProperties);
+ }
+
+ public static GroupByClause create(PropertyExpression... groupByProperties) { // varargs convenience factory
+ return new GroupByClause(Arrays.asList(groupByProperties));
+ }
+
+ private GroupByClause(List<PropertyExpression> groupByProperties) { // private: instances come from create(...)
+ this.propertyExpressions = groupByProperties; // the list is stored as passed, not copied
+ }
+
+ @Override
+ public String toSiddhiEpl() {
+ // Render each property, join them with the COMMA constant, then prefix the "group by" keyword.
+ return join(SiddhiConstants.WHITESPACE, "group by", join(SiddhiConstants.COMMA,
+ propertyExpressions.stream()
+ .map(PropertyExpression::toSiddhiEpl)
+ .collect(Collectors.toList())));
+ }
+
+}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
index c2f63c7..afb7cd6 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/Expressions.java
@@ -19,6 +19,7 @@ package org.apache.streampipes.wrapper.siddhi.query.expression;
import org.apache.streampipes.wrapper.siddhi.constants.SiddhiStreamSelector;
import org.apache.streampipes.wrapper.siddhi.model.EventPropertyDef;
+import org.apache.streampipes.wrapper.siddhi.query.expression.aggregation.CountExpression;
import org.apache.streampipes.wrapper.siddhi.query.expression.list.CollectListExpression;
import org.apache.streampipes.wrapper.siddhi.query.expression.list.ContainsListExpression;
import org.apache.streampipes.wrapper.siddhi.query.expression.math.MathDivideExpression;
@@ -27,6 +28,7 @@ import org.apache.streampipes.wrapper.siddhi.query.expression.pattern.EveryExpre
import org.apache.streampipes.wrapper.siddhi.query.expression.pattern.PatternCountExpression;
import org.apache.streampipes.wrapper.siddhi.query.expression.pattern.PatternCountOperator;
import org.apache.streampipes.wrapper.siddhi.query.expression.window.BatchWindowExpression;
+import org.apache.streampipes.wrapper.siddhi.query.expression.window.TimeWindowExpression;
import org.apache.streampipes.wrapper.siddhi.query.expression.window.WindowExpression;
import java.util.Arrays;
@@ -49,6 +51,10 @@ public class Expressions {
return new ContainsListExpression(listProperty, value);
}
+ public static PropertyExpressionBase count(PropertyExpression property) { // factory for a count aggregation
+ return new CountExpression(property); // CountExpression emits "count" followed by the property in parentheses
+ }
+
public static PropertyExpressionBase divide(PropertyExpressionBase op1, PropertyExpressionBase op2) {
return new MathDivideExpression(op1, op2);
}
@@ -147,6 +153,10 @@ public class Expressions {
return new StreamExpression(streamName, streamAlias);
}
+ public static WindowExpression timeWindow(Integer windowSize, SiddhiTimeUnit timeUnit) { // factory for a time-based window
+ return new TimeWindowExpression(windowSize, timeUnit); // toSiddhiEpl renders size, the WHITESPACE constant, then the unit string
+ }
+
private static String makeStaticString(String staticValue) {
return "'" + staticValue + "'";
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/window/TimeWindowExpression.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/aggregation/CountExpression.java
similarity index 61%
copy from streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/window/TimeWindowExpression.java
copy to streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/aggregation/CountExpression.java
index 198b366..82fb154 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/window/TimeWindowExpression.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/aggregation/CountExpression.java
@@ -15,24 +15,26 @@
* limitations under the License.
*
*/
-package org.apache.streampipes.wrapper.siddhi.query.expression.window;
+package org.apache.streampipes.wrapper.siddhi.query.expression.aggregation;
import org.apache.streampipes.wrapper.siddhi.constants.SiddhiConstants;
-import org.apache.streampipes.wrapper.siddhi.query.expression.SiddhiTimeUnit;
+import org.apache.streampipes.wrapper.siddhi.query.expression.PropertyExpression;
+import org.apache.streampipes.wrapper.siddhi.query.expression.PropertyExpressionBase;
-public class TimeWindowExpression extends WindowExpression {
+public class CountExpression extends PropertyExpressionBase {
- private SiddhiTimeUnit timeUnit;
+ private PropertyExpression propertyExpression;
- public TimeWindowExpression(Integer timeWindowSize, SiddhiTimeUnit timeUnit) {
- super(timeWindowSize);
- this.timeUnit = timeUnit;
+ public CountExpression(PropertyExpression property) {
+ this.propertyExpression = property;
}
@Override
public String toSiddhiEpl() {
return join(SiddhiConstants.EMPTY,
- windowExpression(),
- windowValue(windowValue + SiddhiConstants.WHITESPACE + timeUnit.toTimeUnitString()));
+ "count",
+ SiddhiConstants.PARENTHESIS_OPEN,
+ propertyExpression.toSiddhiEpl(),
+ SiddhiConstants.PARENTHESIS_CLOSE);
}
}
diff --git a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/window/TimeWindowExpression.java b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/window/TimeWindowExpression.java
index 198b366..c4334bc 100644
--- a/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/window/TimeWindowExpression.java
+++ b/streampipes-wrapper-siddhi/src/main/java/org/apache/streampipes/wrapper/siddhi/query/expression/window/TimeWindowExpression.java
@@ -33,6 +33,7 @@ public class TimeWindowExpression extends WindowExpression {
public String toSiddhiEpl() {
return join(SiddhiConstants.EMPTY,
windowExpression(),
+ "time", // window type keyword inserted between windowExpression() and the window value
windowValue(windowValue + SiddhiConstants.WHITESPACE + timeUnit.toTimeUnitString()));
}
}