You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/06/11 20:02:33 UTC

[incubator-streampipes-extensions] branch dev updated: [STREAMPIPES-156] Processor: Number Labeler

This is an automated email from the ASF dual-hosted git repository.

zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git


The following commit(s) were added to refs/heads/dev by this push:
     new d0d1e89  [STREAMPIPES-156] Processor: Number Labeler
     new fac8dfd  Merge branch 'dev' of github.com:apache/incubator-streampipes-extensions into dev
d0d1e89 is described below

commit d0d1e89c6e8a25ca0a9180f4e6df937b44000e70
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Thu Jun 11 22:01:35 2020 +0200

    [STREAMPIPES-156] Processor: Number Labeler
---
 .../pe/jvm/AllPipelineElementsInit.java            |   6 +-
 .../transformation/jvm/TransformationJvmInit.java  |   6 +-
 .../StateBufferLabeler.java}                       |  57 ++------
 .../StateBufferLabelerController.java}             |  12 +-
 .../StateBufferLabelerParameters.java}             |  16 +--
 .../processor/state/labeler/model/Statement.java   |  54 --------
 .../state/labeler/model/StatementUtils.java        | 144 +++++++++++++++++++++
 .../state/labeler/number/NumberLabeler.java        |  66 ++++++++++
 .../NumberLabelerController.java}                  |  31 +----
 .../NumberLabelerParameters.java}                  |  42 +-----
 .../documentation.md                               |   2 +-
 .../icon.png                                       | Bin 0 -> 10162 bytes
 .../strings.en                                     |   4 +-
 .../documentation.md                               |  21 +--
 .../icon.png                                       | Bin 0 -> 8629 bytes
 .../strings.en                                     |  14 ++
 .../icon.png                                       | Bin 9118 -> 0 bytes
 17 files changed, 276 insertions(+), 199 deletions(-)

diff --git a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
index dd14fa1..a8d5306 100644
--- a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
+++ b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
@@ -69,7 +69,8 @@ import org.apache.streampipes.processors.transformation.jvm.processor.booloperat
 import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timer.BooleanTimerController;
 import org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentController;
 import org.apache.streampipes.processors.transformation.jvm.processor.state.buffer.StateBufferController;
-import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.StateLabelerController;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer.StateBufferLabelerController;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number.NumberLabelerController;
 import org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter.StringCounterController;
 import org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer.StringTimerController;
 import org.apache.streampipes.processors.transformation.jvm.processor.task.TaskDurationController;
