You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampipes.apache.org by ze...@apache.org on 2021/10/25 13:11:53 UTC
[incubator-streampipes-extensions] 02/03: [STREAMPIPES-447]
Processing Element: Detect Value Change
This is an automated email from the ASF dual-hosted git repository.
zehnder pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampipes-extensions.git
commit e1152e748b867ec4bab859609d74d528b3744ca0
Author: Philipp Zehnder <ze...@fzi.de>
AuthorDate: Mon Oct 25 15:03:44 2021 +0200
[STREAMPIPES-447] Processing Element: Detect Value Change
---
.../processors/enricher/jvm/EnricherJvmInit.java | 4 +-
...geController.java => ValueChangeProcessor.java} | 45 ++++++++++++++--------
.../documentation.md | 5 ++-
.../strings.en | 13 ++++++-
.../numericalfilter/NumericalFilterProcessor.java | 2 +-
5 files changed, 47 insertions(+), 22 deletions(-)
diff --git a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherJvmInit.java b/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherJvmInit.java
index 0fbcbdf..d022600 100644
--- a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherJvmInit.java
+++ b/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/EnricherJvmInit.java
@@ -30,7 +30,7 @@ import org.apache.streampipes.messaging.kafka.SpKafkaProtocolFactory;
import org.apache.streampipes.messaging.mqtt.SpMqttProtocolFactory;
import org.apache.streampipes.processors.enricher.jvm.processor.jseval.JSEvalController;
import org.apache.streampipes.processors.enricher.jvm.processor.sizemeasure.SizeMeasureController;
-import org.apache.streampipes.processors.enricher.jvm.processor.valueChange.ValueChangeController;
+import org.apache.streampipes.processors.enricher.jvm.processor.valueChange.ValueChangeProcessor;
public class EnricherJvmInit extends StandaloneModelSubmitter {
@@ -46,7 +46,7 @@ public class EnricherJvmInit extends StandaloneModelSubmitter {
8090)
.registerPipelineElements(new SizeMeasureController(),
new JSEvalController(),
- new ValueChangeController())
+ new ValueChangeProcessor())
.registerMessagingFormats(
new JsonDataFormatFactory(),
new CborDataFormatFactory(),
diff --git a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valueChange/ValueChangeController.java b/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valueChange/ValueChangeProcessor.java
similarity index 54%
rename from streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valueChange/ValueChangeController.java
rename to streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valueChange/ValueChangeProcessor.java
index a378ae6..4200755 100644
--- a/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valueChange/ValueChangeController.java
+++ b/streampipes-processors-enricher-jvm/src/main/java/org/apache/streampipes/processors/enricher/jvm/processor/valueChange/ValueChangeProcessor.java
@@ -15,44 +15,59 @@ import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.standalone.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
-public class ValueChangeController extends StreamPipesDataProcessor {
- private static final String CHANGEVALUE_MAPPING = "changevalue-mapping";
- private static final String USER_INPUT_MAPPING = "user-input-mapping";
+public class ValueChangeProcessor extends StreamPipesDataProcessor {
+ private static final String CHANGE_VALUE_MAPPING_ID = "change-value-mapping";
+ private static final String FROM_PROPERTY_VALUE_ID = "from-property-value";
+ private static final String TO_PROPERTY_VALUE_ID = "to-property-value";
+ private static final String IS_CHANGED_ID = "is-changed";
private static final String IS_CHANGED = "isChanged";
-
- private float initValue;
+ private String mappingProperty;
+ private float userDefinedFrom;
+ private float userDefinedTo;
+ private float lastValueOfEvent;
@Override
public DataProcessorDescription declareModel() {
- return ProcessingElementBuilder.create("org.apache.streampipes.processors.enricher.jvm.valueChange","ValueChange","A value change data processor which return a boolean on data change")
+ return ProcessingElementBuilder.create("org.apache.streampipes.processors.enricher.jvm.valueChange")
.category(DataProcessorType.ENRICH)
.withAssets(Assets.DOCUMENTATION, Assets.ICON)
.withLocales(Locales.EN)
- .requiredFloatParameter(Labels.withId(USER_INPUT_MAPPING))
+ .requiredFloatParameter(Labels.withId(FROM_PROPERTY_VALUE_ID))
+ .requiredFloatParameter(Labels.withId(TO_PROPERTY_VALUE_ID))
.requiredStream(StreamRequirementsBuilder
.create()
.requiredPropertyWithUnaryMapping(EpRequirements.numberReq(),
- Labels.withId(CHANGEVALUE_MAPPING),
+ Labels.withId(CHANGE_VALUE_MAPPING_ID),
PropertyScope.NONE)
.build())
- .outputStrategy(OutputStrategies.append(EpProperties.booleanEp(Labels.withId(IS_CHANGED),
- IS_CHANGED,SO.Boolean)))
+ .outputStrategy(OutputStrategies.append(
+ EpProperties.booleanEp(Labels.withId(IS_CHANGED_ID), IS_CHANGED, SO.Boolean)))
.build();
}
@Override
public void onInvocation(ProcessorParams processorParams, SpOutputCollector spOutputCollector, EventProcessorRuntimeContext eventProcessorRuntimeContext) throws SpRuntimeException {
- this.initValue = processorParams.extractor().singleValueParameter(USER_INPUT_MAPPING,Float.class);
+ this.lastValueOfEvent = Float.MAX_VALUE;
+ this.userDefinedFrom = processorParams.extractor().singleValueParameter(FROM_PROPERTY_VALUE_ID, Float.class);
+ this.userDefinedTo = processorParams.extractor().singleValueParameter(TO_PROPERTY_VALUE_ID, Float.class);
+ this.mappingProperty = processorParams.extractor().mappingPropertyValue(CHANGE_VALUE_MAPPING_ID);
}
@Override
public void onEvent(Event event, SpOutputCollector spOutputCollector) throws SpRuntimeException {
- float currValue = event.getFieldBySelector(CHANGEVALUE_MAPPING).getAsPrimitive().getAsFloat();
- if(currValue == this.initValue)
+ float thisValue = event.getFieldBySelector(mappingProperty).getAsPrimitive().getAsFloat();
+ if (this.lastValueOfEvent != Float.MAX_VALUE) {
+ if (this.lastValueOfEvent == this.userDefinedFrom && thisValue == this.userDefinedTo) {
+ event.addField(IS_CHANGED,true);
+ } else {
+ event.addField(IS_CHANGED,false);
+ }
+ } else {
event.addField(IS_CHANGED,false);
- else
- event.addField(IS_CHANGED,true);
+ }
+
+ this.lastValueOfEvent = thisValue;
spOutputCollector.collect(event);
}
diff --git a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.valueChange/documentation.md b/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.valueChange/documentation.md
index be695c9..6a74d9d 100644
--- a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.valueChange/documentation.md
+++ b/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.valueChange/documentation.md
@@ -13,10 +13,11 @@
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
+
~
-->
-## Size Measure
+## Value Change
<p align="center">
<img src="icon.png" width="150px;" class="pe-image-documentation"/>
@@ -26,7 +27,7 @@
## Description
-Measures the size of an incoming event and appends this number to the event by serializing it.
+The processing element should be able to detect when a numeric property changes from one configured value to another.
***
diff --git a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.valueChange/strings.en b/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.valueChange/strings.en
index e63e1a1..873d715 100644
--- a/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.valueChange/strings.en
+++ b/streampipes-processors-enricher-jvm/src/main/resources/org.apache.streampipes.processors.enricher.jvm.valueChange/strings.en
@@ -1,5 +1,14 @@
org.apache.streampipes.processors.enricher.jvm.valueChange.title=Value Change
org.apache.streampipes.processors.enricher.jvm.valueChange.description=The processing element should be able to detect when a numeric property changes from one configured value to another
-valueChange.title=Value Change
-valueChange.description=The unit in which the size of the event should be added
\ No newline at end of file
+change-value-mapping.title=Property to monitor
+change-value-mapping.description=The property where the values are monitored
+
+from-property-value.title=From Value
+from-property-value.description=Result is true, when the property value switches from this value to the To Value
+
+to-property-value.title=To Value
+to-property-value.description=Result is true, when the property value switches from the From Value to this value
+
+is-changed.title=isChanged
+is-changed.description=
\ No newline at end of file
diff --git a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessor.java b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessor.java
index d55b1eb..990c667 100644
--- a/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessor.java
+++ b/streampipes-processors-filters-jvm/src/main/java/org/apache/streampipes/processors/filters/jvm/processor/numericalfilter/NumericalFilterProcessor.java
@@ -60,7 +60,7 @@ public class NumericalFilterProcessor extends StreamPipesDataProcessor {
.outputStrategy(OutputStrategies.keep())
.requiredSingleValueSelection(Labels.withId(OPERATION), Options.from("<", "<=", ">",
">=", "==", "!="))
- .requiredFloatParameter(Labels.withId(VALUE), NUMBER_MAPPING)
+ .requiredFloatParameter(Labels.withId(VALUE))
.build();
}