You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2020/08/31 21:23:06 UTC
[incubator-streampipes-extensions] branch dev updated: Update state
buffer labeler
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
The following commit(s) were added to refs/heads/dev by this push:
new 6af8551 Update state buffer labeler
6af8551 is described below
commit 6af85513157caca691b8987d89eb941b57fa4201
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Aug 31 23:22:31 2020 +0200
Update state buffer labeler
---
.../state/labeler/buffer/StateBufferLabeler.java | 17 ++++---
.../buffer/StateBufferLabelerController.java | 54 +++++++++++++++++-----
.../buffer/StateBufferLabelerParameters.java | 44 +++++++++++++-----
.../state/labeler/model/StatementUtils.java | 22 ---------
4 files changed, 86 insertions(+), 51 deletions(-)
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabeler.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabeler.java
index a5454b5..6a73ef8 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabeler.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabeler.java
@@ -42,17 +42,20 @@ public class StateBufferLabeler implements EventProcessor<StateBufferLabelerPara
private List<Statement> statements;
@Override
- public void onInvocation(StateBufferLabelerParameters stateBufferParameters,
+ public void onInvocation(StateBufferLabelerParameters stateBufferLabelerParameters,
SpOutputCollector spOutputCollector,
EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
- LOG = stateBufferParameters.getGraph().getLogger(StateBufferLabeler.class);
+ LOG = stateBufferLabelerParameters.getGraph().getLogger(StateBufferLabeler.class);
- this.sensorListValueProperty = stateBufferParameters.getSensorListValueProperty();
- this.stateProperty = stateBufferParameters.getStateProperty();
- this.stateFilter = stateBufferParameters.getStateFilter();
- this.selectedOperation = stateBufferParameters.getSelectedOperation();
+ this.sensorListValueProperty = stateBufferLabelerParameters.getSensorListValueProperty();
+ this.stateProperty = stateBufferLabelerParameters.getStateProperty();
+ this.stateFilter = stateBufferLabelerParameters.getStateFilter();
+ this.selectedOperation = stateBufferLabelerParameters.getSelectedOperation();
- this.statements = StatementUtils.getStatements(stateBufferParameters.getStatementsStrings());
+ this.statements = StatementUtils.getStatements(
+ stateBufferLabelerParameters.getNumberValues(),
+ stateBufferLabelerParameters.getLabelStrings(),
+ stateBufferLabelerParameters.getComparators());
}
@Override
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerController.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerController.java
index 63d89a3..6d490bc 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerController.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerController.java
@@ -21,6 +21,10 @@ package org.apache.streampipes.processors.transformation.jvm.processor.state.lab
import org.apache.streampipes.model.graph.DataProcessorDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.schema.PropertyScope;
+import org.apache.streampipes.model.staticproperty.FreeTextStaticProperty;
+import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
import org.apache.streampipes.sdk.StaticProperties;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
@@ -32,6 +36,7 @@ import org.apache.streampipes.wrapper.standalone.ConfiguredEventProcessor;
import org.apache.streampipes.wrapper.standalone.declarer.StandaloneEventProcessingDeclarer;
import java.util.List;
+import java.util.stream.Collectors;
public class StateBufferLabelerController extends StandaloneEventProcessingDeclarer<StateBufferLabelerParameters> {
@@ -40,6 +45,8 @@ public class StateBufferLabelerController extends StandaloneEventProcessingDecla
public static final String OPERATIONS_ID = "operationsId";
public static final String SENSOR_VALUE_ID = "sensorValueId";
public static final String LABEL_COLLECTION_ID = "labelCollectionId";
+ public static final String COMPARATOR_ID = "comparatorId";
+ public static final String NUMBER_VALUE_ID = "numberValueId";
public static final String LABEL_STRING_ID = "labelStringId";
public static final String LABEL = "label";
@@ -66,16 +73,15 @@ public class StateBufferLabelerController extends StandaloneEventProcessingDecla
.requiredTextParameter(Labels.withId(STATE_FILTER_ID))
.requiredSingleValueSelection(Labels.withId(OPERATIONS_ID),
Options.from(MINIMUM, MAXIMUM, AVERAGE))
- .requiredParameterAsCollection(
+ .requiredCollection(
Labels.withId(LABEL_COLLECTION_ID),
- StaticProperties.stringFreeTextProperty(Labels.withId(LABEL_STRING_ID)))
-
-// StaticProperties.collection(Labels.withId(PLC_NODES),
-// StaticProperties.stringFreeTextProperty(Labels.withId(PLC_NODE_RUNTIME_NAME)),
-// StaticProperties.stringFreeTextProperty(Labels.withId(PLC_NODE_NAME)),
-// StaticProperties.singleValueSelection(Labels.withId(PLC_NODE_TYPE),
-// Options.from("Bool", "Byte", "Int", "Word", "Real"))))
-
+ StaticProperties.group(Labels.from("group", "Group", ""), false,
+ StaticProperties.singleValueSelection(Labels.withId(COMPARATOR_ID),
+ Options.from("<", "<=", ">", ">=", "==", "*")),
+ StaticProperties.doubleFreeTextProperty(Labels.withId(NUMBER_VALUE_ID)),
+ StaticProperties.stringFreeTextProperty(Labels.withId(LABEL_STRING_ID))
+ )
+ )
.outputStrategy(OutputStrategies.append(
EpProperties.stringEp(Labels.withId(LABEL), LABEL, SPSensor.STATE, PropertyScope.DIMENSION_PROPERTY)
))
@@ -90,9 +96,35 @@ public class StateBufferLabelerController extends StandaloneEventProcessingDecla
String stateFilter = extractor.singleValueParameter(STATE_FILTER_ID, String.class);
String selectedOperation = extractor.selectedSingleValue(OPERATIONS_ID, String.class);
- List<String> statementStrings = extractor.singleValueParameterFromCollection(LABEL_COLLECTION_ID, String.class);
+ List<StaticPropertyGroup> groupItems = extractor.collectionMembersAsGroup(LABEL_COLLECTION_ID);
+ List<Integer> numberValues = groupItems
+ .stream()
+ .map(group -> (
+ extractor
+ .extractGroupMember(NUMBER_VALUE_ID, group)
+ .as(FreeTextStaticProperty.class))
+ .getValue())
+ .map(Integer::parseInt)
+ .collect(Collectors.toList());
+
+ List<String> labelStrings = groupItems
+ .stream()
+ .map(group -> (extractor
+ .extractGroupMember(LABEL_STRING_ID, group)
+ .as(FreeTextStaticProperty.class))
+ .getValue())
+ .collect(Collectors.toList());
- StateBufferLabelerParameters params = new StateBufferLabelerParameters(graph, sensorListValueProperty, stateProperty, stateFilter, selectedOperation, statementStrings);
+ List<String> comparators = groupItems
+ .stream()
+ .map(group -> (extractor
+ .extractGroupMember(COMPARATOR_ID, group)
+ .as(OneOfStaticProperty.class))
+ .getOptions()
+ .stream()
+ .filter(Option::isSelected).findFirst().get().getName())
+ .collect(Collectors.toList());
+ StateBufferLabelerParameters params = new StateBufferLabelerParameters(graph, sensorListValueProperty, stateProperty, stateFilter, selectedOperation, numberValues, labelStrings, comparators);
return new ConfiguredEventProcessor<>(params, StateBufferLabeler::new);
}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerParameters.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerParameters.java
index 84428ab..b289b4c 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerParameters.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/buffer/StateBufferLabelerParameters.java
@@ -29,20 +29,26 @@ public class StateBufferLabelerParameters extends EventProcessorBindingParams {
private String stateProperty;
private String stateFilter;
private String selectedOperation;
- private List<String> statementsStrings;
+ private List<Integer> numberValues;
+ private List<String> labelStrings;
+ private List<String> comparators;
public StateBufferLabelerParameters(DataProcessorInvocation graph,
String sensorListValueProperty,
String stateProperty,
String stateFilter,
String selectedOperation,
- List<String> statementsStrings) {
+ List<Integer> numberValues,
+ List<String> labelStrings,
+ List<String> comparators) {
super(graph);
this.sensorListValueProperty = sensorListValueProperty;
this.stateProperty = stateProperty;
this.stateFilter = stateFilter;
this.selectedOperation = selectedOperation;
- this.statementsStrings = statementsStrings;
+ this.numberValues = numberValues;
+ this.labelStrings = labelStrings;
+ this.comparators = comparators;
}
public String getSensorListValueProperty() {
@@ -69,14 +75,6 @@ public class StateBufferLabelerParameters extends EventProcessorBindingParams {
this.selectedOperation = selectedOperation;
}
- public List<String> getStatementsStrings() {
- return statementsStrings;
- }
-
- public void setStatementsStrings(List<String> statementsStrings) {
- this.statementsStrings = statementsStrings;
- }
-
public String getStateProperty() {
return stateProperty;
}
@@ -84,4 +82,28 @@ public class StateBufferLabelerParameters extends EventProcessorBindingParams {
public void setStateProperty(String stateProperty) {
this.stateProperty = stateProperty;
}
+
+ public List<Integer> getNumberValues() {
+ return numberValues;
+ }
+
+ public void setNumberValues(List<Integer> numberValues) {
+ this.numberValues = numberValues;
+ }
+
+ public List<String> getLabelStrings() {
+ return labelStrings;
+ }
+
+ public void setLabelStrings(List<String> labelStrings) {
+ this.labelStrings = labelStrings;
+ }
+
+ public List<String> getComparators() {
+ return comparators;
+ }
+
+ public void setComparators(List<String> comparators) {
+ this.comparators = comparators;
+ }
}
diff --git a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/StatementUtils.java b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/StatementUtils.java
index 266bcf0..d41749c 100644
--- a/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/StatementUtils.java
+++ b/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/state/labeler/model/StatementUtils.java
@@ -68,28 +68,6 @@ public class StatementUtils {
}
/**
- * Extracts Statements from Strings
- * @param statementStrings
- * @return
- * @throws SpRuntimeException
- */
- @Deprecated
- public static List<Statement> getStatements(List<String> statementStrings) throws SpRuntimeException {
- List<Statement> statements = new ArrayList<>();
-
- for (String s : statementStrings) {
- Statement statement = getStatement(s);
- if (statement == null) {
- throw new SpRuntimeException("Statement: " + s + " is not correctly formatted");
- }
- statements.add(statement);
- }
-
- Collections.reverse(statements);
- return statements;
- }
-
- /**
* This method checks if the user input is correct. When not null is returned
* @param s
* @return