You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by wi...@apache.org on 2020/05/26 07:57:05 UTC

[incubator-streampipes-extensions] branch dev updated: [STREAMPIPES-143] Numerical Filter in Siddhi not working

This is an automated email from the ASF dual-hosted git repository.

wiener pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2aa7cc7  [STREAMPIPES-143] Numerical Filter in Siddhi not working
2aa7cc7 is described below

commit 2aa7cc73768c8f3ec99f076b44c66d7f8ade117b
Author: Patrick Wiener <wi...@fzi.de>
AuthorDate: Tue May 26 09:56:46 2020 +0200

    [STREAMPIPES-143] Numerical Filter in Siddhi not working
---
 .../processors/siddhi/FiltersSiddhiInit.java       |  4 +++-
 .../processors/siddhi/filter/NumericalFilter.java  | 21 +++++++++++++++++++-
 .../siddhi/filter/NumericalFilterController.java   |  4 +++-
 .../siddhi/filter/NumericalOperator.java           |  2 +-
 .../documentation.md                               | 23 +++++++++++++++-------
 .../strings.en                                     |  2 +-
 6 files changed, 44 insertions(+), 12 deletions(-)

diff --git a/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/FiltersSiddhiInit.java b/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/FiltersSiddhiInit.java
index 036c31c..b49d270 100644
--- a/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/FiltersSiddhiInit.java
+++ b/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/FiltersSiddhiInit.java
@@ -27,6 +27,7 @@ import org.apache.streampipes.dataformat.smile.SmileDataFormatFactory;
 import org.apache.streampipes.messaging.jms.SpJmsProtocolFactory;
 import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
 import org.apache.streampipes.processors.siddhi.config.FilterSiddhiConfig;
+import org.apache.streampipes.processors.siddhi.filter.NumericalFilterController;
 import org.apache.streampipes.processors.siddhi.frequency.FrequencyController;
 import org.apache.streampipes.processors.siddhi.frequencychange.FrequencyChangeController;
 import org.apache.streampipes.processors.siddhi.stop.StreamStopController;
@@ -40,7 +41,8 @@ public class FiltersSiddhiInit extends StandaloneModelSubmitter {
             .add(new TrendController())
             .add(new StreamStopController())
             .add(new FrequencyController())
-            .add(new FrequencyChangeController());
+            .add(new FrequencyChangeController())
+            .add(new NumericalFilterController());
 
     DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
             new CborDataFormatFactory(),
diff --git a/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalFilter.java b/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalFilter.java
index 0203120..b78e9ae 100644
--- a/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalFilter.java
+++ b/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalFilter.java
@@ -26,7 +26,26 @@ public class NumericalFilter extends SiddhiEventEngine<NumericalFilterParameters
   @Override
   protected String fromStatement(List<String> inputStreamNames, NumericalFilterParameters params) {
     String filterProperty = prepareName(params.getFilterProperty());
-    return "from " + inputStreamNames.get(0) +"[" + filterProperty +"<" + params.getThreshold() +"]";
+    String filterOperator = "";
+
+    if (params.getNumericalOperator() == NumericalOperator.EQ) {
+      filterOperator = "==";
+    } else if (params.getNumericalOperator() == NumericalOperator.GE) {
+      filterOperator = ">=";
+    } else if (params.getNumericalOperator() == NumericalOperator.GT) {
+      filterOperator = ">";
+    } else if (params.getNumericalOperator() == NumericalOperator.LE) {
+      filterOperator = "<=";
+    } else if (params.getNumericalOperator() == NumericalOperator.LT) {
+      filterOperator = "<";
+    } else if (params.getNumericalOperator() == NumericalOperator.IE) {
+      filterOperator = "!=";
+    }
+
+    // e.g. Filter for numberField value less than 10 and output all fields
+    //
+    // Siddhi query: from inputstreamname[numberField<10]
+    return "from " + inputStreamNames.get(0) +"[" + filterProperty + filterOperator + params.getThreshold() +"]";
   }
 
   @Override
diff --git a/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalFilterController.java b/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalFilterController.java
index 8ab3705..530c32e 100644
--- a/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalFilterController.java
+++ b/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalFilterController.java
@@ -47,7 +47,7 @@ public class NumericalFilterController extends StandaloneEventProcessingDeclarer
                             Labels.withId(NUMBER_MAPPING), PropertyScope.NONE).build())
             .outputStrategy(OutputStrategies.keep())
             .requiredSingleValueSelection(Labels.withId(OPERATION), Options.from("<", "<=", ">",
-                    ">=", "=="))
+                    ">=", "==", "!="))
             .requiredFloatParameter(Labels.withId(VALUE), NUMBER_MAPPING)
             .build();
   }
@@ -68,6 +68,8 @@ public class NumericalFilterController extends StandaloneEventProcessingDeclarer
       operation = "GE";
     } else if (stringOperation.equals("==")) {
       operation = "EQ";
+    } else if (stringOperation.equals("!=")) {
+      operation = "IE";
     }
 
     String filterProperty = extractor.mappingPropertyValue(NUMBER_MAPPING);
diff --git a/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalOperator.java b/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalOperator.java
index 52b97af..6f8338a 100644
--- a/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalOperator.java
+++ b/streampipes-processors-filters-siddhi/src/main/java/org/apache/streampipes/processors/siddhi/filter/NumericalOperator.java
@@ -19,5 +19,5 @@
 package org.apache.streampipes.processors.siddhi.filter;
 
 public enum NumericalOperator {
-GE, GT, LE, LT, EQ
+GE, GT, LE, LT, EQ, IE
 }
diff --git a/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.numericalfilter/documentation.md b/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.numericalfilter/documentation.md
index e3eacfa..5e9cdef 100644
--- a/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.numericalfilter/documentation.md
+++ b/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.numericalfilter/documentation.md
@@ -25,24 +25,33 @@
 ***
 
 ## Description
+The Numerical Filter processor filters numerical values based on a given threshold. To do so, it issues a query to the
+lightweight CEP engine Siddhi, e.g.
 
-
-Add a detailed description here
+```
+// filter query to filter out all events not satisfying the condition
+from inputStreamName[numberField<10]
+select *
+```
 
 ***
 
 ## Required input
-
+The processor works with any input event that has one field containing a numerical value.
 
 ***
 
 ## Configuration
 
-Describe the configuration parameters here
+### Field
+Specifies the name of the field to which the filter operation is applied.
 
-### 1st parameter
 
+### Operation
+Specifies the filter operation that should be applied on the field.
 
-### 2nd parameter
+### Threshold value
+Specifies the threshold value.
 
-## Output
\ No newline at end of file
+## Output
+The processor outputs the input event if it satisfies the filter expression.
\ No newline at end of file
diff --git a/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.numericalfilter/strings.en b/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.numericalfilter/strings.en
index 2b98572..693d946 100644
--- a/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.numericalfilter/strings.en
+++ b/streampipes-processors-filters-siddhi/src/main/resources/org.apache.streampipes.processors.siddhi.numericalfilter/strings.en
@@ -1,5 +1,5 @@
 org.apache.streampipes.processors.siddhi.numericalfilter.title=Numerical Filter (Siddhi)
-org.apache.streampipes.processors.siddhi.numericalfilter.description=
+org.apache.streampipes.processors.siddhi.numericalfilter.description=Numerical Filter based on CEP engine Siddhi
 
 number-mapping.title=Field to Filter
 number-mapping.description=Specifies the field name where the filter operation should be applied on.