You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/06/11 20:02:33 UTC
[incubator-streampipes-extensions] branch dev updated:
[STREAMPIPES-156] Processor: Number Labeler
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new d0d1e89 [STREAMPIPES-156] Processor: Number Labeler
new fac8dfd Merge branch 'dev' of github.com:apache/incubator-streampipes-extensions into dev
d0d1e89 is described below
commit d0d1e89c6e8a25ca0a9180f4e6df937b44000e70
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Thu Jun 11 22:01:35 2020 +0200
[STREAMPIPES-156] Processor: Number Labeler
---
.../pe/jvm/AllPipelineElementsInit.java | 6 +-
.../transformation/jvm/TransformationJvmInit.java | 6 +-
.../StateBufferLabeler.java} | 57 ++------
.../StateBufferLabelerController.java} | 12 +-
.../StateBufferLabelerParameters.java} | 16 +--
.../processor/state/labeler/model/Statement.java | 54 --------
.../state/labeler/model/StatementUtils.java | 144 +++++++++++++++++++++
.../state/labeler/number/NumberLabeler.java | 66 ++++++++++
.../NumberLabelerController.java} | 31 +----
.../NumberLabelerParameters.java} | 42 +-----
.../documentation.md | 2 +-
.../icon.png | Bin 0 -> 10162 bytes
.../strings.en | 4 +-
.../documentation.md | 21 +--
.../icon.png | Bin 0 -> 8629 bytes
.../strings.en | 14 ++
.../icon.png | Bin 9118 -> 0 bytes
17 files changed, 276 insertions(+), 199 deletions(-)
diff --git a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
index dd14fa1..a8d5306 100644
--- a/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
+++ b/streampipes-pipeline-elements-all-jvm/src/main/java/org/apache/streampipes/pe/jvm/AllPipelineElementsInit.java
@@ -69,7 +69,8 @@ import org.apache.streampipes.processors.transformation.jvm.processor.booloperat
import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timer.BooleanTimerController;
import org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentController;
import org.apache.streampipes.processors.transformation.jvm.processor.state.buffer.StateBufferController;
-import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.StateLabelerController;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer.StateBufferLabelerController;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number.NumberLabelerController;
import org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter.StringCounterController;
import org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer.StringTimerController;
import org.apache.streampipes.processors.transformation.jvm.processor.task.TaskDurationController;
@@ -155,7 +156,7 @@ public class AllPipelineElementsInit extends StandaloneModelSubmitter {
.add(new BooleanTimekeepingController())
.add(new BooleanTimerController())
.add(new StateBufferController())
- .add(new StateLabelerController())
+ .add(new StateBufferLabelerController())
.add(new SignalEdgeFilterController())
.add(new BooleanToStateController())
.add(new CsvMetadataEnrichmentController())
@@ -164,6 +165,7 @@ public class AllPipelineElementsInit extends StandaloneModelSubmitter {
.add(new TransformToBooleanController())
.add(new StringCounterController())
.add(new StringTimerController())
+ .add(new NumberLabelerController())
// streampipes-sinks-brokers-jvm
.add(new KafkaController())
.add(new JmsController())
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
index 792468a..f8045e0 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/TransformationJvmInit.java
@@ -37,7 +37,8 @@ import org.apache.streampipes.processors.transformation.jvm.processor.booloperat
import org.apache.streampipes.processors.transformation.jvm.processor.booloperator.timer.BooleanTimerController;
import org.apache.streampipes.processors.transformation.jvm.processor.csvmetadata.CsvMetadataEnrichmentController;
import org.apache.streampipes.processors.transformation.jvm.processor.state.buffer.StateBufferController;
-import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.StateLabelerController;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer.StateBufferLabelerController;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number.NumberLabelerController;
import org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.counter.StringCounterController;
import org.apache.streampipes.processors.transformation.jvm.processor.stringoperator.timer.StringTimerController;
import org.apache.streampipes.processors.transformation.jvm.processor.task.TaskDurationController;
@@ -68,7 +69,8 @@ public class TransformationJvmInit extends StandaloneModelSubmitter {
.add(new SignalEdgeFilterController())
.add(new BooleanToStateController())
.add(new StateBufferController())
- .add(new StateLabelerController())
+ .add(new StateBufferLabelerController())
+ .add(new NumberLabelerController())
.add(new StringCounterController());
DeclarersSingleton.getInstance().registerDataFormats(new JsonDataFormatFactory(),
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabeler.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabeler.java
similarity index 61%
rename from streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabeler.java
rename to streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabeler.java
index 3bdc3c7..a5454b5 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabeler.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabeler.java
@@ -16,13 +16,14 @@
*
*/
-package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler;
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer;
import com.google.common.math.Stats;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.logging.api.Logger;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.model.Statement;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.model.StatementUtils;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.runtime.EventProcessor;
@@ -31,7 +32,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-public class StateLabeler implements EventProcessor<StateLabelerParameters> {
+public class StateBufferLabeler implements EventProcessor<StateBufferLabelerParameters> {
private static Logger LOG;
private String sensorListValueProperty;
@@ -41,27 +42,17 @@ public class StateLabeler implements EventProcessor<StateLabelerParameters> {
private List<Statement> statements;
@Override
- public void onInvocation(StateLabelerParameters stateBufferParameters,
+ public void onInvocation(StateBufferLabelerParameters stateBufferParameters,
SpOutputCollector spOutputCollector,
EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
- LOG = stateBufferParameters.getGraph().getLogger(StateLabeler.class);
+ LOG = stateBufferParameters.getGraph().getLogger(StateBufferLabeler.class);
this.sensorListValueProperty = stateBufferParameters.getSensorListValueProperty();
this.stateProperty = stateBufferParameters.getStateProperty();
this.stateFilter = stateBufferParameters.getStateFilter();
this.selectedOperation = stateBufferParameters.getSelectedOperation();
- this.statements = new ArrayList<>();
-
- for (String s : stateBufferParameters.getStatementsStrings()) {
- Statement statement = Statement.getStatement(s);
- if (statement == null) {
- throw new SpRuntimeException("Statement: " + s + " is not correctly formatted");
- }
- this.statements.add(statement);
- }
- Collections.reverse(this.statements);
-
+ this.statements = StatementUtils.getStatements(stateBufferParameters.getStatementsStrings());
}
@Override
@@ -73,48 +64,20 @@ public class StateLabeler implements EventProcessor<StateLabelerParameters> {
if (states.contains(this.stateFilter) || this.stateFilter.equals("*")) {
double calculatedValue;
- if (StateLabelerController.MAXIMUM.equals(this.selectedOperation)) {
+ if (StateBufferLabelerController.MAXIMUM.equals(this.selectedOperation)) {
calculatedValue = Stats.of(values).max();
- } else if (StateLabelerController.MINIMUM.equals(this.selectedOperation)) {
+ } else if (StateBufferLabelerController.MINIMUM.equals(this.selectedOperation)) {
calculatedValue = Stats.of(values).min();
} else {
calculatedValue = Stats.of(values).mean();
}
- String label = getLabel(calculatedValue);
- if (label != null) {
- inputEvent.addField(StateLabelerController.LABEL, label);
- out.collect(inputEvent);
- } else {
- LOG.info("No condition of statements was fulfilled, add a default case (*) to the statements");
- }
-
+ Event resultEvent = StatementUtils.addLabel(inputEvent, calculatedValue, this.statements, LOG);
+ out.collect(resultEvent);
}
}
@Override
public void onDetach() {
}
-
- private String getLabel(double calculatedValue) {
- for (Statement statement : this.statements) {
- if (condition(statement, calculatedValue)) {
- return statement.getLabel();
- }
- }
- return null;
- }
-
- private boolean condition(Statement statement, double calculatedValue) {
- if (">".equals(statement.getOperator())) {
- return calculatedValue > statement.getValue();
- } else if ("<".equals(statement.getOperator())) {
- return calculatedValue < statement.getValue();
- } else if ("=".equals(statement.getOperator())) {
- return calculatedValue == statement.getValue();
- } else {
- return true;
- }
-
- }
}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerController.java
similarity index 87%
copy from streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerController.java
copy to streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerController.java
index f367e61..fa19e8a 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerController.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerController.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler;
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -33,7 +33,7 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcess
import java.util.List;
-public class StateLabelerController extends StandaloneEventProcessingDeclarer<StateLabelerParameters> {
+public class StateBufferLabelerController extends StandaloneEventProcessingDeclarer<StateBufferLabelerParameters> {
public static final String STATE_FILTER_ID = "stateFilterId";
public static final String STATE_FIELD_ID = "stateFieldId";
@@ -50,7 +50,7 @@ public class StateLabelerController extends StandaloneEventProcessingDeclarer<St
@Override
public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.processor.state.labeler")
+ return ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer")
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.requiredStream(StreamRequirementsBuilder.create()
@@ -83,7 +83,7 @@ public class StateLabelerController extends StandaloneEventProcessingDeclarer<St
}
@Override
- public ConfiguredEventProcessor<StateLabelerParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+ public ConfiguredEventProcessor<StateBufferLabelerParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
String sensorListValueProperty = extractor.mappingPropertyValue(SENSOR_VALUE_ID);
String stateProperty = extractor.mappingPropertyValue(STATE_FIELD_ID);
@@ -92,8 +92,8 @@ public class StateLabelerController extends StandaloneEventProcessingDeclarer<St
List<String> statementStrings = extractor.singleValueParameterFromCollection(LABEL_COLLECTION_ID, String.class);
- StateLabelerParameters params = new StateLabelerParameters(graph, sensorListValueProperty, stateProperty, stateFilter, selectedOperation, statementStrings);
+ StateBufferLabelerParameters params = new StateBufferLabelerParameters(graph, sensorListValueProperty, stateProperty, stateFilter, selectedOperation, statementStrings);
- return new ConfiguredEventProcessor<>(params, StateLabeler::new);
+ return new ConfiguredEventProcessor<>(params, StateBufferLabeler::new);
}
}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerParameters.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerParameters.java
similarity index 83%
copy from streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerParameters.java
copy to streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerParameters.java
index 885a722..84428ab 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerParameters.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerParameters.java
@@ -16,14 +16,14 @@
*
*/
-package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler;
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import java.util.List;
-public class StateLabelerParameters extends EventProcessorBindingParams {
+public class StateBufferLabelerParameters extends EventProcessorBindingParams {
private String sensorListValueProperty;
private String stateProperty;
@@ -31,12 +31,12 @@ public class StateLabelerParameters extends EventProcessorBindingParams {
private String selectedOperation;
private List<String> statementsStrings;
- public StateLabelerParameters(DataProcessorInvocation graph,
- String sensorListValueProperty,
- String stateProperty,
- String stateFilter,
- String selectedOperation,
- List<String> statementsStrings) {
+ public StateBufferLabelerParameters(DataProcessorInvocation graph,
+ String sensorListValueProperty,
+ String stateProperty,
+ String stateFilter,
+ String selectedOperation,
+ List<String> statementsStrings) {
super(graph);
this.sensorListValueProperty = sensorListValueProperty;
this.stateProperty = stateProperty;
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/Statement.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/Statement.java
index 42ae687..2a6a47d 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/Statement.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/Statement.java
@@ -24,60 +24,6 @@ public class Statement {
public Statement() {
}
- /**
- * This method checks if the user input is correct. When not null is returned
- * @param s
- * @return
- */
- public static Statement getStatement(String s) {
- Statement result = new Statement();
-
- String[] parts = s.split(";");
- // default case
- if (parts.length == 2) {
- if (parts[0].equals("*")) {
- result.setOperator(parts[0]);
- result.setLabel(parts[1]);
- return result;
- } else {
- return null;
- }
- }
-
- // all other valid cases
- if (parts.length == 3) {
-
- if (parts[0].equals(">") || parts[0].equals("<") || parts[0].equals("=")) {
- result.setOperator(parts[0]);
- } else {
- return null;
- }
-
- if (isNumeric(parts[1].replaceAll("-", ""))) {
- result.setValue(Double.parseDouble(parts[1]));
- } else {
- return null;
- }
-
- result.setLabel(parts[2]);
-
- return result;
- } else {
- return null;
- }
- }
-
- private static boolean isNumeric(final String str) {
-
- // null or empty
- if (str == null || str.length() == 0) {
- return false;
- }
-
- return str.chars().allMatch(Character::isDigit);
-
- }
-
public String getOperator() {
return operator;
}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/StatementUtils.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/StatementUtils.java
new file mode 100644
index 0000000..40840b1
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/StatementUtils.java
@@ -0,0 +1,144 @@
+/*
+Copyright 2020 FZI Forschungszentrum Informatik
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.model;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number.NumberLabelerController;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class StatementUtils {
+
+ /**
+ * Add a label to the input event according to the provided statements
+ * @param inputEvent
+ * @param value
+ * @param statements
+ * @param log
+ * @return
+ * @throws SpRuntimeException
+ */
+ public static Event addLabel(Event inputEvent, double value, List<Statement> statements, Logger log) {
+ String label = getLabel(value, statements);
+ if (label != null) {
+ inputEvent.addField(NumberLabelerController.LABEL, label);
+ } else {
+ log.info("No condition of statements was fulfilled, add a default case (*) to the statements");
+ }
+
+ return inputEvent;
+ }
+
+
+ /**
+ * Extracts Statements from Strings
+ * @param statementStrings
+ * @return
+ * @throws SpRuntimeException
+ */
+ public static List<Statement> getStatements(List<String> statementStrings) throws SpRuntimeException {
+ List<Statement> statements = new ArrayList<>();
+
+ for (String s : statementStrings) {
+ Statement statement = getStatement(s);
+ if (statement == null) {
+ throw new SpRuntimeException("Statement: " + s + " is not correctly formatted");
+ }
+ statements.add(statement);
+ }
+
+ Collections.reverse(statements);
+ return statements;
+ }
+
+ /**
+ * This method checks if the user input is correct. When not null is returned
+ * @param s
+ * @return
+ */
+ public static Statement getStatement(String s) {
+ Statement result = new Statement();
+
+ String[] parts = s.split(";");
+ // default case
+ if (parts.length == 2) {
+ if (parts[0].equals("*")) {
+ result.setOperator(parts[0]);
+ result.setLabel(parts[1]);
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ // all other valid cases
+ if (parts.length == 3) {
+
+ if (parts[0].equals(">") || parts[0].equals("<") || parts[0].equals("=")) {
+ result.setOperator(parts[0]);
+ } else {
+ return null;
+ }
+
+ if (isNumeric(parts[1].replaceAll("-", ""))) {
+ result.setValue(Double.parseDouble(parts[1]));
+ } else {
+ return null;
+ }
+
+ result.setLabel(parts[2]);
+
+ return result;
+ } else {
+ return null;
+ }
+ }
+
+ private static String getLabel(double calculatedValue, List<Statement> statements) {
+ for (Statement statement : statements) {
+ if (condition(statement, calculatedValue)) {
+ return statement.getLabel();
+ }
+ }
+ return null;
+ }
+
+ private static boolean condition(Statement statement, double calculatedValue) {
+ if (">".equals(statement.getOperator())) {
+ return calculatedValue > statement.getValue();
+ } else if ("<".equals(statement.getOperator())) {
+ return calculatedValue < statement.getValue();
+ } else if ("=".equals(statement.getOperator())) {
+ return calculatedValue == statement.getValue();
+ } else {
+ return true;
+ }
+ }
+
+ private static boolean isNumeric(final String str) {
+
+ // null or empty
+ if (str == null || str.length() == 0) {
+ return false;
+ }
+ return str.chars().allMatch(Character::isDigit);
+ }
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabeler.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabeler.java
new file mode 100644
index 0000000..34feca1
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabeler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number;
+
+import org.apache.streampipes.commons.exceptions.SpRuntimeException;
+import org.apache.streampipes.logging.api.Logger;
+import org.apache.streampipes.model.runtime.Event;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.model.Statement;
+import org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.model.StatementUtils;
+import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
+import org.apache.streampipes.wrapper.routing.SpOutputCollector;
+import org.apache.streampipes.wrapper.runtime.EventProcessor;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class NumberLabeler implements EventProcessor<NumberLabelerParameters> {
+
+ private static Logger LOG;
+ private String sensorListValueProperty;
+ private List<Statement> statements;
+
+ @Override
+ public void onInvocation(NumberLabelerParameters stateBufferParameters,
+ SpOutputCollector spOutputCollector,
+ EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
+ LOG = stateBufferParameters.getGraph().getLogger(NumberLabeler.class);
+
+ this.sensorListValueProperty = stateBufferParameters.getSensorListValueProperty();
+
+ this.statements = StatementUtils.getStatements(stateBufferParameters.getStatementsStrings());
+ }
+
+ @Override
+ public void onEvent(Event inputEvent, SpOutputCollector out) {
+
+ Double value = inputEvent.getFieldBySelector(this.sensorListValueProperty).getAsPrimitive().getAsDouble();
+
+ Event resultEvent = StatementUtils.addLabel(inputEvent, value, this.statements, LOG);
+
+ out.collect(resultEvent);
+ }
+
+ @Override
+ public void onDetach() {
+ }
+
+
+}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabelerController.java
similarity index 67%
rename from streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerController.java
rename to streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabelerController.java
index f367e61..5f77127 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerController.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabelerController.java
@@ -16,7 +16,7 @@
*
*/
-package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler;
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number;
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
@@ -33,39 +33,25 @@ import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcess
import java.util.List;
-public class StateLabelerController extends StandaloneEventProcessingDeclarer<StateLabelerParameters> {
+public class NumberLabelerController extends StandaloneEventProcessingDeclarer<NumberLabelerParameters> {
- public static final String STATE_FILTER_ID = "stateFilterId";
- public static final String STATE_FIELD_ID = "stateFieldId";
- public static final String OPERATIONS_ID = "operationsId";
public static final String SENSOR_VALUE_ID = "sensorValueId";
public static final String LABEL_COLLECTION_ID = "labelCollectionId";
public static final String LABEL_STRING_ID = "labelStringId";
public static final String LABEL = "label";
- public static final String MINIMUM = "minimum";
- public static final String MAXIMUM = "maximum";
- public static final String AVERAGE = "average";
-
@Override
public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.processor.state.labeler")
+ return ProcessingElementBuilder.create("org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number")
.withLocales(Locales.EN)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.requiredStream(StreamRequirementsBuilder.create()
.requiredPropertyWithUnaryMapping(
- EpRequirements.listRequirement(),
+ EpRequirements.numberReq(),
Labels.withId(SENSOR_VALUE_ID),
PropertyScope.NONE)
- .requiredPropertyWithUnaryMapping(
- EpRequirements.domainPropertyReqList(SPSensor.STATE),
- Labels.withId(STATE_FIELD_ID),
- PropertyScope.NONE)
.build())
- .requiredTextParameter(Labels.withId(STATE_FILTER_ID))
- .requiredSingleValueSelection(Labels.withId(OPERATIONS_ID),
- Options.from(MINIMUM, MAXIMUM, AVERAGE))
.requiredParameterAsCollection(
Labels.withId(LABEL_COLLECTION_ID),
StaticProperties.stringFreeTextProperty(Labels.withId(LABEL_STRING_ID)))
@@ -83,17 +69,14 @@ public class StateLabelerController extends StandaloneEventProcessingDeclarer<St
}
@Override
- public ConfiguredEventProcessor<StateLabelerParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
+ public ConfiguredEventProcessor<NumberLabelerParameters> onInvocation(DataProcessorInvocation graph, ProcessingElementParameterExtractor extractor) {
String sensorListValueProperty = extractor.mappingPropertyValue(SENSOR_VALUE_ID);
- String stateProperty = extractor.mappingPropertyValue(STATE_FIELD_ID);
- String stateFilter = extractor.singleValueParameter(STATE_FILTER_ID, String.class);
- String selectedOperation = extractor.selectedSingleValue(OPERATIONS_ID, String.class);
List<String> statementStrings = extractor.singleValueParameterFromCollection(LABEL_COLLECTION_ID, String.class);
- StateLabelerParameters params = new StateLabelerParameters(graph, sensorListValueProperty, stateProperty, stateFilter, selectedOperation, statementStrings);
+ NumberLabelerParameters params = new NumberLabelerParameters(graph, sensorListValueProperty, statementStrings);
- return new ConfiguredEventProcessor<>(params, StateLabeler::new);
+ return new ConfiguredEventProcessor<>(params, NumberLabeler::new);
}
}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerParameters.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabelerParameters.java
similarity index 57%
rename from streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerParameters.java
rename to streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabelerParameters.java
index 885a722..5063bb8 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/StateLabelerParameters.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/number/NumberLabelerParameters.java
@@ -16,32 +16,23 @@
*
*/
-package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler;
+package org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.wrapper.params.binding.EventProcessorBindingParams;
import java.util.List;
-public class StateLabelerParameters extends EventProcessorBindingParams {
+public class NumberLabelerParameters extends EventProcessorBindingParams {
private String sensorListValueProperty;
- private String stateProperty;
- private String stateFilter;
- private String selectedOperation;
private List<String> statementsStrings;
- public StateLabelerParameters(DataProcessorInvocation graph,
- String sensorListValueProperty,
- String stateProperty,
- String stateFilter,
- String selectedOperation,
- List<String> statementsStrings) {
+ public NumberLabelerParameters(DataProcessorInvocation graph,
+ String sensorListValueProperty,
+ List<String> statementsStrings) {
super(graph);
this.sensorListValueProperty = sensorListValueProperty;
- this.stateProperty = stateProperty;
- this.stateFilter = stateFilter;
- this.selectedOperation = selectedOperation;
this.statementsStrings = statementsStrings;
}
@@ -53,22 +44,6 @@ public class StateLabelerParameters extends EventProcessorBindingParams {
this.sensorListValueProperty = sensorListValueProperty;
}
- public String getStateFilter() {
- return stateFilter;
- }
-
- public void setStateFilter(String stateFilter) {
- this.stateFilter = stateFilter;
- }
-
- public String getSelectedOperation() {
- return selectedOperation;
- }
-
- public void setSelectedOperation(String selectedOperation) {
- this.selectedOperation = selectedOperation;
- }
-
public List<String> getStatementsStrings() {
return statementsStrings;
}
@@ -77,11 +52,4 @@ public class StateLabelerParameters extends EventProcessorBindingParams {
this.statementsStrings = statementsStrings;
}
- public String getStateProperty() {
- return stateProperty;
- }
-
- public void setStateProperty(String stateProperty) {
- this.stateProperty = stateProperty;
- }
}
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/documentation.md b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/documentation.md
similarity index 98%
copy from streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/documentation.md
copy to streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/documentation.md
index ffe7545..6f85882 100644
--- a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/documentation.md
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/documentation.md
@@ -16,7 +16,7 @@
~
-->
-## State Labeler
+## State Buffer Labeler
<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/icon.png b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/icon.png
new file mode 100644
index 0000000..034fb5b
Binary files /dev/null and b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/icon.png differ
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/strings.en b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/strings.en
similarity index 87%
rename from streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/strings.en
rename to streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/strings.en
index 5b77578..62ac488 100644
--- a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/strings.en
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer/strings.en
@@ -1,5 +1,5 @@
-org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.title=State Labeler
-org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.description=Adds a label based on a user defined rule to a sensor time-series
+org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer.title=State Buffer Labeler
+org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.buffer.description=Adds a label based on a user defined rule to a sensor time-series
stateFilterId.title=Select a specific state
stateFilterId.description=Add a filter to define which states to evaluate. Add '*' to select all states.
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/documentation.md b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/documentation.md
similarity index 66%
rename from streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/documentation.md
rename to streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/documentation.md
index ffe7545..20bd156 100644
--- a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/documentation.md
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/documentation.md
@@ -16,7 +16,7 @@
~
-->
-## State Labeler
+## Number Labeler
<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -26,33 +26,22 @@
## Description
-Apply a rule to a time-series recorded during a state of a machine. (E.g. when minimum value is lower then 10, add label `not ok` else add label `ok`)
-
+Apply a rule to a value of a field. (E.g. when minimum value is lower then 10, add label `not ok` else add label `ok`)
***
## Required input
-Requires a list with sensor values and a field defining the state
-
-### Sensor values
-
-An array representing sensor values recorded during the state.
+Requires a sensor value
-### State field
+### Sensor value
-A field representing the state when the sensor values where recorded.
+A number representing the current sensor value.
***
## Configuration
-### Select a specific state
-When you are interested in the values of a specific state add it here. All other states will be ignored. To get results of all states enter `*`
-
-### Operation
-Operation that will be performed on the sensor values (calculate `maximim`, or `average`, or `minimum`)
-
### Condition
Define a rule which label to add. Example: `<;5;nok` means when the calculated value is smaller then 5 add label ok.
The default label can be defined with `*;nok`.
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/icon.png b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/icon.png
new file mode 100644
index 0000000..9fc17e6
Binary files /dev/null and b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/icon.png differ
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/strings.en b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/strings.en
new file mode 100644
index 0000000..31c6151
--- /dev/null
+++ b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number/strings.en
@@ -0,0 +1,14 @@
+org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number.title=Number Labeler
+org.apache.streampipes.processors.transformation.jvm.processor.state.labeler.number.description=Adds a label based on a user defined rule to a sensor time-series
+
+sensorValueId.title=Sensor value
+sensorValueId.description=The value used for the labeling
+
+labelCollectionId.title=Condition
+labelCollectionId.description=Add a condition with the following scheme (Order is important) '<;5;ok', '<;10;ok' or '*;nok' as a default label
+
+label.title=Label
+label.description=User defined label for data
+
+labelStringId.title=Condition
+labelStringId.description=Add a condition with the following scheme (Order is important) '<;5;ok', '<;10;ok' or '*;nok' as a default label
diff --git a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/icon.png b/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/icon.png
deleted file mode 100644
index b323e75..0000000
Binary files a/streampipes-processors-transformation-jvm/src/main/resources/org.apache.streampipes.processors.transformation.jvm.processor.state.labeler/icon.png and /dev/null differ