You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/05/26 07:57:05 UTC
[incubator-streampipes-extensions] branch dev updated:
[STREAMPIPES-143] Numerical Filter in Siddhi not working
This is an automated email from the ASF dual-hosted git repository.
wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new 2aa7cc7 [STREAMPIPES-143] Numerical Filter in Siddhi not working
2aa7cc7 is described below
commit 2aa7cc73768c8f3ec99f076b44c66d7f8ade117b
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Tue May 26 09:56:46 2020 +0200
[STREAMPIPES-143] Numerical Filter in Siddhi not working
---
.../processors/siddhi/FiltersSiddhiInit.java | 4 +++-
.../processors/siddhi/filter/NumericalFilter.java | 21 +++++++++++++++++++-
.../siddhi/filter/NumericalFilterController.java | 4 +++-
.../siddhi/filter/NumericalOperator.java | 2 +-
.../documentation.md | 23 +++++++++++++++-------
.../strings.en | 2 +-
6 files changed, 44 insertions(+), 12 deletions(-)
diff --git a/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/FiltersSiddhiInit.java b/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/FiltersSiddhiInit.java
index 036c31c..b49d270 100644
--- a/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/FiltersSiddhiInit.java
+++ b/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/FiltersSiddhiInit.java
@@ -27,6 +27,7 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.processors.siddhi.config.FilterSiddhiConfig;
+import org.apache.streampipes.processors.siddhi.filter.NumericalFilterController;
import org.apache.streampipes.processors.siddhi.frequency.FrequencyController;
import org.apache.streampipes.processors.siddhi.frequencychange.FrequencyChangeController;
import org.apache.streampipes.processors.siddhi.stop.StreamStopController;
@@ -40,7 +41,8 @@ public class FiltersSiddhiInit extends StandaloneModelSubmitter {
.add(new TrendController())
.add(new StreamStopController())
.add(new FrequencyController())
- .add(new FrequencyChangeController());
+ .add(new FrequencyChangeController())
+ .add(new NumericalFilterController());
DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
new CborDataFormatFactory(),
diff --git a/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalFilter.java b/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalFilter.java
index 0203120..b78e9ae 100644
--- a/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalFilter.java
+++ b/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalFilter.java
@@ -26,7 +26,26 @@ public class NumericalFilter extends SiddhiEventEngine<NumericalFilterParameters
@Override
protected String fromStatement(List<String> inputStreamNames, NumericalFilterParameters params) {
String filterProperty = prepareName(params.getFilterProperty());
- return "from " + inputStreamNames.get(0) +"[" + filterProperty +"<" + params.getThreshold() +"]";
+ String filterOperator = "";
+
+ if (params.getNumericalOperator() == NumericalOperator.EQ) {
+ filterOperator = "==";
+ } else if (params.getNumericalOperator() == NumericalOperator.GE) {
+ filterOperator = ">=";
+ } else if (params.getNumericalOperator() == NumericalOperator.GT) {
+ filterOperator = ">";
+ } else if (params.getNumericalOperator() == NumericalOperator.LE) {
+ filterOperator = "<=";
+ } else if (params.getNumericalOperator() == NumericalOperator.LT) {
+ filterOperator = "<";
+ } else if (params.getNumericalOperator() == NumericalOperator.IE) {
+ filterOperator = "!=";
+ }
+
+ // e.g. Filter for numberField value less than 10 and output all fields
+ //
+ // Siddhi query: from inputstreamname[numberField<10]
+ return "from " + inputStreamNames.get(0) +"[" + filterProperty + filterOperator + params.getThreshold() +"]";
}
@Override
diff --git a/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalFilterController.java b/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalFilterController.java
index 8ab3705..530c32e 100644
--- a/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalFilterController.java
+++ b/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalFilterController.java
@@ -47,7 +47,7 @@ public class NumericalFilterController extends StandaloneEventProcessingDeclarer
Labels.withId(NUMBER_MAPPING), PropertyScope.NONE).build())
.outputStrategy(OutputStrategies.keep())
.requiredSingleValueSelection(Labels.withId(OPERATION), Options.from("<", "<=", ">",
- ">=", "=="))
+ ">=", "==", "!="))
.requiredFloatParameter(Labels.withId(VALUE), NUMBER_MAPPING)
.build();
}
@@ -68,6 +68,8 @@ public class NumericalFilterController extends StandaloneEventProcessingDeclarer
operation = "GE";
} else if (stringOperation.equals("==")) {
operation = "EQ";
+ } else if (stringOperation.equals("!=")) {
+ operation = "IE";
}
String filterProperty = extractor.mappingPropertyValue(NUMBER_MAPPING);
diff --git a/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalOperator.java b/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalOperator.java
index 52b97af..6f8338a 100644
--- a/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalOperator.java
+++ b/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalOperator.java
@@ -19,5 +19,5 @@
package org.apache.streampipes.processors.siddhi.filter;
public enum NumericalOperator {
-GE, GT, LE, LT, EQ
+GE, GT, LE, LT, EQ, IE
}
diff --git a/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.numericalfilter/documentation.md b/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.numericalfilter/documentation.md
index e3eacfa..5e9cdef 100644
--- a/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.numericalfilter/documentation.md
+++ b/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.numericalfilter/documentation.md
@@ -25,24 +25,33 @@
***
## Description
+The Numerical Filter processor filters numerical values based on a given threshold. To do so, it uses the lightweight
+CEP engine Siddhi and issues a Siddhi query, e.g.
-
-Add a detailed description here
+```
+// filter query to filter out all events not satisfying the condition
+from inputStreamName[numberField<10]
+select *
+```
***
## Required input
-
+The processor works with any input event that has one field containing a numerical value.
***
## Configuration
-Describe the configuration parameters here
+### Field
+Specifies the name of the field to which the filter operation is applied.
-### 1st parameter
+### Operation
+Specifies the filter operation that should be applied on the field.
-### 2nd parameter
+### Threshold value
+Specifies the threshold value.
-## Output
\ No newline at end of file
+## Output
+The processor outputs the input event if it satisfies the filter expression.
\ No newline at end of file
diff --git a/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.numericalfilter/strings.en b/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.numericalfilter/strings.en
index 2b98572..693d946 100644
--- a/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.numericalfilter/strings.en
+++ b/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.numericalfilter/strings.en
@@ -1,5 +1,5 @@
org.apache.streampipes.processors.siddhi.numericalfilter.title=Numerical Filter (Siddhi)
-org.apache.streampipes.processors.siddhi.numericalfilter.description=
+org.apache.streampipes.processors.siddhi.numericalfilter.description=Numerical Filter based on CEP engine Siddhi
number-mapping.title=Field to Filter
number-mapping.description=Specifies the field name where the filter operation should be applied on.