@@ -155,7 +156,7 @@ public class AllPipelineElementsInit extends StandaloneModelSubmitter {
             .add(new BooleanTimekeepingController())
             .add(new BooleanTimerController())
             .add(new StateBufferController())
-            .add(new StateLabelerController())
+            .add(new StateBufferLabelerController())
             .add(new SignalEdgeFilterController())
             .add(new BooleanToStateController())
             .add(new CsvMetadataEnrichmentController())
@@ -164,6 +165,7 @@ public class AllPipelineElementsInit extends StandaloneModelSubmitter {
             .add(new TransformToBooleanController())
             .add(new StringCounterController())
             .add(new StringTimerController())
+            .add(new NumberLabelerController())
             // streampipes-sinks-brokers-jvm
             .add(new KafkaController())
             .add(new JmsController())
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
index 792468a..f8045e0 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
@@ -37,7 +37,8 @@ import org.apache.streampipes.processors.transformation.jvm.processor.booloperat
 import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timer.BooleanTimerController;
 import org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentController;
 import org.apache.streampipes.processors.transformation.jvm.processor.state.buffer.StateBufferController;
-import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.StateLabelerController;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer.StateBufferLabelerController;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number.NumberLabelerController;
 import org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter.StringCounterController;
 import org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer.StringTimerController;
 import org.apache.streampipes.processors.transformation.jvm.processor.task.TaskDurationController;
@@ -68,7 +69,8 @@ public class TransformationJvmInit extends StandaloneModelSubmitter {
             .add(new SignalEdgeFilterController())
             .add(new BooleanToStateController())
             .add(new StateBufferController())
-            .add(new StateLabelerController())
+            .add(new StateBufferLabelerController())
+            .add(new NumberLabelerController())
             .add(new StringCounterController());
 
     DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabeler.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabeler.java
similarity index 61%
rename from streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabeler.java
rename to streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabeler.java
index 3bdc3c7..a5454b5 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabeler.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabeler.java
@@ -16,13 +16,14 @@
  *
  */
 
-package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler;
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer;
 
 import com.google.common.math.Stats;
 import org.apache.streampipes.commons.exceptions.SpRuntimeException;
 import org.apache.streampipes.logging.api.Logger;
 import org.apache.streampipes.model.runtime.Event;
 import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.model.Statement;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.model.StatementUtils;
 import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
 import org.apache.streampipes.wrapper.routing.SpOutputCollector;
 import org.apache.streampipes.wrapper.runtime.EventProcessor;
@@ -31,7 +32,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-public class StateLabeler implements EventProcessor<StateLabelerParameters> {
+public class StateBufferLabeler implements EventProcessor<StateBufferLabelerParameters> {
 
   private static Logger LOG;
   private String sensorListValueProperty;
@@ -41,27 +42,17 @@ public class StateLabeler implements EventProcessor<StateLabelerParameters> {
   private List<Statement> statements;
 
   @Override
-  public void onInvocation(StateLabelerParameters stateBufferParameters,
+  public void onInvocation(StateBufferLabelerParameters stateBufferParameters,
                            SpOutputCollector spOutputCollector,
                            EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
-    LOG = stateBufferParameters.getGraph().getLogger(StateLabeler.class);
+    LOG = stateBufferParameters.getGraph().getLogger(StateBufferLabeler.class);
 
     this.sensorListValueProperty = stateBufferParameters.getSensorListValueProperty();
     this.stateProperty = stateBufferParameters.getStateProperty();
     this.stateFilter = stateBufferParameters.getStateFilter();
     this.selectedOperation = stateBufferParameters.getSelectedOperation();
 
-    this.statements = new ArrayList<>();
-
-    for (String s : stateBufferParameters.getStatementsStrings()) {
-      Statement statement = Statement.getStatement(s);
-      if (statement == null) {
-        throw new SpRuntimeException("Statement: " + s + " is not correctly formatted");
-      }
-      this.statements.add(statement);
-    }
-    Collections.reverse(this.statements);
-
+    this.statements = StatementUtils.getStatements(stateBufferParameters.getStatementsStrings());
   }
 
   @Override
@@ -73,48 +64,20 @@ public class StateLabeler implements EventProcessor<StateLabelerParameters> {
     if (states.contains(this.stateFilter) || this.stateFilter.equals("*"))  {
       double calculatedValue;
 
-      if (StateLabelerController.MAXIMUM.equals(this.selectedOperation)) {
+      if (StateBufferLabelerController.MAXIMUM.equals(this.selectedOperation)) {
         calculatedValue = Stats.of(values).max();
-      } else if (StateLabelerController.MINIMUM.equals(this.selectedOperation)) {
+      } else if (StateBufferLabelerController.MINIMUM.equals(this.selectedOperation)) {
         calculatedValue = Stats.of(values).min();
       } else {
         calculatedValue = Stats.of(values).mean();
       }
 
-      String label = getLabel(calculatedValue);
-      if (label != null) {
-        inputEvent.addField(StateLabelerController.LABEL, label);
-        out.collect(inputEvent);
-      } else {
-        LOG.info("No condition of statements was fulfilled, add a default case (*) to the statements");
-      }
-
+      Event resultEvent = StatementUtils.addLabel(inputEvent, calculatedValue, this.statements, LOG);
+      out.collect(resultEvent);
     }
   }
 
   @Override
   public void onDetach() {
   }
-
-  private String getLabel(double calculatedValue) {
-    for (Statement statement : this.statements) {
-      if (condition(statement, calculatedValue)) {
-        return statement.getLabel();
-      }
-    }
-    return null;
-  }
-
-  private boolean condition(Statement statement, double calculatedValue) {
-    if (">".equals(statement.getOperator())) {
-      return calculatedValue > statement.getValue();
-    } else if ("<".equals(statement.getOperator())) {
-      return calculatedValue < statement.getValue();
-    } else if ("=".equals(statement.getOperator())) {
-      return calculatedValue == statement.getValue();
-    } else {
-      return true;
-    }
-
-  }
 }
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerController.java
similarity index 87%
copy from streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerController.java
copy to streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerController.java
index f367e61..fa19e8a 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerController.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerController.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler;
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer;
 
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -33,7 +33,7 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcess
 
 import java.util.List;
 
-public class StateLabelerController extends StandaloneEventProcessingDeclarer<StateLabelerParameters> {
+public class StateBufferLabelerController extends StandaloneEventProcessingDeclarer<StateBufferLabelerParameters> {
 
   public static final String STATE_FILTER_ID = "stateFilterId";
   public static final String STATE_FIELD_ID = "stateFieldId";
@@ -50,7 +50,7 @@ public class StateLabelerController extends StandaloneEventProcessingDeclarer<St
 
   @Override
   public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.processor.state.labeler")
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer")
             .withLocales(Locales.EN)
             .withAssets(Assets.DOCUMENTATION, Assets.ICON)
             .requiredStream(StreamRequirementsBuilder.create()
@@ -83,7 +83,7 @@ public class StateLabelerController extends StandaloneEventProcessingDeclarer<St
   }
 
   @Override
-  public ConfiguredEventProcessor<StateLabelerParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+  public ConfiguredEventProcessor<StateBufferLabelerParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
 
     String sensorListValueProperty = extractor.mappingPropertyValue(SENSOR_VALUE_ID);
     String stateProperty = extractor.mappingPropertyValue(STATE_FIELD_ID);
@@ -92,8 +92,8 @@ public class StateLabelerController extends StandaloneEventProcessingDeclarer<St
 
     List<String> statementStrings = extractor.singleValueParameterFromCollection(LABEL_COLLECTION_ID, String.class);
 
-    StateLabelerParameters params = new StateLabelerParameters(graph, sensorListValueProperty, stateProperty, stateFilter, selectedOperation, statementStrings);
+    StateBufferLabelerParameters params = new StateBufferLabelerParameters(graph, sensorListValueProperty, stateProperty, stateFilter, selectedOperation, statementStrings);
 
-    return new ConfiguredEventProcessor<>(params, StateLabeler::new);
+    return new ConfiguredEventProcessor<>(params, StateBufferLabeler::new);
   }
 }
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerParameters.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerParameters.java
similarity index 83%
copy from streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerParameters.java
copy to streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerParameters.java
index 885a722..84428ab 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerParameters.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerParameters.java
@@ -16,14 +16,14 @@
  *
  */
 
-package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler;
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer;
 
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
 import java.util.List;
 
-public class StateLabelerParameters extends EventProcessorBindingParams {
+public class StateBufferLabelerParameters extends EventProcessorBindingParams {
 
     private String sensorListValueProperty;
     private String stateProperty;
@@ -31,12 +31,12 @@ public class StateLabelerParameters extends EventProcessorBindingParams {
     private String selectedOperation;
     private List<String> statementsStrings;
 
-    public StateLabelerParameters(DataProcessorInvocation graph,
-                                  String sensorListValueProperty,
-                                  String stateProperty,
-                                  String stateFilter,
-                                  String selectedOperation,
-                                  List<String> statementsStrings) {
+    public StateBufferLabelerParameters(DataProcessorInvocation graph,
+                                        String sensorListValueProperty,
+                                        String stateProperty,
+                                        String stateFilter,
+                                        String selectedOperation,
+                                        List<String> statementsStrings) {
         super(graph);
         this.sensorListValueProperty = sensorListValueProperty;
         this.stateProperty = stateProperty;
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/Statement.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/Statement.java
index 42ae687..2a6a47d 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/Statement.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/Statement.java
@@ -24,60 +24,6 @@ public class Statement {
     public Statement() {
     }
 
-    /**
-     * This method checks if the user input is correct. When not null is returned
-     * @param s
-     * @return
-     */
-    public static Statement getStatement(String s) {
-        Statement result = new Statement();
-
-        String[] parts  = s.split(";");
-        // default case
-        if (parts.length == 2) {
-            if (parts[0].equals("*")) {
-                result.setOperator(parts[0]);
-                result.setLabel(parts[1]);
-                return result;
-            } else {
-                return null;
-            }
-        }
-
-        // all other valid cases
-        if (parts.length ==  3) {
-
-            if (parts[0].equals(">") || parts[0].equals("<") || parts[0].equals("=")) {
-                result.setOperator(parts[0]);
-            } else {
-                return null;
-            }
-
-            if (isNumeric(parts[1].replaceAll("-", ""))) {
-                result.setValue(Double.parseDouble(parts[1]));
-            } else {
-                return null;
-            }
-
-            result.setLabel(parts[2]);
-
-            return result;
-        } else {
-            return null;
-        }
-    }
-
-    private static boolean isNumeric(final String str) {
-
-        // null or empty
-        if (str == null || str.length() == 0) {
-            return false;
-        }
-
-        return str.chars().allMatch(Character::isDigit);
-
-    }
-
     public String getOperator() {
         return operator;
     }
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/StatementUtils.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/StatementUtils.java
new file mode 100644
index 0000000..40840b1
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/StatementUtils.java
@@ -0,0 +1,144 @@
+/*
+Copyright 2020 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.model;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number.NumberLabelerController;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class StatementUtils {
+
+    /**
+     * Labels the event according to the first statement that the value fulfills.
+     * When no statement is fulfilled the event stays unlabeled and a hint is logged.
+     *
+     * @param inputEvent event that receives the label field
+     * @param value number the statements are evaluated against
+     * @param statements statements, checked in list order
+     * @param log logger for the hint about the missing default case
+     * @return the given {@code inputEvent} instance
+     */
+    public static Event addLabel(Event inputEvent, double value, List<Statement> statements, Logger log) {
+        String label = getLabel(value, statements);
+        if (label == null) {
+            log.info("No condition of statements was fulfilled, add a default case (*) to the statements");
+            return inputEvent;
+        }
+        inputEvent.addField(NumberLabelerController.LABEL, label);
+        return inputEvent;
+    }
+
+
+    /**
+     * Parses every statement string and returns the statements in reverse input order.
+     *
+     * @param statementStrings raw statement strings
+     * @return the parsed statements, the one parsed from the last string first
+     * @throws SpRuntimeException if one of the strings is not correctly formatted
+     */
+    public static List<Statement> getStatements(List<String> statementStrings) throws SpRuntimeException {
+        List<Statement> parsed = new ArrayList<>();
+
+        for (String raw : statementStrings) {
+            Statement statement = getStatement(raw);
+            if (statement == null) {
+                throw new SpRuntimeException("Statement: " + raw + " is not correctly formatted");
+            }
+            parsed.add(statement);
+        }
+        Collections.reverse(parsed);
+        return parsed;
+    }
+
+    /**
+     * Parses one statement string. Accepted forms are {@code operator;value;label}, where operator
+     * is {@code >}, {@code <} or {@code =}, and {@code *;label} for the default case.
+     *
+     * @param s statement string as entered by the user
+     * @return the parsed statement, or {@code null} when the string is not correctly formatted
+     */
+    public static Statement getStatement(String s) {
+        Statement result = new Statement();
+
+        String[] parts = s.split(";");
+        // default case: "*;label"
+        if (parts.length == 2) {
+            if (!"*".equals(parts[0])) {
+                return null;
+            }
+            result.setOperator(parts[0]);
+            result.setLabel(parts[1]);
+            return result;
+        }
+
+        // all other valid cases: "operator;value;label"
+        if (parts.length != 3) {
+            return null;
+        }
+
+        if (!(">".equals(parts[0]) || "<".equals(parts[0]) || "=".equals(parts[0]))) {
+            return null;
+        }
+        result.setOperator(parts[0]);
+
+        // Double.parseDouble alone decides what counts as a number: decimals such as "1.5" are
+        // accepted, and malformed input such as "1-2" yields null instead of an unchecked exception.
+        try {
+            result.setValue(Double.parseDouble(parts[1]));
+        } catch (NumberFormatException e) {
+            return null;
+        }
+
+        result.setLabel(parts[2]);
+        return result;
+    }
+
+    // The first statement whose condition holds decides the label; null when none matches.
+    private static String getLabel(double calculatedValue, List<Statement> statements) {
+        return statements.stream()
+                .filter(statement -> condition(statement, calculatedValue))
+                .findFirst()
+                .map(Statement::getLabel)
+                .orElse(null);
+    }
+
+    // Evaluates the statement against the value; any operator other than >, < or = always matches.
+    private static boolean condition(Statement statement, double calculatedValue) {
+        String operator = statement.getOperator();
+        if (">".equals(operator)) {
+            return calculatedValue > statement.getValue();
+        }
+        if ("<".equals(operator)) {
+            return calculatedValue < statement.getValue();
+        }
+        return !"=".equals(operator) || calculatedValue == statement.getValue();
+    }
+
+    /**
+     * Reports whether the string is non-empty and made up of digit characters only.
+     * A {@code null} or empty string, a sign or a decimal separator makes the result false.
+     */
+    private static boolean isNumeric(final String str) {
+        boolean hasContent = str != null && !str.isEmpty();
+        return hasContent && str.chars().allMatch(Character::isDigit);
+    }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabeler.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabeler.java
new file mode 100644
index 0000000..34feca1
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabeler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.model.Statement;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.model.StatementUtils;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.runtime.EventProcessor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/** Labels every event by evaluating the configured statements against one numeric field. */
+public class NumberLabeler implements EventProcessor<NumberLabelerParameters> {
+
+  // Kept per instance: a static field would be reassigned by every onInvocation call of any instance.
+  private Logger log;
+  private String sensorListValueProperty;
+  private List<Statement> statements;
+
+  @Override
+  public void onInvocation(NumberLabelerParameters stateBufferParameters,
+                           SpOutputCollector spOutputCollector,
+                           EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+    this.log = stateBufferParameters.getGraph().getLogger(NumberLabeler.class);
+    this.sensorListValueProperty = stateBufferParameters.getSensorListValueProperty();
+    // Throws SpRuntimeException when one of the statement strings is not correctly formatted.
+    this.statements = StatementUtils.getStatements(stateBufferParameters.getStatementsStrings());
+  }
+
+  @Override
+  public void onEvent(Event inputEvent, SpOutputCollector out) {
+    // Held as a primitive because addLabel takes a double.
+    double value = inputEvent.getFieldBySelector(this.sensorListValueProperty).getAsPrimitive().getAsDouble();
+
+    // addLabel hands back the same event; the label is only present when a statement matched.
+    Event resultEvent = StatementUtils.addLabel(inputEvent, value, this.statements, this.log);
+    out.collect(resultEvent);
+  }
+
+  @Override
+  public void onDetach() {
+    // No resources to release.
+  }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabelerController.java
similarity index 67%
rename from streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerController.java
rename to streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabelerController.java
index f367e61..5f77127 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerController.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabelerController.java
@@ -16,7 +16,7 @@
  *
  */
 
-package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler;
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number;
 
 import org.apache.streampipes.model.graph.DataProcessorDescription;
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -33,39 +33,25 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcess
 
 import java.util.List;
 
-public class StateLabelerController extends StandaloneEventProcessingDeclarer<StateLabelerParameters> {
+public class NumberLabelerController extends StandaloneEventProcessingDeclarer<NumberLabelerParameters> {
 
-  public static final String STATE_FILTER_ID = "stateFilterId";
-  public static final String STATE_FIELD_ID = "stateFieldId";
-  public static final String OPERATIONS_ID = "operationsId";
   public static final String SENSOR_VALUE_ID = "sensorValueId";
   public static final String LABEL_COLLECTION_ID = "labelCollectionId";
   public static final String LABEL_STRING_ID = "labelStringId";
 
   public static final String LABEL = "label";
 
-  public static final String MINIMUM = "minimum";
-  public static final String MAXIMUM = "maximum";
-  public static final String AVERAGE = "average";
-
   @Override
   public DataProcessorDescription declareModel() {
-    return ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.processor.state.labeler")
+    return ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number")
             .withLocales(Locales.EN)
             .withAssets(Assets.DOCUMENTATION, Assets.ICON)
             .requiredStream(StreamRequirementsBuilder.create()
                     .requiredPropertyWithUnaryMapping(
-                            EpRequirements.listRequirement(),
+                            EpRequirements.numberReq(),
                             Labels.withId(SENSOR_VALUE_ID),
                             PropertyScope.NONE)
-                    .requiredPropertyWithUnaryMapping(
-                            EpRequirements.domainPropertyReqList(SPSensor.STATE),
-                            Labels.withId(STATE_FIELD_ID),
-                            PropertyScope.NONE)
                     .build())
-            .requiredTextParameter(Labels.withId(STATE_FILTER_ID))
-            .requiredSingleValueSelection(Labels.withId(OPERATIONS_ID),
-                    Options.from(MINIMUM, MAXIMUM, AVERAGE))
             .requiredParameterAsCollection(
                     Labels.withId(LABEL_COLLECTION_ID),
                       StaticProperties.stringFreeTextProperty(Labels.withId(LABEL_STRING_ID)))
@@ -83,17 +69,14 @@ public class StateLabelerController extends StandaloneEventProcessingDeclarer<St
   }
 
   @Override
-  public ConfiguredEventProcessor<StateLabelerParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+  public ConfiguredEventProcessor<NumberLabelerParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
 
     String sensorListValueProperty = extractor.mappingPropertyValue(SENSOR_VALUE_ID);
-    String stateProperty = extractor.mappingPropertyValue(STATE_FIELD_ID);
-    String stateFilter = extractor.singleValueParameter(STATE_FILTER_ID, String.class);
-    String selectedOperation = extractor.selectedSingleValue(OPERATIONS_ID, String.class);
 
     List<String> statementStrings = extractor.singleValueParameterFromCollection(LABEL_COLLECTION_ID, String.class);
 
-    StateLabelerParameters params = new StateLabelerParameters(graph, sensorListValueProperty, stateProperty, stateFilter, selectedOperation, statementStrings);
+    NumberLabelerParameters params = new NumberLabelerParameters(graph, sensorListValueProperty, statementStrings);
 
-    return new ConfiguredEventProcessor<>(params, StateLabeler::new);
+    return new ConfiguredEventProcessor<>(params, NumberLabeler::new);
   }
 }
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerParameters.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabelerParameters.java
similarity index 57%
rename from streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerParameters.java
rename to streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabelerParameters.java
index 885a722..5063bb8 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerParameters.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabelerParameters.java
@@ -16,32 +16,23 @@
  *
  */
 
-package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler;
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number;
 
 import org.apache.streampipes.model.graph.DataProcessorInvocation;
 import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
 
 import java.util.List;
 
-public class StateLabelerParameters extends EventProcessorBindingParams {
+public class NumberLabelerParameters extends EventProcessorBindingParams {
 
     private String sensorListValueProperty;
-    private String stateProperty;
-    private String stateFilter;
-    private String selectedOperation;
     private List<String> statementsStrings;
 
-    public StateLabelerParameters(DataProcessorInvocation graph,
-                                  String sensorListValueProperty,
-                                  String stateProperty,
-                                  String stateFilter,
-                                  String selectedOperation,
-                                  List<String> statementsStrings) {
+    public NumberLabelerParameters(DataProcessorInvocation graph,
+                                   String sensorListValueProperty,
+                                   List<String> statementsStrings) {
         super(graph);
         this.sensorListValueProperty = sensorListValueProperty;
-        this.stateProperty = stateProperty;
-        this.stateFilter = stateFilter;
-        this.selectedOperation = selectedOperation;
         this.statementsStrings = statementsStrings;
     }
 
@@ -53,22 +44,6 @@ public class StateLabelerParameters extends EventProcessorBindingParams {
         this.sensorListValueProperty = sensorListValueProperty;
     }
 
-    public String getStateFilter() {
-        return stateFilter;
-    }
-
-    public void setStateFilter(String stateFilter) {
-        this.stateFilter = stateFilter;
-    }
-
-    public String getSelectedOperation() {
-        return selectedOperation;
-    }
-
-    public void setSelectedOperation(String selectedOperation) {
-        this.selectedOperation = selectedOperation;
-    }
-
     public List<String> getStatementsStrings() {
         return statementsStrings;
     }
@@ -77,11 +52,4 @@ public class StateLabelerParameters extends EventProcessorBindingParams {
         this.statementsStrings = statementsStrings;
     }
 
-    public String getStateProperty() {
-        return stateProperty;
-    }
-
-    public void setStateProperty(String stateProperty) {
-        this.stateProperty = stateProperty;
-    }
 }
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/documentation.md b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/documentation.md
similarity index 98%
copy from streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/documentation.md
copy to streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/documentation.md
index ffe7545..6f85882 100644
--- a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/documentation.md
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/documentation.md
@@ -16,7 +16,7 @@
   ~
   -->
 
-## State Labeler
+## State Buffer Labeler
 
 <p align="center"> 
     <img src="icon.png" width="150px;" class="pe-image-documentation"/>
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/icon.png b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/icon.png
new file mode 100644
index 0000000..034fb5b
Binary files /dev/null and b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/icon.png differ
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/strings.en b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/strings.en
similarity index 87%
rename from streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/strings.en
rename to streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/strings.en
index 5b77578..62ac488 100644
--- a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/strings.en
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/strings.en
@@ -1,5 +1,5 @@
-org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.title=State Labeler
-org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.description=Adds a label based on a  user defined rule to a sensor time-series
+org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer.title=State Buffer Labeler
+org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer.description=Adds a label based on a user-defined rule to a sensor time-series
 
 stateFilterId.title=Select a specific state
 stateFilterId.description=Add a filter to define which states to evaluate. Add '*' to select all states.
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/documentation.md b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/documentation.md
similarity index 66%
rename from streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/documentation.md
rename to streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/documentation.md
index ffe7545..20bd156 100644
--- a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/documentation.md
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/documentation.md
@@ -16,7 +16,7 @@
   ~
   -->
 
-## State Labeler
+## Number Labeler
 
 <p align="center"> 
     <img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -26,33 +26,22 @@
 
 ## Description
 
-Apply a rule to a time-series recorded during a state of a machine. (E.g. when minimum value is lower then 10, add label `not ok` else add label `ok`)
-
+Apply a rule to a value of a field. (E.g. when the value is lower than 10, add label `not ok` else add label `ok`)
 
 ***
 
 ## Required input
 
-Requires a list with sensor values and a field defining the state
-
-### Sensor values
-
-An array representing sensor values recorded during the state.
+Requires a sensor value
 
-### State field
+### Sensor value
 
-A field representing the state when the sensor values where recorded.
+A number representing the current sensor value.
 
 ***
 
 ## Configuration
 
-### Select a specific state
-When you are interested in the values of a specific state add it here. All other states will be ignored. To get results of all states enter `*`
-
-### Operation
-Operation that will be performed on the sensor values (calculate `maximim`, or `average`, or `minimum`) 
-
 ### Condition
 Define a rule which label to add. Example: `<;5;nok` means when the calculated value is smaller than 5 add label nok.
 The default label can be defined with `*;nok`.
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/icon.png b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/icon.png
new file mode 100644
index 0000000..9fc17e6
Binary files /dev/null and b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/icon.png differ
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/strings.en b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/strings.en
new file mode 100644
index 0000000..31c6151
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/strings.en
@@ -0,0 +1,14 @@
+org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number.title=Number Labeler
+org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number.description=Adds a label based on a user-defined rule to a sensor value
+
+sensorValueId.title=Sensor value
+sensorValueId.description=The value used for the labeling
+
+labelCollectionId.title=Condition
+labelCollectionId.description=Add a condition with the following scheme (Order is important) '<;5;ok', '<;10;ok' or '*;nok' as a default label
+
+label.title=Label
+label.description=User defined label for data
+
+labelStringId.title=Condition
+labelStringId.description=Add a condition with the following scheme (Order is important) '<;5;ok', '<;10;ok' or '*;nok' as a default label
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/icon.png b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/icon.png
deleted file mode 100644
index b323e75..0000000
Binary files a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/icon.png and /dev/null differ